前言
以下部分代码处于简便,使用了lambda表达式,需要Java 8才支持,内容本身只需要Java 1.5及以上即可。lambda表达式可以使用匿名内部类代替。望各位读者知悉。
由于多线程具有不确定性,建议读者可以将代码复制过去,自己运行多次,感受一下。你的运行结果很可能与我不一样,甚至同样代码,每次运行都不一样,属于正常现象。
正文
自Java 1.0开始,Sun(现被Oracle收购)公司推出的Java就有java.lang.Thread
类对多线程编程提供了支持。
但是实际运用中,这是一件很痛苦的事情,特别是在Java 8以前,代码中充斥了各种匿名内部类、各种申明,最痛苦的是Thread类只是一个最基本的执行单元,至于线程管理、调度都需要程序员自己负责。稍不留神,很容易出现bug,而且难以调试、重现、排查。(Java 8 中引用了类似.Net中委托概念,当然官方的叫法不称之为委托)
所幸的是,自从Java 1.5开始这种情况有所改观(实际上Java 8又再次对并发编程有所加强,有时间,下次介绍Java 8对于多线程编程的新支持)
那么,切入正题,今天要来介绍的主角就是java.util.concurrent.ExecutorService
。一个官方提供的线程池管理工具。ExecutorService的实例需要由工厂java.util.concurrent.Executors
来构造,其共有4个实例化方法:
方法 | 用途、说明 |
---|---|
newCachedThreadPool() | 带有缓存的线程池。当一个任务丢进来时,如果有可以重用的线程,则不会创建新的线程,利用闲置线程来执行任务。若没有可重用的线程,则创建新线程执行任务。池中的闲置线程默认在60秒内没有得到重用,会被强制结束,不再等待任务。 |
newFixedThreadPool() | 固定线程数的线程池。即当需要执行的任务大于线程池中允许的最大数量,则进入一个等待队列,直到池中的一个线程结束后,新的任务才会被放进来执行。 |
newScheduledThreadPool() | 可调度的线程池,用于执行周期性的任务。 |
SingleThreadExecutor() | 单线程的线程池,与newFixedThreadPool() 创建的线程池一致,但是池子的大小为1。 |
ExecutorService
一共只有三种状态,即运行、停止、关闭。当ExecutorService对象实例化完成后就处于运行状态,此时一旦一个任务丢到线程池以后,就立刻开始执行。不再需要调用start()
方法。当调用ExecutorService.shutdown()
方法后,ExecutorService就不在接受新的任务,在调用shutdown方法之前已经在运行的任务,在调用shutdown后还没有结束的,会继续运行,直到该任务结束。不会强制中断任务的执行。
一般来说,在多线程情况下,很少会强制停止一个线程的执行,这样做会导致难以预期、重现的偶发性故障。
newCachedThreadPool 例子
public class ExecuteServerExample {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
Thread.sleep(0, 10);
executorService.execute(() -> System.out.println("Invoke -> " + Thread.currentThread().getName()));
}
executorService.shutdown();
//Output:
//Invoke -> pool-1-thread-2
//Invoke -> pool-1-thread-2
//Invoke -> pool-1-thread-2
//Invoke -> pool-1-thread-1
//Invoke -> pool-1-thread-1
//Invoke -> pool-1-thread-1
//Invoke -> pool-1-thread-1
//Invoke -> pool-1-thread-1
//Invoke -> pool-1-thread-1
//Invoke -> pool-1-thread-1
// 结果随机
}
}
上面这个例子可以看出来,创建10个任务,但是至创建了两个线程(每次运行结果都不一样,也可能创建了10个线程,甚至只用了1个),就以我这次运行结果为例,说明前三个任务,都是前一个已经结束了,后一个任务还没有分配下来,因此一直复用同一个线程。(其实从ThreadName看得出,thread2 应该是后创建的线程,之所以会显示在前面,是因为标准输出流是有缓冲区,并不是实时输出,且先运行的线程并不代表一定先完成)
代码中的lambda表达式,实际上是个Runnable接口的实例。Java 8以下版本可以用匿名内部类来跑起来这段小代码。(下同,略)
newFixedThreadPool 例子
public class ExecuteServerExample {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(3); // 线程池最多允许3个线程同时运行
for (int i = 0; i < 10; i++) { // 创建10个任务
executorService.execute(() -> {
try {
Thread.sleep(500); // 暂停0.5秒,为了看到列队效果
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println("Invoke -> " + Thread.currentThread().getName());
});
}
executorService.shutdown();
//Output:
//Invoke -> pool-1-thread-1
//Invoke -> pool-1-thread-3
//Invoke -> pool-1-thread-2
//Invoke -> pool-1-thread-1
//Invoke -> pool-1-thread-3
//Invoke -> pool-1-thread-2
//Invoke -> pool-1-thread-1
//Invoke -> pool-1-thread-3
//Invoke -> pool-1-thread-2
//Invoke -> pool-1-thread-1
}
}
从输出结果可以看到,一共只创建了3个线程,当第四个任务丢进来的时候,并没有被执行,而是等待前三个有一个结束了,第四个才可以运行。可以把代码自己跑一遍,感受一下中间有明显的停顿。
newScheduledThreadPool 例子
public class ExecuteServerExample {
public static class MyThread implements Runnable {
private final String name;
public MyThread(String name) {
this.name = name + " -> ";
}
@Override
public void run() {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.println(this.name + new Date().getSeconds());
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newScheduledThreadPool(3);
((ScheduledExecutorService) executorService).scheduleAtFixedRate(new MyThread("FixedRate"), 1, 2, TimeUnit.SECONDS);
//((ScheduledExecutorService) executorService).scheduleWithFixedDelay(new MyThread("FixedDelay"), 1, 2, TimeUnit.SECONDS);
//executorService.shutdown();
//FixedRate -> 50
//FixedRate -> 52
//FixedRate -> 54
//FixedRate -> 56
//FixedRate -> 58
//FixedRate -> 0
//FixedRate -> 2
//FixedRate -> 4
//FixedDelay -> 11
//FixedDelay -> 14
//FixedDelay -> 18
//FixedDelay -> 21
//FixedDelay -> 25
//FixedDelay -> 28
}
}
两行执行计划任务,交换执行对比一下结果,应该是类似我下面注释贴的差不多。这里为了看到结果,没有终止线程池,因此实际运行时,请按Ctrl + C
终止Java进程的运行。
这里说说scheduleAtFixedRate
与scheduleWithFixedDelay
的区别,首先两者的参数是一样的。第一个参数是Runnable实例;第二个是丢到线程池后多久第一次执行;第三个参数是每次间隔多久执行一次;第四个参数指定第二、第三个参数的时间单位。
区别在于scheduleAtFixedRate
是从任务开始运行就计算时间,当任务实际运行时间小于等于scheduleAtFixedRate指定的间隔时,则按照指定的间隔周期重复运行。当任务实际运行时间大于scheduleAtFixedRate指定的间隔时,则指定任务完成后立即开始下一次任务,中间没有间隔;而scheduleWithFixedDelay
则不管任务用了多长时间,从任务结束开始计算间隔时间,然后周期执行下次任务。通俗地讲,可以有下面的公式:
scheduleWithFixedDelay
实际的执行周期 = 函数指定的间隔时间 + 任务本身执行所需的时间-
当任务运行所需时间小于等于
scheduleAtFixedRate
指定的间隔周期时-
scheduleAtFixedRate
实际的执行周期 = 函数指定的间隔时间
-
-
当任务所需时间大于
scheduleAtFixedRate
指定的间隔周期时-
scheduleAtFixedRate
实际的执行周期 = 任务本身执行所需的时间
-
执行更为复杂的周期性任务
上面的ScheduledExecutor
只能执行相对简单的周期性任务,基于固定的时间周期来运行。稍微复杂一点的周期性任务就难以完成,例如某个任务要求每周一下午3点整开始执行,或者是每个工作日早上8点执行某个任务。为了解决这样的问题,可以使用Calendar
间接达到这样的目的。在代码中加入条件判断,另外也可以使用Quartz、JCronTab等第三方包实现相关的功能。由于这些不是Java原生内容,这里不作展开。各位读者可以自行搜索相关资料。
有返回值的任务调度
上面的例子中,所有任务都是实现了Runnable接口,因此都是没有返回值的,调用者也无法知道任务的执行情况。显然Java的设计者也考虑到这个问题,不可能所有任务都是不关注结果的。因此,在ExecutorService中除了可以接受Runnable接口的实例外,还可以接受Callable
接口的实例。而Callable是具有返回值的。除此以外,两者还有一个显著的区别,即Runnable是不允许抛出任何异常的,而Callable允许抛出Checked Exception。
闲言碎语不要讲,先看例子:
public class ExecuteServerExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(1);
final int threadCount = 5;
Future<String>[] threadsResult = new Future[threadCount];
// 创建任务
for (int i = 0; i < threadCount; i++) {
final int tmp = i;
threadsResult[i] = executorService.submit(() -> {
Thread.sleep(500);
return String.format("Fixed thread running in %d", tmp);
});
}
// 获得线程返回结果
for (Future<String> threadResult : threadsResult)
System.err.println(threadResult.get());
executorService.shutdown();
//Fixed thread running in 0
//Fixed thread running in 1
//Fixed thread running in 2
//Fixed thread running in 3
//Fixed thread running in 4
}
}
为了是运行过程中,结果更容易被人感知到,这里用了newFixedThreadPool。
首先,从输出结果来看,线程内的返回值,确实被调用者拿到了;其次,这里把任务丢到线程池不再使用execute方法,而是使用了submit方法。两者基本上差不多,都是调用该方法后,任务即被执行(具体取决于线程池的调度策略)。使用上两者几乎没有什么区别,只不过后者可以有返回值并且允许抛出异常。
需要留意的是,当获取结果,调用Future.get()
方法时,如果对应的任务还没有执行或没有执行完毕,该方法会阻塞调用线程,直到该任务执行完毕然后返回结果。因此实际使用时,可以先调用Future.isDone()
判断任务是否已完成,在决定是否调用get方法。
自定义线程池
虽然Java的设计者们已经智者千虑,但是难以考虑到实际情况的复杂情况,因此必有一失,以上介绍的种种也会无法满足实际需求。那怎么办呢?显然不可能自己从头到脚重新实现一遍线程池,且不说难度巨大,容易出纰漏,作为程序员应该是极其排斥做重复性劳动的。造轮子的任务显然不应该由我们来做,所以... ...隆重介绍自定义线程池,充分满足你的猎奇...不对,是偷懒心理。
同样的,憋说话,翠花,上代码:
public class ExecuteServerExample {
public static void main(String[] args) {
BlockingQueue<Runnable> waitQueue = new ArrayBlockingQueue(10); // 线程池满后的等待队列
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, waitQueue);
final int threadCount = 10;
for (int i = 0; i < threadCount; i++) {
final int tmp = i;
poolExecutor.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.printf("Index %d -> Thread %s is running.%n", tmp, Thread.currentThread().getName());
});
}
poolExecutor.shutdown();
//Index 0 -> Thread pool-1-thread-1 is running.
//Index 1 -> Thread pool-1-thread-2 is running.
//Index 2 -> Thread pool-1-thread-1 is running.
//Index 3 -> Thread pool-1-thread-2 is running.
//Index 4 -> Thread pool-1-thread-1 is running.
//Index 5 -> Thread pool-1-thread-2 is running.
//Index 6 -> Thread pool-1-thread-1 is running.
//Index 7 -> Thread pool-1-thread-2 is running.
//Index 8 -> Thread pool-1-thread-1 is running.
//Index 9 -> Thread pool-1-thread-2 is running.
}
}
从输出结果可以看出来,一个10个任务在两个线程上执行。这里提一下构造方法的参数含义:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
参数名 | 含义 |
---|---|
corePoolSize | 线程池中保留的核心线程数,包括闲置的线程。 |
maximumPoolSize | 线程池中允许的最大线程数。 |
keepAliveTime | 闲置线程在线程池中允许存活的最大时间 |
unit | keepAliveTime指定的时间单位 |
workQueue | 因线程池满,而出现的任务等待队列。该队列仅保存Runnable任务 |
而当一个任务通过execute方法丢到线程池后,遵循这样的规则:
- 若线程池中当前的线程数小于corePoolSize,则无论如何都会创建新的线程执行任务。
- 若线程池中的线程数大于等于corePoolSize:
- 且workQueue没有满,即等待队列中还有空位,则将新的任务放置到等待队列中,按照先进先出的原则,依次排队等待被执行
- 且workQueue已满:
- 当前的线程数小于maximumPoolSize,则会创建新的线程来处理被添加的新任务
- 当前的线程数大于等于maximumPoolSize时,有4种处理结果(需要调用6个或7个参数的构造方法指定,默认的处理方式为直接终止),这里不继续展开,有兴趣的可以看看
java.util.concurrent.RejectedExecutionHandler
的注释。
通俗地讲,线程池同时允许接受的最大任务数 = maximumPoolSize + workQueue.length,一旦超过这个数字,就会引发异常处理,默认的处理方式就是终止。(通过源码可以看到)(实际上有例外,在最后介绍等待队列时有提到,无界队列的最大任务数是无穷大,受限于物理内存的大小)
实践出真知,各位可以通过不断改变上面代码中ThreadPoolExecutor构造方法的第一个、第二个参数以及threadCount的值来观察运行结果,以便更直观的了解。
下面来看看Executors.newCachedThreadPool()
方法是如何构造一个线程池的:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
他首先将corePoolSize设置为0,即不保留任何线程;maximumPoolSize设置为无限大。则一个新的任务进来时,首先会检查有没有闲置线程,没有则创建新的线程执行任务。当闲置线程到达存活最大周期后被销毁(60秒),当所有任务结束后,60秒内没有任何新的任务进来,则线程池内的所有线程会被全部销毁,此时为一个空的线程池。至于SynchronousQueue,最后再讲。
再来看看Executors.newFixedThreadPool()
是如何构造的
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
核心线程数和最大线程数均设置成固定值,因此达到了线程池内无论如何都是指定线程数的目的,不会随着任务增加而增加新的线程。同时keepAliveTime被设置成0,即任务一旦完成就将线程销毁,不保留闲置线程。
这里可以看到我自定义线程池中用的等待队列是ArrayBlockingQueue,上面两段源码中还出现了SynchronousQueue和LinkedBlockingQueue。这边简单提一下不同队列的区别:
队列 | 策略 |
---|---|
SynchronousQueue | 同步队列。它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。(就像Executors.newCachedThreadPool() 一样,但这不是必须的) |
LinkedBlockingQueue | 无界队列。该队列会导致在所有 corePoolSize 线程都工作的情况下将新任务加入到缓冲队列中(由于是采用链表的数据结构,因此实际上的最大值取决于内存的大小,通常认为是无穷大)。这样,创建的线程就不会超过 corePoolSize(回顾上面提到的execute方法执行的策略),也因此,maximumPoolSize 的值也就无效了。 |
ArrayBlockingQueue | 有界队列。采用该队列即能够同时接受的最大任务数为队列长度 + maximumPoolSize。该队列有助于硬件资源在程序员疏忽或其他意外情况下被消耗殆尽。但是由于不便于动态调整大小,需要精心设计队列长度与maximumPoolSizes。 |
当前,实际使用时,可以在有界队列与无界队列中折中取舍。例如可以继承LinkedBlockingQueue类,设置一个警戒的队列长度,一旦超出该长度通过日志或其他方式记录或提示相关技术人员,同时不至于系统拒绝接收新任务导致其他问题。
总的来说,这次就是介绍了Java 1.5开始提供一个新的多线程模型。一般的运用还是比较简单的,有时间下次介绍一下Java 8中再次对多线程、并发应用的增强。
我个人以为,代码就是一个实践出真知的东西,理论再多没有实践都是虚的。当然,需要理论来指导实践,但这需要达到一定高度以后。最主要的也是因为代码比较直观,能够看出区别,而理论上晦涩难懂的语言,不管是英文原著还是中文译注,都让我感觉很难懂,大概主要还是水平不够吧。