Java 线程池艺术探索

文章来自:http://www.54tianzhisheng.cn/2017/07/29/ThreadPool/

Wiki 上是这样解释的:Thread Pool

作用:利用线程池可以大大减少在创建和销毁线程上所花的时间以及系统资源的开销!

下面主要讲下线程池中最重要的一个类 ThreadPoolExecutor 。

ThreadPoolExecutor

ThreadPoolExecutor 构造器:

有四个构造器的,挑了参数最长的一个进行讲解。

七个参数:

corePoolSize:核心池的大小,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

maximumPoolSize:线程池最大线程数;

keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止;

unit:参数keepAliveTime的时间单位(DAYS、HOURS、MINUTES、SECONDS 等);

workQueue:阻塞队列,用来存储等待执行的任务;

ArrayBlockingQueue (有界队列)

LinkedBlockingQueue (无界队列)

SynchronousQueue

threadFactory:线程工厂,主要用来创建线程

handler:拒绝处理任务的策略

AbortPolicy:丢弃任务并抛出 RejectedExecutionException 异常。(默认这种)

DiscardPolicy:也是丢弃任务,但是不抛出异常

DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

CallerRunsPolicy:由调用线程处理该任务

重要方法:

execute():通过这个方法可以向线程池提交一个任务,交由线程池去执行;

shutdown():关闭线程池;

execute() 方法:

注:JDK 1.7 和 1.8 这个方法有点区别,下面代码是 1.8 中的。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

publicvoidexecute(Runnable command){

if(command ==null)

thrownewNullPointerException();

intc = ctl.get();

//1、如果当前的线程数小于核心线程池的大小,根据现有的线程作为第一个 Worker 运行的线程,新建一个 Worker,addWorker 自动的检查当前线程池的状态和 Worker 的数量,防止线程池在不能添加线程的状态下添加线程

if(workerCountOf(c) < corePoolSize) {

if(addWorker(command,true))

return;

c = ctl.get();

}

//2、如果线程入队成功,然后还是要进行 double-check 的,因为线程在入队之后状态是可能会发生变化的

if(isRunning(c) && workQueue.offer(command)) {

intrecheck = ctl.get();

// recheck 防止线程池状态的突变,如果突变,那么将 reject 线程,防止 workQueue 中增加新线程

if(! isRunning(recheck) && remove(command))

reject(command);

elseif(workerCountOf(recheck) ==0)//上下两个操作都有 addWorker 的操作,但是如果在workQueue.offer 的时候 Worker 变为 0,那么将没有 Worker 执行新的 task,所以增加一个 Worker.

addWorker(null,false);

}

//3、如果 task 不能入队(队列满了),这时候尝试增加一个新线程,如果增加失败那么当前的线程池状态变化了或者线程池已经满了然后拒绝task

elseif(!addWorker(command,false))

reject(command);

}

其中调用了 addWorker() 方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

privatebooleanaddWorker(Runnable firstTask,booleancore){// firstTask: 新增一个线程并执行这个任务,可空,增加的线程从队列获取任务;core:是否使用 corePoolSize 作为上限,否则使用 maxmunPoolSize

retry:

for(;;) {

intc = ctl.get();

intrs = runStateOf(c);

// Check if queue empty only if necessary.

/**

* rs!=Shutdown || fistTask!=null || workQueue.isEmpty

* 如果当前的线程池的状态 > SHUTDOWN 那么拒绝 Worker 的 add 如果 =SHUTDOWN

* 那么此时不能新加入不为 null 的 Task,如果在 workQueue 为 empty 的时候不能加入任何类型的 Worker,

* 如果不为 empty 可以加入 task 为 null 的 Worker, 增加消费的 Worker

*/

if(rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask ==null&&! workQueue.isEmpty()))

returnfalse;

for(;;) {

intwc = workerCountOf(c);

//如果当前的数量超过了 CAPACITY,或者超过了 corePoolSize 和 maximumPoolSize(试 core 而定)

if(wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))

returnfalse;

//CAS 尝试增加线程数,如果失败,证明有竞争,那么重新到 retry。

if(compareAndIncrementWorkerCount(c))// AtomicInteger 的 CAS 操作;

breakretry;

c = ctl.get();// Re-read ctl

//判断当前线程池的运行状态,状态发生改变,重试 retry;

if(runStateOf(c) != rs)

continueretry;

// else CAS failed due to workerCount change; retry inner loop

}

}

booleanworkerStarted =false;

booleanworkerAdded =false;

Worker w =null;

try{

w =newWorker(firstTask);// Worker 为内部类,封装了线程和任务,通过 ThreadFactory 创建线程,可能失败抛异常或者返回 null

finalThread t = w.thread;

if(t !=null) {

finalReentrantLock mainLock =this.mainLock;

mainLock.lock();

try{

// Recheck while holding lock.

// Back out on ThreadFactory failure or if

// shut down before lock acquired.

intrs = runStateOf(ctl.get());

if(rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask ==null)) {

if(t.isAlive())// precheck that t is startable

// SHUTDOWN 以后的状态和 SHUTDOWN 状态下 firstTask 为 null,不可新增线程

thrownewIllegalThreadStateException();

workers.add(w);

ints = workers.size();

if(s > largestPoolSize)

largestPoolSize = s;//记录最大线程数

workerAdded =true;

}

}finally{

mainLock.unlock();

}

if(workerAdded) {

t.start();

workerStarted =true;

}

}

}finally{

if(! workerStarted)

addWorkerFailed(w);//失败回退,从 wokers 移除 w, 线程数减一,尝试结束线程池(调用tryTerminate 方法)

}

