线程池优点:
降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁的消耗
提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行(线程复用/线程预热prestartAllCoreThreads、prestartCoreThread)
提高线程的可管理性: 线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会降低系统的稳定性(线程切换的开销),使用线程池可以进行统一的分配,调优和监控。
ThreadPoolExecutor构造方法
|
|
corePoolSize:核心线程数大小
maximumPoolSize:最大线程数大小(maximumPoolSize-corePoolSize的值为非核心线程数大小)
keepAliveTime:非核心线程在执行完任务后,等待下一个任务的时间,超过这个时间则会销毁
workQueue:用来暂时保存任务的阻塞队列
threadFactory:用于创建工作线程的工厂,比如可以在这里给线程命名
handler:任务拒绝策略(线程池非RUNNING状态/阻塞队列已经满,线程池线程数量达到maximumPoolSize)
自带线程池创建方式分析
Executors.newFixedThreadPool(int nThreads)
|
|
假设指定mThreads = 10
1.主线程执行execute
2.执行workerCountOf(c) < corePoolSize(10),条件成立
3.执行addWorker(command, true),创建一个Worker(extends AbstractQueuedSynchronizer implements Runnable),Worker内部维护了一个Thread线程执行任务,thread启动时,会调用Worker覆写的run方法,最终调用runWorker(this)
4.进入runWorker(this)后,执行当前task后,将task置为null,然后进入getTask();
5.进入getTask()后,执行workQueue.take(),即执行LinkedBlockingQueue的take方法,默认是非公平模式(且无法指定为公平模式),然后线程会阻塞在notEmpty.await()方法中。
6.当工作线程数为10,不满足workerCountOf(c) < corePoolSize(10),执行isRunning(c) && workQueue.offer(command),即调用LinkedBlockingQueue的offer方法,执行enqueue(node)后,notFull.signal()通知队列未满,notEmpty.signal()通知正在等待的线程,某个等待的线程收到通知后,继续执行take()方法中的dequeue(),并且判断如果队列中还堆积了任务,则再次调用notEmpty.signal()通知其他正在等待的线程,最终返回当前取到的task,回到runWoker(this)中执行task.run()。
Executors.newSingleThreadPool()
|
|
Executors.newCachedThreadPool()
|
|
1.主线程执行execute
2.执行workerCountOf(c) < corePoolSize(0),条件不成立。
3.执行isRunning(c) && workQueue.offer(command),即调用SynchronousQueue的offer方法,默认是非公平模式,调用TransferStack的transfer方法,transferer.transfer(e, true, 0),满足if (timed && nanos <= 0),返回null,最终SynchronousQueue的offer返回false,条件不成立。
4.执行!addWorker(command, false),即调用addWorker(command, false),创建一个Worker(extends AbstractQueuedSynchronizer implements Runnable),Worker内部维护了一个Thread线程执行任务,thread启动时,会调用Worker覆写的run方法,最终调用runWorker(this)(此线程为非核心线程,在执行完任务后,会等待60秒,超时后,会被销毁)
5.进入runWorker(this)后,执行当前task后,将task置为null,然后进入getTask();
6.进入getTask()后,执行workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),即执行SynchronousQueue的poll方法,默认是非公平模式,调用TransferStack的transfer方法,transferer.transfer(null, true, unit.toNanos(timeout)),unit.toNanos(timeout)值为60000,满足else if (casHead(h, s = snode(s, e, h, mode))),然后线程会阻塞在awaitFulfill方法中。
6.1 如果等待超时,最终会回到runWorker(this)中的processWorkerExit(w, completedAbruptly);线程执行结束,即被销毁。
6.2,没有超时,此时第二个任务进入。进入7
7.主线程执行execute
8.执行workerCountOf(c) < corePoolSize(0),条件不成立。
9.执行isRunning(c) && workQueue.offer(command),即调用SynchronousQueue的offer方法,默认是非公平模式,调用TransferStack的transfer方法,transferer.transfer(e, true, 0),满足else if (!isFulfilling(h.mode)),执行else if (casHead(h, s=snode(s, e, h, FULFILLING|mode)))中的m.tryMatch(s),最终返回匹配的task,回到runWoker(this)中执行task.run()
线程池主要处理流程分析
|
|
工作线程Worker分析
submit(Runnable runnable)与execute(Runnable runnable)
提交到submit的runnable任务每次都会被包装成FutureTask
区别点一:
执行100次submit,runnable.toString()得到100个不同的对象。
执行100次execute, runnable.toString()得到是同一个对象。
区别点二:
提交到submit的runnable任务,最终执行的是FutureTask的run方法。异常被捕获。应用无法获取异常日志,所以要么用get拉取异常处理,要么自己写try catch把任务执行的逻辑包起来。
|
|