自定义线程池+多线程处理+CountDownLatch

前几天在写同步接口,因为数据量比较大,所以使用多线程,这里写了Demo记录下。

自定义线程池

  1. Java中Executors已经提供了创建线程池的方式,但在阿里巴巴开发手册上是严禁使用的,建议使用自定义线程池,究其原因,是可能会产生一些问题。
public static ExecutorService newFixedThreadPool(int nThreads) {
       return new ThreadPoolExecutor(nThreads, nThreads,
                                     0L, TimeUnit.MILLISECONDS,
                                     new LinkedBlockingQueue<Runnable>());
   }


public static ExecutorService newSingleThreadExecutor() {
       return new FinalizableDelegatedExecutorService
           (new ThreadPoolExecutor(1, 1,
                                   0L, TimeUnit.MILLISECONDS,
                                   new LinkedBlockingQueue<Runnable>()));
   }


public static ExecutorService newCachedThreadPool() {
       return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                     60L, TimeUnit.SECONDS,
                                     new SynchronousQueue<Runnable>());
   }
  1. 查看源码发现newFixedThreadPool和newSingleThreadExecutor方法他们都使用了LinkedBlockingQueue的任务队列,LikedBlockingQueue的默认大小为Integer.MAX_VALUE。newCachedThreadPool中定义的线程池大小为Integer.MAX_VALUE。

  2. 通过源码发现禁止使用Executors创建线程池的原因就是newFixedThreadPool和newSingleThreadExecutor的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

  3. newCachedThreadPool允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

下面是写了一个Demo演示

public class ThreadPool {


    /**
     * 自定义线程名称,方便的出错的时候溯源
     */
    private static final ThreadFactory NAME_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("order-pool-%d").build();
    //private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().build();

    /**
     * corePoolSize    线程池核心池的大小
     * maximumPoolSize 线程池中允许的最大线程数量
     * keepAliveTime   当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
     * unit            keepAliveTime 的时间单位
     * workQueue       用来储存等待执行任务的队列
     * threadFactory   创建线程的工厂类
     * handler         拒绝策略类,当线程池数量达到上线并且workQueue队列长度达到上限时就需要对到来的任务做拒绝处理
     */
    private static final ExecutorService service = new ThreadPoolExecutor(
            4,
            6,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1024),
            NAME_THREAD_FACTORY,
            new ThreadPoolExecutor.AbortPolicy()
    );

    /**
     * 获取线程池
     * @return 线程池
     */
    public static ExecutorService getEs() {
        return service;
    }

    /**
     * 使用线程池创建线程并异步执行任务
     * @param r 任务
     */
    public static void newTask(Runnable r) {
        service.execute(r);
    }

    public static void main(String[] args) throws InterruptedException {

        String[] arr = {"a","b","c","d","e","f","g","h","1","4","n"};
        CountDownLatch countDownLatch = new CountDownLatch(arr.length);
        List<String> list = Arrays.asList(arr);
        ExecutorService es = getEs();

        System.out.println("开始处理...");
        int size = 0;
        List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();
        try {

            for (String m : list) {
                SendEmail sendEmail = new SendEmail(m, countDownLatch);
                SendEmailCallBack sendEmailCallBack = new SendEmailCallBack(m, countDownLatch);
                Future<Integer> future = es.submit(sendEmailCallBack);
                resultList.add(future);
            }
            System.out.println("主线程等待...");
            countDownLatch.await();
            for (Future<Integer> future : resultList) {
                size += future.get();
            }
            System.out.println("处理数量: "+size);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            es.shutdown();
        }

        System.out.println("处理完成...");
    }

}

业务类

主要实现自己的业务逻辑

public class SendEmailCallBack implements Callable<Integer> {
    private final String mobile;
    private final CountDownLatch countDownLatch;

    public SendEmailCallBack(String mobile, CountDownLatch countDownLatch) {
        this.mobile = mobile;
        this.countDownLatch = countDownLatch;
    }

    private int sendEmail() {
        try {
            Thread.sleep(1000);
            System.out.println("线程:"+Thread.currentThread().getName()+ ", 邮件" + mobile +"发送成功");
        }catch (Exception e){
            e.printStackTrace();
        }
        return 1;
    }

    @Override
    public Integer call() throws Exception {
        int num = 0;
        try {
            synchronized (this){
                 num = this.sendEmail();
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            countDownLatch.countDown();
        }
        return num;
    }
}

CountDownLatch

使用CountDownLatch是为了同步时,主线程等待所有线程执行执行完毕后,获取返回数据,汇总同步总数。

控制台打印结果

开始处理...
主线程等待...
线程:order-pool-0, 邮件a发送成功
线程:order-pool-2, 邮件c发送成功
线程:order-pool-3, 邮件d发送成功
线程:order-pool-1, 邮件b发送成功
线程:order-pool-3, 邮件g发送成功
线程:order-pool-2, 邮件f发送成功
线程:order-pool-0, 邮件e发送成功
线程:order-pool-1, 邮件h发送成功
线程:order-pool-3, 邮件1发送成功
线程:order-pool-2, 邮件4发送成功
线程:order-pool-0, 邮件n发送成功
处理数量: 11
处理完成...
下面单独写了一个线程池创建类,修改下就可以使用,和上面Demo实现无关。
public class ThreadPoolFactory {
    /**
    * 下面的属性大小可以改成获取服务器配置来动态调整
    */
    /**
     *核心线程数
     */
    private static final int corePoolSize = 4;
    /**
     * 最大核心线程数
     */
    private static final int maximumPoolSize = 6;
    /**
     * 工作队列
     */
    private static final int workQueue = 1024;
    /**
     *线程空闲时间
     */
    private static final int keepAliveTime = 30;

    /**
     * 自定义线程名称
     */
    private static final ThreadFactory NAMED_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("order-pool-%d").build();

    /**
     * corePoolSize    线程池核心池的大小
     * maximumPoolSize 线程池中允许的最大线程数量
     * keepAliveTime   当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间
     * unit            keepAliveTime 的时间单位
     * workQueue       用来储存等待执行任务的队列
     * threadFactory   创建线程的工厂类
     * handler         拒绝策略类,当线程池数量达到上线并且workQueue队列长度达到上限时就需要对到来的任务做拒绝处理
     */
    private static final ExecutorService service = new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(workQueue),
            NAMED_THREAD_FACTORY,
            new ThreadPoolExecutor.AbortPolicy()
    );

    /**
     * 获取线程池
     * @return 线程池
     */
    public static ExecutorService getEs() {
        return service;
    }

    /**
     * 使用线程池创建线程并异步执行任务
     * @param runnable 任务
     */
    public static void newTask(Runnable runnable) {
        service.execute(runnable);
    }

}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351