returnworkerStarted;

}

示意图:

执行流程:

1、当有任务进入时,线程池创建线程去执行任务,直到核心线程数满为止

2、核心线程数量满了之后,任务就会进入一个缓冲的任务队列中

当任务队列为无界队列时,任务就会一直放入缓冲的任务队列中,不会和最大线程数量进行比较

当任务队列为有界队列时,任务先放入缓冲的任务队列中,当任务队列满了之后,才会将任务放入线程池,此时会与线程池中最大的线程数量进行比较,如果超出了,则默认会抛出异常。然后线程池才会执行任务,当任务执行完,又会将缓冲队列中的任务放入线程池中,然后重复此操作。

shutdown() 方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

publicvoidshutdown(){

finalReentrantLock mainLock =this.mainLock;

mainLock.lock();

try{

//判断是否可以操作目标线程

checkShutdownAccess();

//设置线程池状态为 SHUTDOWN, 此处之后,线程池中不会增加新 Task

advanceRunState(SHUTDOWN);

//中断所有的空闲线程

interruptIdleWorkers();

onShutdown();// hook for ScheduledThreadPoolExecutor

}finally{

mainLock.unlock();

}

//转到 Terminate

tryTerminate();

}

参考资料:深入理解java线程池—ThreadPoolExecutor

JDK 自带四种线程池分析与比较

1、newFixedThreadPool

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

2、newSingleThreadExecutor

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

3、newCachedThreadPool

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

4、newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。

四种线程池其实内部方法都是调用的 ThreadPoolExecutor 类,只不过利用了其不同的构造器方法而已(传入自己需要传入的参数),那么利用这个特性,我们自己也是可以实现自己定义的线程池的。

自定义线程池

1、创建任务类

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

packagecom.zhisheng.thread.threadpool.demo;

/**

* Created by 10412 on 2017/7/24.

* 任务

*/

publicclassMyTaskimplementsRunnable

{

privateinttaskId;//任务 id

privateString taskName;//任务名字

publicintgetTaskId(){

returntaskId;

}

publicvoidsetTaskId(inttaskId){

this.taskId = taskId;

}

publicStringgetTaskName(){

returntaskName;

}

publicvoidsetTaskName(String taskName){

this.taskName = taskName;

}

publicMyTask(inttaskId, String taskName){

this.taskId = taskId;

this.taskName = taskName;

}

@Override

publicvoidrun(){

System.out.println("当前正在执行 ******  线程Id-->"+ taskId +",任务名称-->"+ taskName);

try{

Thread.currentThread().sleep(5*1000);

}catch(InterruptedException e) {

e.printStackTrace();

}

System.out.println("线程Id-->"+ taskId +",任务名称-->"+ taskName +"  -----------  执行完毕!");

}

}

2、自定义拒绝策略,实现 RejectedExecutionHandler 接口,重写 rejectedExecution 方法

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

packagecom.zhisheng.thread.threadpool.demo;

importjava.util.concurrent.RejectedExecutionHandler;

importjava.util.concurrent.ThreadPoolExecutor;

/**

* Created by 10412 on 2017/7/24.

* 自定义拒绝策略,实现 RejectedExecutionHandler 接口

*/

publicclassRejectedThreadPoolHandlerimplementsRejectedExecutionHandler

{

publicRejectedThreadPoolHandler(){

}

@Override

publicvoidrejectedExecution(Runnable r, ThreadPoolExecutor executor){

System.out.println("WARNING 自定义拒绝策略: Task "+ r.toString() +" rejected from "+ executor.toString());

}

}

3、创建线程池

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

packagecom.zhisheng.thread.threadpool.demo;

importjava.util.concurrent.ArrayBlockingQueue;

importjava.util.concurrent.ThreadPoolExecutor;

importjava.util.concurrent.TimeUnit;

/**

* Created by 10412 on 2017/7/24.

*/

publicclassThreadPool

{

publicstaticvoidmain(String[] args){

//核心线程数量为 2,最大线程数量 4,空闲线程存活的时间 60s,有界队列长度为 3,

//ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));

//核心线程数量为 2,最大线程数量 4,空闲线程存活的时间 60s, 无界队列,

//ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingDeque<>());

//核心线程数量为 2,最大线程数量 4,空闲线程存活的时间 60s,有界队列长度为 3, 使用自定义拒绝策略

ThreadPoolExecutor pool =newThreadPoolExecutor(2,4,60, TimeUnit.SECONDS,

newArrayBlockingQueue(3),newRejectedThreadPoolHandler());

for(inti =1; i <=10; i++) {

//创建 10 个任务

MyTask task =newMyTask(i,"任务"+ i);

//运行

pool.execute(task);

System.out.println("活跃的线程数:"+pool.getActiveCount() +",核心线程数:"+ pool.getCorePoolSize() +",线程池大小:"+ pool.getPoolSize() +",队列的大小"+ pool.getQueue().size());

}

//关闭线程池

pool.shutdown();

}

}

这里运行结果就不截图了,我在本地测试了代码是没问题的,感兴趣的建议还是自己跑一下,然后分析下结果是不是和前面分析的一样,如有问题,请在我博客下面评论!

总结

本文一开始讲了线程池的介绍和好处,然后分析了线程池中最核心的 ThreadPoolExecutor 类中构造器的七个参数的作用、类中两个重要的方法,然后在对比研究了下 JDK 中自带的四种线程池的用法和内部代码细节,最后写了一个自定义的线程池。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容