异步编排和线程池

通过本文档你将学习到

  • 线程初始化的几种方式。
  • 线程的相关概念
  • 线程池怎么玩
  • 线程池的常见面试题

1 线程回顾知识

1.1 线程初始化的几种方式

1)、继承 Thread
2)、实现 Runnable 接口
3)、实现 Callable 接口 + FutureTask (可以拿到返回结果,可以处理异常)
4)、线程池
常见线程方式一

public static class Thread01 extends Thread {
    @Override
    public void run() {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("运行结果:" + i);
    }
}
Thread thread = new Thread01();
 thread.start();

常见线程方式二

public static class Runable01 implements Runnable {
    @Override
    public void run() {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("运行结果:" + i);
    }
}

 Runable01 runable01 = new Runable01();
 new Thread(runable01).start();

说到上面两种,大家非常非常的熟悉,就不多扯淡了,接下来,将引出第三种Callable。

Thread thread = new Thread01();
 thread.start();

相信这个大家再熟悉不过了,刚入门的时候,就是这么学习的。接下来我们就打开API

Thread

https://www.matools.com/api/java8
Callable,带返回值的
我们先看下自己超级熟悉的Thread() 构造方法,第一第二种方式都得创建Thread()
Thread()
分配一个新的 Thread对象。
Thread(Runnable target)
分配一个新的 Thread对象。
Thread(Runnable target, String name)
分配一个新的 Thread对象。

一般情况下我们都是使用 Thread(Runnable target, String name)吧

废话不多说 要是有一个这构造方法 Thread(Callable target, String name) 那不是直接搞起么
那么说明,我们一拍脑袋半秒就能想到的问题,这sum Oracle公司的天才们都想不到,
他们就是蠢货一个,马上写信给他们说,让我来拯救JAVA代码
那么接下来你有两种做法,马上发邮件告诉sum Oracle公司的天才们
1 我发现了BUG,你们没有扩展一个方法 Thread(Callable target, String name)
2 自动诞生的时候天才们就想到了扩展方法,根本不用多余的加个构造方法,最后发现小丑竟然是自己
你觉得是哪一个呢?

回归正题:本来我只能接受Runnable ,现在Callable 说,我也想勾搭下Thread,那么你准备怎么搞?
你想想搞定Thread,但是我不认识,那我就先搞定它周人的人,间接不就搞定了么?
你想想我们写代码的时候参数都是写的list 然后传递list的子类ArrayList对吧。更底层的代码直接传递的Collection,有时候大神写代码你看着更无语,直接传递一个接口,这个接口还没有任何方法,继承后也不用实现,这种混个身份,到时候可以透传的,如果你这是无语了,。你要说这人是傻逼,到最后发现小丑竟是我自己对吧?因为我们能力还达不到人家的境界,无法理解。等你能力提高后,你才会发现人家为什么这么写底层代码?哦明白了,看着好自然,对,就该这么搞。看着代码像大自然一样自然

