线程池

Executor.png

Executor的主要作用是解耦任务提交和任务执行(包括如何使用线程,如何调度)

class DirectExecutor implements Executor {
   public void execute(Runnable r) {
    r.run();
  }
 }

Executor本身并不表示使用线程

ExecutorService提供了关闭机制以及提交任务返回Future对象用于追踪任务执行进度或取消任务。

FutureTask.png

先分析一下AbstractExecutorService实现中用到的Future的实现类FutureTask的实现机制

状态.png

如果当前的任务是Runnable,通过RunnableAdapter转为Callable

RunnableAdapter.png
run.png

FutureTask自身实现了Runnable,包装内部的Callable或Runnable

get.png
awaitDone.png
report.png

Future#get实现机制:如果任务还没开始,调用线程加入任务的等待队列,等待任务完成或取消时被唤醒,否则等待任务到达最终状态,正常执行返回结果或异常时抛出ExecutionException异常

cancel.png
finishCompletion.png

Future#cancle实现机制:如果任务还没开始,状态改为INTERRUPTING或CANCELLED,如果支持中断,打断当前线程(参考run方法,先设置runner线程,再修改状态,所以可能当前状态是NEW,但是runner已经设置了),然后唤醒所有之前等待的线程。

AbstractExecutorService的实现机制:任务的具体执行都委托给从Executor继承的execute方法,主要实现了submit和invokeAll,invokeAny方法。

invokeAll.png
ExecutorCompletionService.png

任务的执行都委托给Executor,所有提交的任务都用QueueingFuture包装,任务执行完加入内部的BlockingQueue。

invokeAny.png

invokeAny:先提交一个任务,然后循环检查ExecutorCompletionService的阻塞队列是否有已完成的任务,有就返回,没有就再提交一个新任务,直到任务都提交完,然后阻塞。第一个任务完成后,cancel所有可以cancel的任务。

AbstractExecutorService有两个具体的子类:ThreadPoolExecutor和ForkJoinPool,ScheduledThreadPoolExecutor又继承了ThreadPoolExecutor

ThreadPoolExecutor:

线程池运行状态.png
线程池参数.png

workQueue表示任务队列,workers表示当前执行任务的线程集合。

Worker.png

Worker继承了AbstractQueuedSynchronizer,自身就是一个简单的互斥锁,实现了Runnable,Worker在构造时内部会利用ThreadFactory产生一个线程,线程启动时,执行Worker自身的run方法。

runWorker.png

Worker执行过程中,会通过getTask获取任务,每次执行任务之前都会获取worker自身的互斥锁

getTask.png

getTask通过返回null(线程池stop,或shutdown之后任务队列为空,或者动态调整参数之后线程太多,或者获取任务超时(说明任务太少了,不需要那么多线程)),控制Worker结束循环

processWorkerExit.png

Worker循环结束有两种原因:执行的任务抛出异常,getTask返回null。
如果是后者,再次检查以确保目前的线程数不低于最低要求,线程数不够时添加worker线程。因异常而结束任务循环也会添加新的worker线程。

addWorker-1.png
addWorker-2.png

添加worker失败的原因有三:线程池stop;shutdown之后任务队列为空;当前线程数超过最大线程数。worker添加成功之后,启动内部的线程,开始循环处理任务。

execute.png

关键点在于,核心线程全部启动之后,任务会先加入任务队列,只有任务队列是有界队列,且队列满了才会启动非核心线程!!!

shutdown.png
interruptIdleWorkers.png
tryTerminate.png

shutdown之后,修改状态为SHUTDOWN,然后打断所有idle线程,所谓idle,就是可以获取worker的互斥锁,说明worker当前在等待任务而不是执行任务,参考runWorker方法。如果当前所有worker正巧都在等待任务,所有worker都会被打断(processWorkerExit方法会在worker退出循环时调用,根据情况再添加worker)。tryTerminate中会先检查如果当前状态是SHUTDOWN但是任务队列不为空,不能进入terminal状态,如果当前是shutdown且任务队列为空且线程数为空,修改状态为过渡状态TIDYING,然后修改为最终状态TERMINATED。

shutdownNow.png

打断所有已经启动的worker,返回所有还未执行的任务。

awaitTermination.png

shutdown之后线程池并不一定关闭!!!所以正确的做法是shutdown之后调用awaitTermination等待所有任务执行完后所有线程被打断。

ThreadPoolExecutor.png

ThreadPoolExecutor可控制参数:
corePoolSize:核心线程数,worker数量小于corePoolSize时每次提交任务都启动一个core线程,可以使用set方法在运行时调整。
maximumPoolSize:最大线程数,包括core和非core线程,从上面的源码分析可以直到只有任务队列为有界队列时才会启动非core线程。
workQueue:任务队列,只有任务队列为有界队列时才会启动非core线程。
keepAliveTime:worker在指定时间内获取不到任务,说明此时人浮于事,需要裁员,getTask会返回null,结束获取任务超时的worker。
threadFactory:定义如何产生线程,默认直接new Thread。
handler:提交任务时任务队列满了或线程池shutdown之后的行为,默认抛出RejectedExecutionException异常,可选策略包括忽略(DiscardPolicy),在提交任务的线程中执行(CallerRunsPolicy),移除任务队列里最前面的任务(DiscardOldestPolicy)。
keepAliveTime:如果通过set设置了值,如果一个worker超过指定时间未获得任务就会timeout而结束循环,如果当前线程数超过了corePoolSize,不会再添加新的worker,默认不支持timeout。
allowCoreThreadTimeOut:默认线程数小于corePoolSize,timeout之后就会添加新的worker,如果设置了allowCoreThreadTimeOut,只有当前线程为0时才会添加新的worker。

下面分析一下ThreadPoolExecutor的子类ScheduledThreadPoolExecutor的实现机制:

ScheduledThreadPoolExecutor.png

从构造上看,主要是任务队列使用了DelayedWorkQueue,DelayedWorkQueue是一个简单的基于二叉堆实现的优先级阻塞无界队列,所有任务按触发时刻排序,keepAliveTime为0,不支持worker超时。从上文的分析可知,使用无界队列时是不会启动非core线程的,maximumPoolSize设置成了Integer.MAX_VALUE而不是corePoolSize,避免运行时修改corePoolSize时还要修改maximumPoolSize。

ScheduledFutureTask.png

所有提交的任务都会用ScheduledFutureTask包装

compareTo.png

任务先按触发时刻排序,同时触发的任务按提交顺序排序

run.png
setNextRunTime.png
triggerTime.png

如果是重复任务,任务执行完,计算下次触发时刻,重新加入任务队列。此处有一个细节:就算是fixed-rate的任务,也是上次执行完之后才会再次加入任务队列。

onShutdown.png

shutdown之后不允许提交新任务,如果是之前提交的延迟任务还没到时间或者是周期性任务,根据参数决定是否还能继续执行,默认运行继续等待执行延迟任务,不允许执行周期任务。

ForkJoinPool:日后补充!!!

下面来分析一下Executors里的静态方法构造的都是什么线程:

newFixedThreadPool.png

无界队列,不支持timeout,固定线程数。

newSingleThreadExecutor.png

newSingleThreadExecutor = newFixedThreadPool(1)

newCachedThreadPool.png

使用特殊的队列SynchronousQueue,相当于容量为1的阻塞队列,只有这样,如果已经有任务在等待执行了,再次提交任务时才会启动非core线程。

newScheduledThreadPool.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容