通过本文档你将学习到
- 线程初始化的几种方式。
- 线程的相关概念
- 线程池怎么玩
- 线程池的常见面试题
1 线程回顾知识
1.1 线程初始化的几种方式
1)、继承 Thread
2)、实现 Runnable 接口
3)、实现 Callable 接口 + FutureTask (可以拿到返回结果,可以处理异常)
4)、线程池
常见线程方式一
public static class Thread01 extends Thread {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}
}
Thread thread = new Thread01();
thread.start();
常见线程方式二
public static class Runable01 implements Runnable {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}
}
Runable01 runable01 = new Runable01();
new Thread(runable01).start();
说到上面两种,大家非常非常的熟悉,就不多扯淡了,接下来,将引出第三种Callable。
Thread thread = new Thread01();
thread.start();
相信这个大家再熟悉不过了,刚入门的时候,就是这么学习的。接下来我们就打开API
https://www.matools.com/api/java8
Callable,带返回值的
我们先看下自己超级熟悉的Thread() 构造方法,第一第二种方式都得创建Thread()
Thread()
分配一个新的 Thread对象。
Thread(Runnable target)
分配一个新的 Thread对象。
Thread(Runnable target, String name)
分配一个新的 Thread对象。
一般情况下我们都是使用 Thread(Runnable target, String name)吧
废话不多说 要是有一个这构造方法 Thread(Callable target, String name) 那不是直接搞起么
那么说明,我们一拍脑袋半秒就能想到的问题,这sum Oracle公司的天才们都想不到,
他们就是蠢货一个,马上写信给他们说,让我来拯救JAVA代码
那么接下来你有两种做法,马上发邮件告诉sum Oracle公司的天才们
1 我发现了BUG,你们没有扩展一个方法 Thread(Callable target, String name)
2 自动诞生的时候天才们就想到了扩展方法,根本不用多余的加个构造方法,最后发现小丑竟然是自己
你觉得是哪一个呢?
回归正题:本来我只能接受Runnable ,现在Callable 说,我也想勾搭下Thread,那么你准备怎么搞?
你想想搞定Thread,但是我不认识,那我就先搞定它周人的人,间接不就搞定了么?
你想想我们写代码的时候参数都是写的list 然后传递list的子类ArrayList对吧。更底层的代码直接传递的Collection,有时候大神写代码你看着更无语,直接传递一个接口,这个接口还没有任何方法,继承后也不用实现,这种混个身份,到时候可以透传的,如果你这是无语了,。你要说这人是傻逼,到最后发现小丑竟是我自己对吧?因为我们能力还达不到人家的境界,无法理解。等你能力提高后,你才会发现人家为什么这么写底层代码?哦明白了,看着好自然,对,就该这么搞。看着代码像大自然一样自然
FutureTask 就是中间人了,帮你撮合
Thread(Callable target, String name) 没有 这个构造,那么中间人过来
Thread(中间人 FutureTask , String name)
new Thread(futureTask , '"我的名字叫线程110号").star()
在此之前也得介绍下:Future
这个是一个接口,获取异步任务的结果,取消任务的执行,判断任务是否被取消,判断任务执行是否完毕。
主线程让子线程去执行其它事情。Future接口的实现类FutureTask 。
JDK.5以后常见方式三:
public static class Callable01 implements Callable <Integer> {
@Override
public Integer call() throws Exception {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return I;
}
}
我启动了1000个线程去跑对账任务,现在编号999线程出错了,你给我返回错误,我想看下为什么?
FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
new Thread(futureTask).start();
System.out.println(futureTask.get());
方式4 直接提交給线程池即可
public static ExecutorService executor = Executors.newFixedThreadPool(10);
// service.execute(new Runable01());
// Future<Integer> submit = service.submit(new Callable01());
// submit.get();
futureTask优势明显呀,你看搭配线程池用的飞起,那么它又缺点吗?
当然有呀,人无完人,代码也有缺点
1、futureTask.get()阻塞 也可以传递x秒 等待几秒
2、那我用isDone()方法。
这样也不行呀,浪费cpu字段,你一直while true 要有个回调通知那多好呀。
一般情况下我们都是通过轮询的方式获取结果,进来不要阻塞。
那有没有什么方式解决上面的缺点呢?
答案是肯定的,有的,我们不用这个类,我们有一个更好的选择 CompletableFuture 这个比Future还能干,Future能干的CompletableFuture也可以,CompletableFuture还能干Future不能干的。爽不爽?
CompletableFuture基本介绍。
阻塞的方式和异步编程的设计理念相违背,而轮询的方式会消耗无畏的CPU资源。因此,JDK8设计出CompletableFuture
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
在Java 8中, Complet able Future提供了非常强大的Future的扩展功能, 可以帮助我们简化异步编程的复杂性, 并且提供了函数式编程的能力, 可以通过回调的方式处理计算结果, 也提供了转换和组合Complet able Future的方法。
它可能代表一个明确完成的Future, 也有可能代表一个完成阶段(Completion Stage) , 它支持在计算完成以后触发一些函数或执行某些动作。
�核心的四个静态方法
public static CompletableFuture<Void> runAsync(Runnable runnable
runAsync+线程池
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor)
supplyAsync 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
supplyAsync+线程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor)
CompletableFuture优点总结
异步任务结束时,会自动回调某个对象的方法;
主线程设置好后,不再关心异步任务的执行,异步任务之间可以顺序执行
异步任务出错时,会自动回调某个对象的方法
1.2 为什么要使用线程池
1、2、 不能得到返回值
3 可以得到返回值但是他们都不能控制资源
实际玩的过程中我们都用线程池 第四方式。
1.3 线程池的介绍
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务加入队列,然后在线程创建后启动这些任务,如果线程超过了最大数量,超出的数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行.
他的主要特点为:线程复用:控制最大并发数:管理线程.
第一:降低资源消耗.通过重复利用自己创建的线程降低线程创建和销毁造成的消耗.
第二: 提高响应速度.当任务到达时,任务可以不需要等到线程和粗昂就爱你就能立即执行.
第三: 提高线程的可管理性.线程是稀缺资源,如果无限的创建,不仅会消耗资源,还会较低系统的稳定性,使用线程池可以进行统一分配,调优和监控.
1.4 线程池如何使用?
Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类.
1.5 面试重点 说下常见的线程池呗
Executors.newFixedThreadPool(int)
执行一个长期的任务,性能好很多
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
主要特点如下:
1.创建一个定长线程池,可控制线程的最大并发数,超出的线程会在队列中等待.
2.newFixedThreadPool创建的线程池corePoolSize和MaxmumPoolSize是 相等的,它使用的的LinkedBlockingQueue
Executors.newSingleThreadExecutor()
一个任务一个线程执行的任务场景
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
主要特点如下:
1.创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务都按照指定顺序执行.
2.newSingleThreadExecutor将corePoolSize和MaxmumPoolSize都设置为1,它使用的的LinkedBlockingQueue
Executors.newCachedThreadPool()
适用:执行很多短期异步的小程序或者负载较轻的服务器
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
主要特点如下:
1.创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则创建新线程.
2.newCachedThreadPool将corePoolSize设置为0MaxmumPoolSize设置为Integer.MAX_VALUE,它使用的是SynchronousQUeue,也就是说来了任务就创建线程运行,如果线程空闲超过60秒,就销毁线程
1.6 面试重点 说下常见的线程池的七大参数呗
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
从源码中可以看出,线程池的构造函数有7个参数,分别是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。下面会对这7个参数一一解释。
一、corePoolSize 线程池核心线程大小
线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。
1.在创建了线程池后,当有请求任务来之后,就会安排池中的线程去执行请求任务,近视理解为今日当值线程
2.当线程池中的线程数目达到corePoolSize后,就会把到达的任务放入到缓存队列当中.
二、maximumPoolSize 线程池最大线程数量
一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接将任务交给这个空闲线程来执行,如果没有则会缓存到工作队列(后面会介绍)中,如果工作队列满了,才会创建一个新线程,然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。线程池不会无限制的去创建新线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize指定。
三、keepAliveTime 空闲线程存活时间
一个线程如果处于空闲状态,并且当前的线程数量大于corePoolSize,那么在指定时间后,这个空闲线程会被销毁,这里的指定时间由keepAliveTime来设定
默认情况下:
只有当线程池中的线程数大于corePoolSize时keepAliveTime才会起作用,知道线程中的线程数不大于corepoolSIze,
四、unit 空闲线程存活时间单位
keepAliveTime的计量单位
五、workQueue 工作队列
新任务被提交后,会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中提供了四种工作队列:
①ArrayBlockingQueue
基于数组的有界阻塞队列,按FIFO排序。新任务进来后,会放到该队列的队尾,有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
②LinkedBlockingQuene
基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
③SynchronousQuene
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
④PriorityBlockingQueue
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
六、threadFactory 线程工厂
创建一个新线程时使用的工厂,可以用来设定线程名、是否为daemon线程等等
七、handler 拒绝策略
当工作队列中的任务已到达最大限制,并且线程池中的线程数量也达到最大限制,这时如果有新任务提交进来,该如何处理呢。这里的拒绝策略,就是解决这个问题的,jdk中提供了4中拒绝策略:
AbortPolicy(默认):直接抛出RejectedException异常阻止系统正常运行
CallerRunPolicy:"调用者运行"一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将任务分给调用线程来执行
DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交
DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常.如果允许任务丢失,这是最好的拒绝策略
以上内置策略均实现了RejectExecutionHandler接口
一句图总结中心思想:面试必问
1.7 坑你没商量
你在工作中单一的/固定数的/可变你的三种创建线程池的方法,你用哪个多?超级大坑
答案是一个都不用,我们生产上只能使用自定义的
Executors中JDK给你提供了为什么不用?
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明:使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors返回的线程池对象的弊端如下:
1)FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2)CachedThreadPool和ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
1.7 你在工作中是如何创建线程池的,是否自定义过线程池使用
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(3),
Executors.defaultThreadFactory(),
//默认抛出异常
//new ThreadPoolExecutor.AbortPolicy()
//回退调用者
//new ThreadPoolExecutor.CallerRunsPolicy()
//处理不来的不处理
//new ThreadPoolExecutor.DiscardOldestPolicy()
new ThreadPoolExecutor.DiscardPolicy()
);
//模拟10个用户来办理业务 没有用户就是来自外部的请求线程.
try {
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
//threadPoolInit();
}
private static void threadPoolInit() {
/**
* 一池5个处理线程
*/
//ExecutorService threadPool= Executors.newFixedThreadPool(5);
/**
* 一池一线程
*/
//ExecutorService threadPool= Executors.newSingleThreadExecutor();
/**
* 一池N线程
*/
ExecutorService threadPool = Executors.newCachedThreadPool();
//模拟10个用户来办理业务 没有用户就是来自外部的请求线程.
try {
for (int i = 1; i <= 20; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
try {
TimeUnit.MICROSECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
此处看下RMQ的源码以及自己项目中的使用情况
1.7 合理配置线程池你是如何考虑的?
CPU 密集型
核心数+1。
但是为什么要加一呢?《Java并发编程实战》一书中给出的原因是:即使当计算(CPU)密集型的线程偶尔由于页缺失故障或者其他原因而暂停时,这个“额外”的线程也能确保 CPU 的时钟周期不会被浪费。
看不懂是不是?没关系我也看不懂。反正把它理解为一个备份的线程就行了。
IO密集型
cpu*2
既是IO密集型也是CPU密集型
合理分配即可
以上都是面试答案,但是
线程池使用面临的核心的问题在于:线程池的参数并不好配置。
一方面线程池的运行机制不是很好理解,配置合理需要强依赖开发人员的个人经验和知识;
另一方面,线程池执行的情况和任务类型相关性较大,IO密集型和CPU密集型的任务运行起来的情况差异非常大。
这导致业界并没有一些成熟的经验策略帮助开发人员参考。
https://blog.csdn.net/qq_41893274/article/details/112424767
1.8数据赏析
RMQ的BrokerController类等等
//5、启动定时调度,每10秒钟扫描所有Broker,检查存活状态
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
this.scheduledExecutorService.scheduleAtFixedRate(() -> NamesrvController.this.routeInfoManager.scanNotActiveBroker(), 5, 10, TimeUnit.SECONDS);
//不是说不能用吗?你还用?为什么?
//周期性向nameserv发心跳,如果有变化则同步broker信息
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerControllerScheduledThread"));
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
Broker向所有的nameServer注册自己的户口本信息。
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(() -> {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}