并发 推送任务

一个调度线程 

线程池 多个线程执行推送信息service

用ScheduledExecutorService 每2秒监控调度线程以及线程池状态

使用CountDownLatch 闭锁等待调度线程以及线程池执行完推送任务 后执行主线程业务逻辑



private void start(SendTask sendTask)throws Exception {

//        CountDownLatch latch = new CountDownLatch(1);

        CountDownLatch threadPoolLatch =new CountDownLatch(1);

CountDownLatch dispatchLatch =new CountDownLatch(1);

String msgContent = sendTask.getMsgContent();

String sendConfId =sendTask.getProviderId();

if(StringUtils.isNotEmpty(sendConfId)){

ProviderSendConf providerSendConf =providerSendConfRepository.findOne(new Integer(sendConfId));

if(providerSendConf  !=null){

String sendService = providerSendConf.getSendService();

if(StringUtils.isNotEmpty(sendService)){

sendMsgService = (MsgSendService) SpringContextUtil.getBean(sendService);

}

}

}

ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(1);

ExecutorService taskProcessPool =new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),5000,60L, TimeUnit.SECONDS,

new LinkedBlockingQueue(10000));

new Thread(new Runnable() {

@Override

            public void run() {

try {

dispatchLatch.await();

threadPoolLatch.await();

}catch (InterruptedException e) {

e.printStackTrace();

}

logger.info("task end");

System.out.println("task end");

scheduledService.shutdownNow();

}

}).start();

new Thread(new Runnable() {

public boolean isAllSend =false;

@Override

            public void run() {

while (!isAllSend) {

List sendRecordList =sendRecordRepository.findBySendStatusAndFileId(SendStatus.UN_SEND.getStatus(),new Integer(sendTask.getFileId()));

if(sendRecordList !=null &&  sendRecordList.size()>0){

for (SendRecord sendRecord:sendRecordList) {

sendRecord.setSendStatus(SendStatus.SEND.getStatus());

sendRecord.setSendTime(new Date());

sendRecordRepository.save(sendRecord);

Msg msg =new Msg();

msg.setContent(msgContent);

msg.setMobile(sendRecord.getMobile());

MsgSendTask task =new MsgSendTask();

task.setSendService(sendMsgService);

task.setMsg(msg);

taskProcessPool.submit(task);

try {

Thread.sleep(1);

}catch (InterruptedException e) {

e.printStackTrace();

}

}

}else {

isAllSend =true;

}

}

dispatchLatch.countDown();

}

}).start();

new Thread(new Runnable() {

@Override

            public void run() {

try {

//等任务派发完成才开始检查线程池状态

                    dispatchLatch.await();

}catch (InterruptedException e) {

e.printStackTrace();

}

ThreadPoolExecutor executor = (ThreadPoolExecutor) taskProcessPool;

if (executor.getQueue().size() ==0 && executor.getActiveCount() ==0) {

executor.shutdownNow();

threadPoolLatch.countDown();

ImportFile importFile =importFileRepository.findOne(new Integer(sendTask.getFileId()));

importFile.setStatus(ImportFileStatus.TASK_FINISH.getStatus());

importFileRepository.save(importFile);

sendTask.setTaskStatus(TaskStatus.FINISH.getStatus());

sendTaskRepository.save(sendTask);

logger.info("ThreadPool monitor exit....");

System.out.println("ThreadPool monitor exit....");

return;

}

logger.info("ThreadPool monitor....");

System.out.println("ThreadPool monitor....");

scheduledService.schedule(this,2, TimeUnit.SECONDS);

}

}).start();

//        latch.await();

    }

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 一.线程安全性 线程安全是建立在对于对象状态访问操作进行管理,特别是对共享的与可变的状态的访问 解释下上面的话: ...
    黄大大吃不胖阅读 878评论 0 3
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,878评论 1 19
  • 1、线程安全与锁 线程安全的本质,在于 存在了共享的可变状态 status, 在多线程共同操作状态变量时,当计算的...
    轩居晨风阅读 368评论 1 1
  • Thread机制允许同时进行的多个活动,并发程序设计比单线程程序设计要困难得多。 第六十六条、同步访问共享的可变数...
    Timorous阅读 245评论 0 0
  • 和姐姐差三岁 小的时候我在爸妈眼里就是不讲理,因为是老二所以我的外号就是“二拐”,每次吃饭时候我...
    景_acaa阅读 464评论 0 1