FutureTask 就是中间人了,帮你撮合
Thread(Callable target, String name) 没有 这个构造,那么中间人过来
Thread(中间人 FutureTask , String name) 
new  Thread(futureTask , '"我的名字叫线程110号").star() 

在此之前也得介绍下:Future
这个是一个接口,获取异步任务的结果,取消任务的执行,判断任务是否被取消,判断任务执行是否完毕。
主线程让子线程去执行其它事情。Future接口的实现类FutureTask 。


image.png

JDK.5以后常见方式三:

public static class Callable01 implements Callable <Integer> {
    @Override
    public Integer call() throws Exception {
        System.out.println("当前线程:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("运行结果:" + i);
        return I;
    }
}
我启动了1000个线程去跑对账任务,现在编号999线程出错了,你给我返回错误,我想看下为什么?

  FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
 new Thread(futureTask).start();
 System.out.println(futureTask.get());

方式4 直接提交給线程池即可

public static ExecutorService executor = Executors.newFixedThreadPool(10);
// service.execute(new Runable01());
// Future<Integer> submit = service.submit(new Callable01());
// submit.get();

futureTask优势明显呀,你看搭配线程池用的飞起,那么它又缺点吗?
当然有呀,人无完人,代码也有缺点
1、futureTask.get()阻塞 也可以传递x秒 等待几秒
2、那我用isDone()方法。


image.png

这样也不行呀,浪费cpu字段,你一直while true 要有个回调通知那多好呀。
一般情况下我们都是通过轮询的方式获取结果,进来不要阻塞。

那有没有什么方式解决上面的缺点呢?
答案是肯定的,有的,我们不用这个类,我们有一个更好的选择 CompletableFuture 这个比Future还能干,Future能干的CompletableFuture也可以,CompletableFuture还能干Future不能干的。爽不爽?

CompletableFuture基本介绍。
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {

在Java 8中, Complet able Future提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 也提供了转换和组合Complet able Future的方法。
它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些动作。

�核心的四个静态方法


public static CompletableFuture<Void> runAsync(Runnable runnable
runAsync+线程池
public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor)
supplyAsync 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 
 supplyAsync+线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
                                                       Executor executor)



CompletableFuture优点总结

异步任务结束时,会自动回调某个对象的方法;

主线程设置好后,不再关心异步任务的执行,异步任务之间可以顺序执行

异步任务出错时,会自动回调某个对象的方法

1.2 为什么要使用线程池

1、2、 不能得到返回值
3 可以得到返回值但是他们都不能控制资源
实际玩的过程中我们都用线程池 第四方式。

1.3 线程池的介绍

线程池做的工作主要是控制运行的线程的数量,处理过程中将任务加入队列,然后在线程创建后启动这些任务,如果线程超过了最大数量,超出的数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行.

他的主要特点为:线程复用:控制最大并发数:管理线程.

第一:降低资源消耗.通过重复利用自己创建的线程降低线程创建和销毁造成的消耗.
第二: 提高响应速度.当任务到达时,任务可以不需要等到线程和粗昂就爱你就能立即执行.
第三: 提高线程的可管理性.线程是稀缺资源,如果无限的创建,不仅会消耗资源,还会较低系统的稳定性,使用线程池可以进行统一分配,调优和监控.

1.4 线程池如何使用?

Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类.


image.png

1.5 面试重点 说下常见的线程池呗

Executors.newFixedThreadPool(int)
执行一个长期的任务,性能好很多

  public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

主要特点如下:
1.创建一个定长线程池,可控制线程的最大并发数,超出的线程会在队列中等待.
2.newFixedThreadPool创建的线程池corePoolSize和MaxmumPoolSize是 相等的,它使用的的LinkedBlockingQueue

Executors.newSingleThreadExecutor()
一个任务一个线程执行的任务场景

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

主要特点如下:
1.创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务都按照指定顺序执行.
2.newSingleThreadExecutor将corePoolSize和MaxmumPoolSize都设置为1,它使用的的LinkedBlockingQueue

Executors.newCachedThreadPool()
适用:执行很多短期异步的小程序或者负载较轻的服务器

   public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

主要特点如下:
1.创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则创建新线程.
2.newCachedThreadPool将corePoolSize设置为0MaxmumPoolSize设置为Integer.MAX_VALUE,它使用的是SynchronousQUeue,也就是说来了任务就创建线程运行,如果线程空闲超过60秒,就销毁线程

1.6 面试重点 说下常见的线程池的七大参数呗

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 

从源码中可以看出,线程池的构造函数有7个参数,分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。下面会对这7个参数一一解释。

一、corePoolSize 线程池核心线程大小

线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。

1.在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近视理解为今日当值线程
2.当线程池中的线程数目达到corePoolSize后,就会把到达的任务放入到缓存队列当中.

二、maximumPoolSize 线程池最大线程数量

一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接将任务交给这个空闲线程来执行,如果没有则会缓存到工作队列(后面会介绍)中,如果工作队列满了,才会创建一个新线程,然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。

三、keepAliveTime 空闲线程存活时间

一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定

默认情况下:
只有当线程池中的线程数大于corePoolSize时keepAliveTime才会起作用,知道线程中的线程数不大于corepoolSIze,
四、unit 空闲线程存活时间单位

keepAliveTime的计量单位

五、workQueue 工作队列

新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:

①ArrayBlockingQueue

基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。

②LinkedBlockingQuene

基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。

③SynchronousQuene

一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。

④PriorityBlockingQueue

具有优先级的无界阻塞队列,优先级通过参数Comparator实现。

六、threadFactory 线程工厂

创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等

七、handler 拒绝策略

当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:
AbortPolicy(默认):直接抛出RejectedException异常阻止系统正常运行
CallerRunPolicy:"调用者运行"一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将任务分给调用线程来执行
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交
DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常.如果允许任务丢失,这是最好的拒绝策略
以上内置策略均实现了RejectExecutionHandler接口

一句图总结中心思想:面试必问


image.png

1.7 坑你没商量

你在工作中单一的/固定数的/可变你的三种创建线程池的方法,你用哪个多?超级大坑

答案是一个都不用,我们生产上只能使用自定义的
Executors中JDK给你提供了为什么不用?

【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明:使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。

【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors返回的线程池对象的弊端如下:
1)FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2)CachedThreadPool和ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

