使用 Executors 创建线程池
1.newFixedThreadPool()
由于使用了LinkedBlockingQueue所以maximumPoolSize没用,当corePoolSize满了之后就加入到LinkedBlockingQueue队列中。
每当某个线程执行完成之后就从LinkedBlockingQueue队列中取一个。
所以这个是创建固定大小的线程池。
源码分析:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2.newSingleThreadPool()
创建线程数为1的线程池,由于使用了LinkedBlockingQueue所以maximumPoolSize 没用,corePoolSize为1表示线程数大小为1,满了就放入队列中,执行完了就从队列取一个。
源码分析
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(
new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
3.newCachedThreadPool()
创建可缓冲的线程池。没有大小限制。由于corePoolSize为0所以任务会放入SynchronousQueue队列中,SynchronousQueue只能存放大小为1,所以会立刻新起线程,由于maxumumPoolSize为Integer.MAX_VALUE所以可以认为大小为2147483647。受内存大小限制。
源码分析
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
使用 ThreadPoolExecutor 创建线程池
源码分析 ,ThreadPoolExecutor 的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造函数参数
1、corePoolSize 核心线程数大小,当线程数 < corePoolSize ,会创建线程执行 runnable
2、maximumPoolSize 最大线程数, 当线程数 >= corePoolSize的时候,会把 runnable 放入 workQueue中
3、keepAliveTime 保持存活时间,当线程数大于corePoolSize的空闲线程能保持的最大时间。
4、unit 时间单位
5、workQueue 保存任务的阻塞队列
6、threadFactory 创建线程的工厂
7、handler 拒绝策略
任务执行顺序
1、当线程数小于 corePoolSize时,创建线程执行任务。
2、当线程数大于等于 corePoolSize并且 workQueue 没有满时,放入workQueue中
3、线程数大于等于 corePoolSize并且当 workQueue 满时,新任务新建线程运行,线程总数要小于 maximumPoolSize
4、当线程总数等于 maximumPoolSize 并且 workQueue 满了的时候执行 handler 的 rejectedExecution。也就是拒绝策略。
JDK7提供了7个阻塞队列。(也属于并发容器)
1、 ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
2、LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
3、PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
4、DelayQueue:一个使用优先级队列实现的无界阻塞队列。
5、SynchronousQueue:一个不存储元素的阻塞队列。
6、LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
7、LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
什么是阻塞队列?
阻塞队列是一个在队列基础上又支持了两个附加操作的队列。
2个附加操作:
支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满。
支持阻塞的移除方法:队列空时,获取元素的线程会等待队列变为非空。
阻塞队列的应用场景
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。简而言之,阻塞队列是生产者用来存放元素、消费者获取元素的容器。
几个方法
在阻塞队列不可用的时候,上述2个附加操作提供了四种处理方法
JAVA里的阻塞队列
JDK 7 提供了7个阻塞队列,如下
1、ArrayBlockingQueue 数组结构组成的有界阻塞队列。
此队列按照先进先出(FIFO)的原则对元素进行排序,但是默认情况下不保证线程公平的访问队列,即如果队列满了,那么被阻塞在外面的线程对队列访问的顺序是不能保证线程公平(即先阻塞,先插入)的。
2、LinkedBlockingQueue一个由链表结构组成的有界阻塞队列
此队列按照先出先进的原则对元素进行排序
3、PriorityBlockingQueue支持优先级的无界阻塞队列
4、DelayQueue支持延时获取元素的无界阻塞队列,即可以指定多久才能从队列中获取当前元素
5、SynchronousQueue不存储元素的阻塞队列,每一个put必须等待一个take操作,否则不能继续添加元素。并且他支持公平访问队列。
6、LinkedTransferQueue由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法
transfer方法
如果当前有消费者正在等待接收元素(take或者待时间限制的poll方法),transfer可以把生产者传入的元素立刻传给消费者。如果没有消费者等待接收元素,则将元素放在队列的tail节点,并等到该元素被消费者消费了才返回。
tryTransfer方法
用来试探生产者传入的元素能否直接传给消费者。,如果没有消费者在等待,则返回false。和上述方法的区别是该方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。
7、LinkedBlockingDeque链表结构的双向阻塞队列,优势在于多线程入队时,减少一半的竞争。
四个拒绝策略
ThreadPoolExecutor默认有四个拒绝策略:
1、ThreadPoolExecutor.AbortPolicy() 直接抛出异常RejectedExecutionException
2、ThreadPoolExecutor.CallerRunsPolicy() 直接调用run方法并且阻塞执行
3、ThreadPoolExecutor.DiscardPolicy() 直接丢弃后来的任务
4、ThreadPoolExecutor.DiscardOldestPolicy() 丢弃在队列中队首的任务
当然可以自己继承RejectedExecutionHandler来写拒绝策略.
TestThreadPoolExecutor 示例
TestThreadPoolExecutor.java
package io.ymq.thread.TestThreadPoolExecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 描述:
*
* @author yanpenglei
* @create 2017-10-12 15:39
**/
public class TestThreadPoolExecutor {
public static void main(String[] args) {
long currentTimeMillis = System.currentTimeMillis();
// 构造一个线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3)
);
for (int i = 1; i <= 10; i++) {
try {
String task = "task=" + i;
System.out.println("创建任务并提交到线程池中:" + task);
threadPool.execute(new ThreadPoolTask(task));
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
try {
//等待所有线程执行完毕当前任务。
threadPool.shutdown();
boolean loop = true;
do {
//等待所有线程执行完毕当前任务结束
loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS);//等待2秒
} while (loop);
if (loop != true) {
System.out.println("所有线程执行完毕");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("耗时:" + (System.currentTimeMillis() - currentTimeMillis));
}
}
}
ThreadPoolTask.java
package io.ymq.thread.TestThreadPoolExecutor;
import java.io.Serializable;
/**
* 描述:
*
* @author yanpenglei
* @create 2017-10-12 15:40
**/
public class ThreadPoolTask implements Runnable, Serializable {
private Object attachData;
ThreadPoolTask(Object tasks) {
this.attachData = tasks;
}
public void run() {
try {
System.out.println("开始执行任务:" + attachData + "任务,使用的线程池,线程名称:" + Thread.currentThread().getName());
System.out.println();
} catch (Exception e) {
e.printStackTrace();
}
attachData = null;
}
}
遇到java.util.concurrent.RejectedExecutionException
第一
你的线程池 ThreadPoolExecutor 显示的 shutdown() 之后,再向线程池提交任务的时候。 如果你配置的拒绝策略是 AbortPolicy 的话,这个异常就会抛出来。
第二
当你设置的任务缓存队列过小的时候,或者说, 你的线程池里面所有的线程都在干活(线程数== maxPoolSize),并且你的任务缓存队列也已经充满了等待的队列, 这个时候,你再向它提交任务,则会抛出这个异常。
响应
可以看到线程 pool-1-thread-1 到5 循环使用
创建任务并提交到线程池中:task=1
开始执行任务:task=1任务,使用的线程池,线程名称:pool-1-thread-1
创建任务并提交到线程池中:task=2
开始执行任务:task=2任务,使用的线程池,线程名称:pool-1-thread-2
创建任务并提交到线程池中:task=3
开始执行任务:task=3任务,使用的线程池,线程名称:pool-1-thread-3
创建任务并提交到线程池中:task=4
开始执行任务:task=4任务,使用的线程池,线程名称:pool-1-thread-4
创建任务并提交到线程池中:task=5
开始执行任务:task=5任务,使用的线程池,线程名称:pool-1-thread-5
创建任务并提交到线程池中:task=6
开始执行任务:task=6任务,使用的线程池,线程名称:pool-1-thread-1
创建任务并提交到线程池中:task=7
开始执行任务:task=7任务,使用的线程池,线程名称:pool-1-thread-2
创建任务并提交到线程池中:task=8
开始执行任务:task=8任务,使用的线程池,线程名称:pool-1-thread-3
创建任务并提交到线程池中:task=9