在这里记录下线程池的实际应用场景。包括:
ExecutorService 使用(包含CountDownLatch使用,和一个CyclicBarrier的demo)
ForkJoinPool 使用 (包含RecursiveAction,RecursiveTask)
spring的threadPool 使用
-
ExecutorService 使用
拿一个很简单的需求来说,群发短信。如果给一个公司的全部客户群发。中小型公司的客户也有几十万。如果用单线程来执行,很慢。
所以这里就拿多线程来实现这个群发短信的需求。(当然只是demo)
看代码
/**
* 暂且把这个方法看成一个发短信的方法。
* 发送一个短信,然后保存一条发短信的记录。
* @param user
* @throws Exception
*/
@Override
public void sendMessage(User user) throws Exception{
MessageRecord messageRecord = new MessageRecord();
try {
messageRecord.setUserId(user.getId());
messageRecord.setUserPhone(user.getPhone());
messageRecord.setMessageContent(user.getName()+",你好,这是你的短信。");
messageRecord.setCurrentTimeMillis(System.currentTimeMillis()+"");
messageRecordMapper.insertSelective(messageRecord);
System.out.println("给用户id:["+user.getId()+"]发送短信。当前线程:["+Thread.currentThread().getName()+"]");
}catch (Exception e){
throw new Exception("发送短信异常");
}
}
/**
* 发短信的线程。
* 简单说下,Callable 和Runnable 的区别,
* Callable有返回结果,带泛型;泛型为返回结果的类型,
* Runnable 没有返回结果。
* 下边这个线程有返回结果,就不写了。
*/
public class MessageCallable implements Callable{
//需要被操作的用户
private User user;
//服务类
private UserService userService;
//倒数计数器,为了阻塞主线程
private CountDownLatch latch;
/**
* 有参构造器,为了给这三个赋值
* @param user
* @param userService
* @param latch
*/
public MessageCallable(User user,UserService userService,CountDownLatch latch){
this.user = user;
this.userService = userService;
this.latch = latch;
}
@Override
public Object call() throws Exception{
try {
//调用发短信方法。
userService.sendMessage(user);
}finally {
//必须执行在finally中。
//如果 没有 countDown 会导致主线程在子线程都执行后也阻塞
latch.countDown();
//可以把返回结果在这里返回
return null;
}
}
}
Executors 可以通过工厂方法创建好几种线程池,这里说下几种典型的。
1,newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
2,newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
3,newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
4,newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
(在JDK1.8中,还有几种,无非是基于上述4个线程池的。除了newWorkStealingPool,这个一会说)
上述4个线程池,都是基于这个构造器的
corePoolSize:线程池基本大小
maximumPoolSize:线程池允许创建的最大线程数 (如果选用FixedThreadPool,那么corePoolSize和maximumPoolSize都是构造器中指定的大小。如果选用newCachedThreadPool,那么corePoolSize为0,maximumPoolSize为Integer最大值)
keepAliveTime :线程池的工作线程空闲后,保持存活的时间。
timeUnit :时间单位
-
workQueue(任务队列) : 用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,按FIFO原则进行排序
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,吞吐量高于ArrayBlockingQueue。静态工厂方法
Excutors.newFixedThreadPool()使用了这个队列 - SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则
插入操作一直处于阻塞状态,吞吐量高于LinkedBlockingQueue,静态工厂方法
Excutors.newCachedThreadPool()使用了这个队列 - PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
threadFactory:线程工厂,可以给该线程池起名字
-
RejectedExecutionHandler :当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略还处理新提交的任务。它可以有如下四个选项:
- AbortPolicy:直接抛出异常,默认情况下采用这种策略
- CallerRunsPolicy:只用调用者所在线程来运行任务
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
- DiscardPolicy:不处理,丢弃掉
构造器参数参考文献:https://blog.csdn.net/u010723709/article/details/50377543
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
下边继续看代码。这里使用的是newFixedThreadPool 定长线程池
/**
* threads time(ms)
* fix30 - 3485
* 30 - 2995
* 30 - 3092
* 10 - 2890
* 10 - 2976
* 10 - 3023
* 4 - 3584
* 4 - 3775
* 4 - 3781
* cache - 14025
* cache - 7093
* cache - 10235
* single - 8950
* single - 8948
* single - 9364
*
* 注:本机器cpu为4核,上述为几个线程池处理10000条用户的效率对比
* @return
*/
@Override
public Map massTextingByThreadPoolExecutorAndLatch() {
Map<String,Object> resultMap = new HashMap(2);
//创建定长线程池
ExecutorService executorService = Executors.newFixedThreadPool(8);
//声明倒数计数器
CountDownLatch latch = null;
//要处理的用户
List<User> users = null;
long start = System.currentTimeMillis();
try {
users = this.findAllUser();
if(users==null || users.size()==0){
resultMap.put("isSuccess",false);
resultMap.put("message","没有用户");
}else{
//通过构造器创建 user总数的通过CountDownLatch
latch = new CountDownLatch(users.size());
for (User user : users){
//循环执行。
executorService.submit(new MessageCallable(user,this,latch));
}
//主线程阻塞等待所有的子线程循环执行完毕users.size()的数量
//如果,子线程中的CountDownLatch没有countDown。await 会一直等待,
//当然也可以使用 latch(long timeout, TimeUnit unit)这个方法来规定阻塞多少时间。
latch.await();
resultMap.put("isSuccess",true);
resultMap.put("message","发送成功");
}
}catch (Exception e){
resultMap.put("isSuccess",false);
resultMap.put("message","系统异常");
}finally {
System.out.println("耗时:["+ (System.currentTimeMillis()-start)+"]毫秒");
return resultMap;
}
}
如果想要接收返回结果呢。其实可以用CompletionService来完成。
CompletionService实际上可以看做是Executor和BlockingQueue的结合体。CompletionService在接收到要执行的任务时,通过类似BlockingQueue的put和take获得任务执行的结果。CompletionService的一个实现是ExecutorCompletionService,ExecutorCompletionService把具体的计算任务交给Executor完成。
在实现上,ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。
看实现代码,其实就多一行就行。
但是一般都多线程了。都不要返回结果了。
public Map massTextingByThreadPoolExecutorAndLatchC() {
Map<String,Object> resultMap = new HashMap(2);
//创建定长线程池
ExecutorService executorService = Executors.newFixedThreadPool(8);
//用完成服务来包一下线程池,用来接收结果。
CompletionService cs = new ExecutorCompletionService(executorService);
//声明倒数计数器
CountDownLatch latch = null;
//要处理的用户
List<User> users = null;
long start = System.currentTimeMillis();
try {
users = this.findAllUser();
if(users==null || users.size()==0){
resultMap.put("isSuccess",false);
resultMap.put("message","没有用户");
}else{
//通过构造器创建 user总数的通过CountDownLatch
latch = new CountDownLatch(users.size());
for (User user : users){
//循环执行。通过cs来获取结果
cs.submit(new MessageCallable(user,this,latch));
}
//主线程阻塞等待所有的子线程循环执行完毕users.size()的数量
//如果,子线程中的CountDownLatch没有countDown。await 会一直等待,
//当然也可以使用 latch(long timeout, TimeUnit unit)这个方法来规定阻塞多少时间。
latch.await();
for(int i=0;i<users.size();i++){
//循环打印结果
//cs.take() 返回的是 Future 在get就行了。
System.out.println(cs.take().get());
}
resultMap.put("isSuccess",true);
resultMap.put("message","发送成功");
}
}catch (Exception e){
resultMap.put("isSuccess",false);
resultMap.put("message","系统异常");
}finally {
System.out.println("耗时:["+ (System.currentTimeMillis()-start)+"]毫秒");
return resultMap;
}
}
CountDownLatch算是一个多线程的辅助类,该类利用倒数的方式来阻塞主线程,达到一种所有子线程执行完毕在走主线程的目的。
和CountDownLatch类似功能的还有CyclicBarrier,该类用的是增加的方式在阻塞子线程。可以让子线程在某个节点阻塞,然后所有子线程执行完毕后从该节点继续运行和执行别的线程。(和CountDownLatch最大的区别是 一个阻塞主线程,一个是阻塞子线程)
看一个CyclicBarrier运用的demo吧。
/**
* 子线程
*/
public class MessageCallableByBarrier implements Callable{
private User user;
private CyclicBarrier barrier;
public MessageCallableByBarrier(User user,CyclicBarrier barrier){
this.user = user;
this.barrier = barrier;
}
@Override
public Object call() throws Exception{
System.out.println("我是用户:"+user.getName()+",开始等待。");
//此时开始阻塞子线程
barrier.await();
System.out.println("我是用户:"+user.getName()+",全部等待完毕。一起执行");
return null;
}
}
public Map massTextingByThreadPoolExecutorAndBarrier(){
Map<String,Object> resultMap = new HashMap(2);
List<User> users = this.findAllUser().subList(0,10);
//创建定长线程池
ExecutorService executorService = Executors.newFixedThreadPool(users.size());
//创建cyclicBarrier,
//cyclicBarrier 有两个构造器,
// CyclicBarrier(int parties, Runnable barrierAction)这个构造器第一个参数是阻塞多少线程,第二个参数是所有子线程等待完毕要执行的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(users.size(),()->
System.out.println("在所有的子线程await之后执行")
);
for(User user:users){
executorService.submit(new MessageCallableByBarrier(user,cyclicBarrier));
}
System.out.println("主线程不会阻塞");
return resultMap;
}
输出
主线程不会阻塞
我是用户:dajiejie0,开始等待。
我是用户:dajiejie9,开始等待。
我是用户:dajiejie8,开始等待。
我是用户:dajiejie7,开始等待。
我是用户:dajiejie6,开始等待。
我是用户:dajiejie5,开始等待。
我是用户:dajiejie4,开始等待。
我是用户:dajiejie3,开始等待。
我是用户:dajiejie2,开始等待。
我是用户:dajiejie1,开始等待。
在所有的子线程await之后执行
我是用户:dajiejie1,全部等待完毕。一起执行
我是用户:dajiejie0,全部等待完毕。一起执行
我是用户:dajiejie9,全部等待完毕。一起执行
我是用户:dajiejie8,全部等待完毕。一起执行
我是用户:dajiejie7,全部等待完毕。一起执行
我是用户:dajiejie5,全部等待完毕。一起执行
我是用户:dajiejie4,全部等待完毕。一起执行
我是用户:dajiejie3,全部等待完毕。一起执行
我是用户:dajiejie6,全部等待完毕。一起执行
我是用户:dajiejie2,全部等待完毕。一起执行
-
ForkJoinPool使用
forkJoinPool 是一个分拆/合并的线程池。他可以充分利用CPU把一个大任务拆成多个小任务。多个小任务执行完后在合并起来。
该线程池是JDK1.7后加入的。也是并发大神Doug Lea写的。
forkJoinPool使用了工作窃取算法。既当一个线程空闲时会去帮助别的线程从尾端执行任务。ForkJoinPool的效率是略高于ThreadPoolExecutor的,但是CPU占用率会高很多,也会产生大量的GC(可以使用jconsole等工具观察)。具体选择使用的话要看情况。
要使用forkJoinPool的话,一定要会用到RecursiveAction 和RecursiveTask
从名字就看出来这是个递归。
RecursiveAction 没返回结果
RecursiveTask 返回结果为泛型的类型
注:看这个的时候,我的同事syq给了我很大的帮助,谢谢哦。
还是拿上边那个发短信的需求来说 看代码
public class MessageAction extends RecursiveAction {
//用户服务
private UserService userService;
//临界值
private static final int COUNT = 500;
//需要被处理的数据
private List<User> users;
//通过构造器传参
public MessageAction(List<User> users, UserService userService){
super();
this.users = users;
this.userService = userService;
}
//该方法是必须实现的。
@Override
protected void compute() {
//如果users.size() 没有拆分到临界值,那么继续拆分
if(users.size()>COUNT){
//用了二分查找来拆分,一个从中间向左找,一个从中间向右找。一直递归到小于临界值
int middle=users.size()/2;
MessageAction left = new MessageAction(users.subList(0,middle),userService);
MessageAction right = new MessageAction(users.subList(middle,users.size()),userService);
left.fork();
right.fork();
}else{
try {
//小于临界值后执行
userService.massTexting(users);
}catch (Exception e){
}
}
}
}
/**
/**
* 2509
* 2394
* 2161
* 2467
* 注:由于 这个没有countDownLatch 阻塞主线程,所以不能用单元测试来跑,而且统计下来的执行时间 要从数据库看
* @return
*/
@Override
public Map massTextingByForkJoinPool() {
Map<String,Object> resultMap = new HashMap(2);
//一般就用这种创建方式就行。这种创建方式会创建一个 cpu核数-1 的线程池,是最合理的。
//用new ForkJoinPool()也可以。会创建一个 cpu核数 的线程池
ForkJoinPool pool = ForkJoinPool.commonPool();
List<User> users = null;
long start = System.currentTimeMillis();
try {
users = this.findAllUser();
if(users==null || users.size()==0){
resultMap.put("isSuccess",false);
resultMap.put("message","没有用户");
}else{
//在这里执行运行就可以,execute没有返回结果,submit和invoke有返回结果。
pool.execute(new MessageAction(users,this));
resultMap.put("isSuccess",true);
resultMap.put("message","发送成功");
}
}catch (Exception e){
resultMap.put("isSuccess",false);
resultMap.put("message","系统异常");
return resultMap;
}finally {
pool.shutdown();
return resultMap;
}
}
在看下有返回结果的。还是拿发短信来说,我要返回发送过短信的用户id的总和
public class MessageTask extends RecursiveTask<Integer>{
//用户服务
private UserService userService;
//临界值
private static final int COUNT = 500;
//需要发短信的用户
private List<User> users;
public MessageTask(List<User> users, UserService userService){
super();
this.users = users;
this.userService = userService;
}
@Override
protected Integer compute() {
if(users.size()>COUNT){
int middle=users.size()/2;
MessageTask left = new MessageTask(users.subList(0,middle),userService);
MessageTask right = new MessageTask(users.subList(middle,users.size()),userService);
left.fork();
right.fork();
//返回左半部分和右半部分相加的结果
//返回结果其实就是 join 有return 就是与 RecursiveAction最大的区别
return left.join()+right.join();
}else{
try {
return userService.massTexting(users);
}catch (Exception e){
}
return null;
}
}
}
/**
* @return
* 有返回结果的forkJoinPool
*/
@Override
public Map massTextingByForkJoinPoolByTask() {
Map<String,Object> resultMap = new HashMap(2);
ForkJoinPool pool = new ForkJoinPool();
List<User> users = null;
Integer sum = null;
long start = System.currentTimeMillis();
try {
users = this.findAllUser();
if(users==null || users.size()==0){
resultMap.put("isSuccess",false);
resultMap.put("message","没有用户");
}else{
//invoke 和submit的区别在于 invoke是同步的。
sum = pool.invoke(new MessageTask(users, this));
resultMap.put("isSuccess",true);
resultMap.put("message","发送成功");
}
}catch (Exception e){
resultMap.put("isSuccess",false);
resultMap.put("message","系统异常");
return resultMap;
}finally {
pool.shutdown();
System.out.println(sum);
System.out.println("耗时:["+ (System.currentTimeMillis()-start)+"]毫秒");
return resultMap;
}
}
上述 ThreadPoolExecutor 和ForkJoinPool 还是适合于半夜定时任务的应用。那时候可以疯狂的占用CPU,哈哈。高峰期 对于多线程的使用还是慎用。还有,拆分和线程的调度也是消耗效率的,不是使用多线程就一定会效率增高,还是看情况。
-
Spring中的ThreadPool
在看一个spring中的threadPool, 很简单。使用情况也挺多的。比如我一个查询 ,需要查询用户列表和短信发送列表
看代码
/**
* 先继承一个ThreadPoolTaskExecutor
* 可以自己设置参数
* 如果不设置的话,就用默认参数
* private int corePoolSize = 1;
* private int maxPoolSize = 2147483647;
* private int keepAliveSeconds = 60;
* private int queueCapacity = 2147483647;
* private boolean allowCoreThreadTimeOut = false;
* 跟cachedPool挺像的 具体含义上边有
*/
@Component
public class ThreadPoolForSpring extends ThreadPoolTaskExecutor {
}
@Autowired
private ThreadPoolForSpring threadPoolForSpring;
public void getUserAndMessageRecord() throws Exception {
//开启异步
Future<List<User>> usersFuture = threadPoolForSpring.submit(()-> {
System.out.println("当前线程名称:"+Thread.currentThread().getName());
return this.findAllUser();
});
MessageRecordExample recordExample = new MessageRecordExample();
List<MessageRecord> messageRecords = messageRecordMapper.selectByExample(recordExample);
System.out.println("同步查询messageRecords:"+messageRecords.size()+"条");
System.out.println("异步查询users:"+usersFuture.get().size()+"条");
}
这就OK了。 Spring的ThreadPool 也是基于JDK的ThreadPool的。设置起参数即可。
在给大家看个更骚的。
java8的并行流也是用ForkJoinPool实现的。并且主线程也加入了任务。
在lambda效率那么垃圾的情况下,这个方法也只用了3S多点。
public void massTextingByParallelStream(){
long start = System.currentTimeMillis();
List<User> allUser = this.findAllUser();
allUser.parallelStream().forEach(user -> {
MessageRecord messageRecord = new MessageRecord();
messageRecord.setUserId(user.getId());
messageRecord.setUserPhone(user.getPhone());
messageRecord.setMessageContent(user.getName()+",你好,这是你的短信。");
messageRecord.setCurrentTimeMillis(System.currentTimeMillis()+"");
messageRecordMapper.insertSelective(messageRecord);
System.out.println("给用户id:["+user.getId()+"]发送短信。当前线程:["+Thread.currentThread().getName()+"]");
});
System.out.println("耗时:["+ (System.currentTimeMillis()-start)+"]毫秒");
}