前言
在编程中经常会使用线程来异步处理任务,但是每个线程的创建和销毁都需要一定的开销。如果每次执行一个任务都需要一个新进程去执行,则这些线程的创建和销毁将消耗大量的资源;并且线程都是“各自为政”的,很难对其进行控制,更何况有一堆的线程在执行。这时候就需要线程池来对线程进行管理。在Java 1.5中提供了Executor框架用于把任务的提交和执行解耦。任务的提交交给RUnnable或者Callable,而Executor框架用来处理任务。Executor框架中最核心的成员就是ThreadPoolExecutor,它是线程池的核心实现类。本篇文章就着重讲解ThreadPoolExecutor。
ThreadPoolExecutor介绍
可以通过ThreadPoolExecutor开创建一个线程池,ThreadPoolExecutor类一共有四个构造方法。下面展示的都是拥有最多参数的的构造方法。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize:
核心线程数。默认情况下线程池是空的,只有任务提交时才会创建线程。如果当前运行的线程数少于corePoolSize,则会创建新线程来处理任务;如果等于或者多于corePoolSize,则不会创建,如果调用线程池的prestartAllcoreThread()
方法,线程池会提前创建并启动所有核心线程来等待任务。maximumPoolSize:
线程池允许创建的最大线程数,如果任务队列满了并且线程数小于maximumPoolSize时,则线程池仍旧会创建新的线程来处理任务。keepAliveTime:
非核心线程闲置的超时时间,超过这个事件则回收,如果任务很多,并且每个任务的执行事件很短,则可以调用keepAliveTime来提高线程的利用率。另外,如果设置allowCoreThreadTimeOut属性为true时,keepAliveTime也会应用到核心线程上。TimeUnit:
keepAliveTime参数的时间单位,可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、秒(SECONDS)、毫秒(MILLOSECONDS)等。BlockingQueue<Runnable> :
任务队列,如果当前线程数大于corePoolSize,则将任务添加到此任务队列中。该任务队列是BlockiingQueue类型,也就是阻塞队列。ThreadFactory:
线程工厂。可以用线程工厂给每个创建出来的线程设置名字。一般情况下无须设置参数。RejectedExecutionHandler :
饱和策略,这是当任务队列中和线程池都满了时所采取的对应策略,默认是ABordPolicy,表示无法处理新任务,并抛出RejetctedExecutionException异常。此外还有3种策略,它们分别如下:
(1)CallerRunsPolicy:用调用者所在的线程来处理任务,此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
(2)DiscardPolicy:不能执行的任务,并将该任务删除。
(3)DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务。
线程池的处理流程和原理
从上图1中可以得知线程的处理流程主要分为3个步骤:
- 提交任务后,线程池先判断线程数是否达到核心线程数(corePoolSize)。如果未核心线程数,则创建核心线程处理任务;否则就执行下一步操作。
- 接着线程池判断任务队列是否满了。如果没满,则将任务添加到任务队列中,否则,就执行下一步操作。
- 接着因为任务队列满了,线程池就会判断线程数是否达到了最大线程数,如果未达到,则创建非核心线程1处理任务;否则,就执行饱和策略,默认会抛出RejectedExecutionException异常。
虽然上面介绍了线程池的处理流程,但还不是很直观。我们结合下面的图2来更好的了解线程池的原理。
从图2中可以看到,如果我们执行ThreadPoolExecutor的execute方法,会遇到各种情况:
(1)如果线程池中的线程数未达到核心线程数,则创建核心线程处理任务。
(2)如果线程数大于或者等于核心线程数,则将任务加入任务队列,线程池中的空闲线程会不断地从任务队列中取出任务进行处理。
(3)如果任务队列满了,并且线程数没有达到最大线程数,则创建非核心线程去处理任务。
(4)如果线程数超过了最大线程数,则执行饱和策略。
ThreadPoolExecutor的基本使用
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:orientation="vertical"
tools:context=".MainActivity">
<Button
android:id="@+id/btn_start"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text="启动" />
</LinearLayout>
package com.ju.executordemo;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MainActivity extends AppCompatActivity{
private Button btnStart;
private final int CORE_POOL_SIZE = 4;//核心线程数
private final int MAX_POOL_SIZE = 5;//最大线程数
private final long KEEP_ALIVE_TIME = 10;//空闲线程超时时间
private ThreadPoolExecutor executorPool;
private int songIndex = 0;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
initView();
//创建线程池
initExec();
}
private void initView() {
btnStart = findViewById(R.id.btn_start);
btnStart.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
begin();
}
});
}
public void begin() {
songIndex++;
try {
executorPool.execute(new WorkerThread("歌曲" + songIndex));
} catch (Exception e) {
Log.e("threadtest", "AbortPolicy...已超出规定的线程数量,不能再增加了....");
}
// 所有任务已经执行完毕,我们在监听一下相关数据
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(20 * 1000);
} catch (Exception e) {
}
sout("monitor after");
}
}).start();
}
private void sout(String msg) {
Log.i("threadtest", "monitor " + msg
+ " CorePoolSize:" + executorPool.getCorePoolSize()
+ " PoolSize:" + executorPool.getPoolSize()
+ " MaximumPoolSize:" + executorPool.getMaximumPoolSize()
+ " ActiveCount:" + executorPool.getActiveCount()
+ " TaskCount:" + executorPool.getTaskCount()
);
}
private void initExec() {
executorPool = new ThreadPoolExecutor(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(), Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
}
class WorkerThread implements Runnable {
private String threadName;
public WorkerThread (String name){
threadName = name;
}
@Override
public void run() {
boolean flag = true;
try {
while (flag){
String tn = Thread.currentThread().getName();
//模拟耗时操作
Random random = new Random();
long time = (random.nextInt(5) + 1) * 1000;
Thread.sleep(time);
Log.e("threadtest","线程\"" + tn + "\"耗时了(" + time / 1000 + "秒)下载了第<" + threadName + ">");
//下载完毕跳出循环
flag = false;
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
上述代码模拟一个下载音乐的例子来演示ThreadPoolExecutor的基本使用,启动ThreadPoolExecutor的函数是
execute()
方法,然后他需要一个Runnable的参数来进行启动。ThreadPoolExecutor的其它种类
通过直接或者间接地配置ThreadPoolExecutor的参数可以创建不同类型的ThreadPoolExecutor,其中有 4 种线程池比较常用,它们分别是 FixedThreadPool、CachedThreadPool、SingleThreadExecutor和 ScheduledThreadPool。下面分别介绍这4种线程池。
- FixedThreadPool
FixedThreadPool 是可重用固定线程数的线程池。在 Executors 类中提供了创建FixedThreadPool的方法, 如下所示:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool的corePoolSize和maximumPoolSize都设置为创建FixedThreadPool指定的参数nThreads,也就意味着FixedThreadPool只有核心线程,并且数量是固定的,没有非核心线程。keepAliveTime设置为0L 意味着多余的线程会被立即终止。因为不会产生多余的线程,所以keepAliveTime是无效的参数。另外,任 务队列采用了无界的阻塞队列LinkedBlockingQueue。FixedThreadPool的execute方法的执行示意图如图4所 示。
当执行
execute()
方法时,如果当前运行的线程未达到corePoolSize(核心线程数)时 就创建核心线程来处理任务,如果达到了核心线程数则将任务添加到LinkedBlockingQueue中。 FixedThreadPool就是一个有固定数量核心线程的线程池,并且这些核心线程不会被回收。当线程数超过corePoolSize时,就将任务存储在任务队列中;当线程池有空闲线程时,则从任务队列中去取任务执行- CachedThreadPool
CachedThreadPool是一个根据需要创建线程的线程池,创建CachedThreadPool的代码如下所示:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize为0,maximumPoolSize设置为Integer.MAX_VALUE,这意味着 CachedThreadPool没有核心线程,非核心线程是无界的。keepAliveTime设置为60L,则空闲线程等待新任务 的最长时间为 60s。在此用了阻塞队列 SynchronousQueue,它是一个不存储元素的阻塞队列,每个插入操作 必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。CachedThreadPool 的execute方法的执行示意图如图5所示。
当执行
execute()
方法时,首先会执行SynchronousQueue的offer()
方法来提交任务,并且查询线程池中是否有空闲的线程执行SynchronousQueue的poll()
方法来移除任务。如果有则配对成功,将任务交给这个空闲的线程处理;如果没有则配对失败,创建新的线程去处理任务。当线程池中的线程空闲时,它会执行 SynchronousQueue的poll()
方法,等待SynchronousQueue中新提交的任务。如果超过 60s 没有新任务提交到 SynchronousQueue,则这个空闲线程将终止。因为maximumPoolSize 是无界的,所以如果提交的任务大于线 程池中线程处理任务的速度就会不断地创建新线程。另外,每次提交任务都会立即有线程去处理。所以,CachedThreadPool比较适于大量的需要立即处理并且耗时较少的任务。- SingleThreadExecutor
SingleThreadExecutor是使用单个工作线程的线程池,其创建源码如下所示:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
corePoolSize和maximumPoolSize都为1,意味着SingleThreadExecutor只有一个核心线程,其他的参数都 和FixedThreadPool一样,这里就不赘述了。SingleThreadExecutor的execute()
方法的执行示意图如图5所示。
当执行
execute()
方法时,如果当前运行的线程数未达到核心线程数,也就是当前没有运行的线程,则创建一个新线程来处理任务。如果当前有运行的线程,则将任务添加到阻塞队列LinkedBlockingQueue中。因此,SingleThreadExecutor能确保所有的任务在一个线程中按照顺序逐一执行。- ScheduledThreadPool
ScheduledThreadPool是一个能实现定时和周期性任务的线程池,它的创建源码如下所示:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
这里创建了ScheduledThreadPoolExecutor,ScheduledThreadPoolExecutor继承自ThreadPoolExecutor,它主要用于给定延时之后的运行任务或者定期处理任务。ScheduledThreadPoolExecutor 的构造方法如下所示:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
从上面的代码可以看出,ScheduledThreadPoolExecutor 的构造方法最终调用的是ThreadPoolExecutor的 构造方法。corePoolSize是传进来的固定数值,maximumPoolSize的值是Integer.MAX_VALUE。因为采用的 DelayedWorkQueue是无界的,所以maximumPoolSize这个参数是无效的。ScheduledThreadPoolExecutor的 execute方法的执行示意图如图6所示。
当执行ScheduledThreadPoolExecutor的
scheduleAtFixedRate()
或者scheduleWithFixedDelay()
方法时,会向DelayedWorkQueue添加一个 实现RunnableScheduledFuture接口的ScheduledFutureTask(任务的包装类),并会检查运行的线程是否达到corePoolSize。如果没有则新建线程并启动它,但并不是立即去执行任务,而是去DelayedWorkQueue中取ScheduledFutureTask,然后去执行任务。如果运行的线程达到了corePoolSize时,则将任务添加到DelayedWorkQueue中。DelayedWorkQueue会将任务进行排序,先要执行的任务放在队列的前面。其跟此前介绍的线程池不同的是,当执行完任务后,会将ScheduledFutureTask中time变量改为下次要执行的时间并放回到DelayedWorkQueue中。参考
- [刘望舒]Android进阶之光
- Android线程池(一)简单使用