1.7 你在工作中是如何创建线程池的,是否自定义过线程池使用

public class MyThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService threadPool = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<Runnable>(3),
                Executors.defaultThreadFactory(),
                //默认抛出异常
                //new ThreadPoolExecutor.AbortPolicy()
                //回退调用者
                //new ThreadPoolExecutor.CallerRunsPolicy()
                //处理不来的不处理
                //new ThreadPoolExecutor.DiscardOldestPolicy()
                new ThreadPoolExecutor.DiscardPolicy()
        );
        //模拟10个用户来办理业务 没有用户就是来自外部的请求线程.
        try {
            for (int i = 1; i <= 10; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
        //threadPoolInit();
    }

    private static void threadPoolInit() {
        /**
         * 一池5个处理线程
         */
        //ExecutorService threadPool= Executors.newFixedThreadPool(5);
        /**
         * 一池一线程
         */
        //ExecutorService threadPool= Executors.newSingleThreadExecutor();
        /**
         * 一池N线程
         */
        ExecutorService threadPool = Executors.newCachedThreadPool();
        //模拟10个用户来办理业务 没有用户就是来自外部的请求线程.
        try {
            for (int i = 1; i <= 20; i++) {
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + "\t 办理业务");
                });
                try {
                    TimeUnit.MICROSECONDS.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }
    }
}

此处看下RMQ的源码以及自己项目中的使用情况

1.7 合理配置线程池你是如何考虑的?

image.png

CPU 密集型
核心数+1。

但是为什么要加一呢?《Java并发编程实战》一书中给出的原因是:即使当计算(CPU)密集型的线程偶尔由于页缺失故障或者其他原因而暂停时,这个“额外”的线程也能确保 CPU 的时钟周期不会被浪费。
看不懂是不是?没关系我也看不懂。反正把它理解为一个备份的线程就行了。

IO密集型
cpu*2


image.png

既是IO密集型也是CPU密集型
合理分配即可

以上都是面试答案,但是
线程池使用面临的核心的问题在于:线程池的参数并不好配置。
一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;
另一方面,线程池执行的情况和任务类型相关性较大,IO密集型和CPU密集型的任务运行起来的情况差异非常大。
这导致业界并没有一些成熟的经验策略帮助开发人员参考。
https://blog.csdn.net/qq_41893274/article/details/112424767

1.8数据赏析

RMQ的BrokerController类等等

       //5、启动定时调度,每10秒钟扫描所有Broker,检查存活状态
 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "NSScheduledThread"));

        this.scheduledExecutorService.scheduleAtFixedRate(() -> NamesrvController.this.routeInfoManager.scanNotActiveBroker(), 5, 10, TimeUnit.SECONDS);
//不是说不能用吗?你还用?为什么?



//周期性向nameserv发心跳,如果有变化则同步broker信息

 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
        "BrokerControllerScheduledThread"));


   this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                try {
                    BrokerController.this.getBrokerStats().record();
                } catch (Throwable e) {
                    log.error("schedule record error.", e);
                }
            }, initialDelay, period, TimeUnit.MILLISECONDS);

Broker向所有的nameServer注册自己的户口本信息。

   private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
        new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));


  final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(() -> {
                    try {
                        RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }

                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                });
            }

            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容