线程的基本操作
用new关键字创建一个线程对象,然后调用它的start()启动线程即可。
Thread thread = new Thread();
thread.start();
线程有个run()方法,start()会创建一个新的线程并让这个线程执行run()方法。
//这种run()方法调用不能新建一个线程,而是在当前线程中调用run()方法,
//将run方法只是作为一个普通的方法调用。
Thread thread1 = new Thread();
thread1.run();
start方法是启动一个线程,run方法只会在当前线程中串行的执行run方法中的代码。
Thread thread = new Thread(){
@Override
public void run() {
System.out.println("xxxxxxx");
}
};
thread.start();
通过继承Thread类,然后重写run方法,来自定义一个线程。
java中刚好提供了Runnable接口来自定义一个线程。
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
Thread类有一个非常重要的构造方法:
public Thread(Runnable target)
Thread的run方法:
@Override
public void run() {
if (target != null) {
target.run();
}
}
实现Runnable接口是比较常见的做法,也是推荐的做法。
Thread thread2 =new Thread(new Runnable() {
@Override
public void run() {
}
});
thread2.start();
线程状态 Thread
6个状态定义:java.lang.Thread.State
1.NEW : 尚未启动的线程的线程状态
2.RUNNABLE :可运行线程的线程状态,等待cpu调度
3.BLOCKED:线程阻塞等待监视器锁定的线程状态,处在synchronized同步代码块或方法中被阻塞。
4.WAITING:等待线程的线程状态,如 不带超时的方式:Object.wait,Thread.join,LockSupport.park进入这个状态
5.TIMED_WAITING:具有指定等待时间的等待线程的此案陈状态,如 带超时方式;Thread.sleep,Object.wait,Thread.join,LockSupport.parkNanos,LockSupport.parkUntil进入这个状态
6.TERMINATED:终止线程的线程状态。线程正常完成执行或出现异常
线程终止
正确的线程中止 interrupter
如果目标线程在调用Object class的wait(),wait(long)或wait(long,int)方法,join(),join(long,int),或sleep(long,int)方法是被阻塞,那么Interrupt会生效,该线程的中断状态将被清除,抛出InterrupttedException异常。
如果目标线程是被I/O或者NIO中的Channel锁阻塞,同样,I/O操作会被中断或者返回特殊异常值,达到终止线程的目的。
如果以上条件都不满足,则会设置此线程的中断状态。
正确的线程中止 标志位
在代码逻辑中,增加一个判断,用来控制。
Thread提供了3个与线程中断有关的方法:
public void interrupt()//中断线程
public boolean isInterrupted()//判断线程是否中断
public static boolean interrupted()//判断线程是否被中断,并清除当前中断状态
public static void main(String[] args) throws InterruptedException {
Thread thread2 =new Thread(){
@Override
public void run() {
while (true){
if (this.interrupted()){
break;
}
}
}
};
thread2.start();
TimeUnit.SECONDS.sleep(1);
thread2.interrupt();//中断
}
如果一个线程调用了sleep方法,一直处于休眠状态,通过变量控制,不可以中断线程。此时只能使用线程提供的interrupt方法来中断线程了。
Thread thread2 =new Thread(){
@Override
public void run() {
while (true){
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
this.interrupt();//抛异常,发出终止线程
e.printStackTrace();
}
if (this.isInterrupted()){
break;
}
}
}
};
thread2.setName("thread2");
thread2.start();
TimeUnit.SECONDS.sleep(1);
thread2.interrupt();//中断
}
sleep方法由于中断而抛出异常之后,线程的中断标志会被清除(置为false),所以在异常中需要执行this.interrupt()方法,将中断标志位置为true.
线程通信
通信方式:
1)文件共享
2)网络共享
3)共享变量
4)jdk提供的线程协调API
线程协作 JDK API
多线程协作的典型场景:生产者 - 消费者 (线程的阻塞,线程唤醒)
等待(wait)和通知(notify)
为了支持 多线程 之间的协作,JDK提供了两个非常重要的方法:等待wait()方法和通知notify()方法。在Object类中定义的。这意味着所有的对象都可以调用者两个方法。java.lang.Object
public final void wait() throws InterruptedException
public final native void notify();//随机唤醒
当在一个对象实例上调用wait()方法后,当前线程就会在这个对象上等待。
比如在线程A中,调用了obj.wait()方法,那么线程A就会停止继续执行,转为等待状态。等待到什么时候结束呢?线程A会一直等到其他线程调用obj.notify()方法为止,这时,obj对象成为了多个线程之间的有效通信手段。
如果一个线程调用了object.wait()方法,那么它就会进出object对象的等待队列。这个队列中,可能会有多个线程,因为系统可能运行多个线程同时等待某一个对象。
当object.notify()方法被调用时,它就会从这个队列中 随机 选择一个线程,并将其唤醒。这个选择是不公平.
Object独享还有一个nofiyAll()方法,它和notify()方法的功能类似,不同的是,它会唤醒在这个等待队列中所有等待的线程,而不是随机选择一个。
Object.wait()方法并不能随便调用。它必须包含在对应的synchronize语句汇总,无论是wait()方法或者notify()方法都需要首先获取目标独享的一个监视器。
Object.wait()方法和Thread.sleeep()方法都可以让现场等待若干时间。除wait()方法可以被唤醒外,另外一个主要的区别就是wait()方法会释放目标对象的锁,而Thread.sleep()方法不会释放锁。
park/unpark机制
线程调用park则等待“许可”,unpark方法为指定线程
提供“许可permit”。
不要求 park和unpark方法的调用顺序。
调了几次park,就得调几次unpark
线程封闭
数据都被封闭在各自的线程之中,就不需要同步,这种通过将数据封闭在线程中而避免使用同步的技术即线程封闭。
ThreadLocal
ThreadLocal是java中的一种特殊的变量,每个线程都有一个ThreadLocal就是每个线程都拥有了自己的一个变量,竞争条件被彻底消除了,在并发模式下是绝对安全的变量。
用法:
ThreadLocal<T> var = new ThreadLocal<T>();
会自动在每一个线程上创建一个T的副本,副本之间彼此独立,互不影响。可以用ThreadLocal存储一些参数,以便在线程中多个方法中使用,用来代替方法传参的做法。
栈封闭
局部变量的固有属性之一即是封闭在线程中,它们位于执行线程的栈中,其它线程无法访问这个栈。
线程池及原理
volatile修饰共享变量
java帮我们提供了这样的方法,使用volatile修饰共享变量,被volatile修改的变量有以下特点:
1.线程中读取的时候,每次读取都会去主内存中读取共享变量最新的值,然后将其复制到工作内存
2.线程中修改了工作内存中变量的副本,修改之后会立即刷新到主内存
volatile解决了共享变量在多线程中可见性的问题,可见性是指一个线程对共享变量的修改,对于另一个线程来说是否是可以看到的。
volatile不能保证线程安全,只能保证被修饰变量的内存可见性,如果对该变量执行的是非原子性操作依旧线程不安全。
什么是线程池?
如果系统能够提前为我们创建好线程,我们需要的时候直接拿来使用,用完之后不是直接将其关闭,而是将其返回到线程中中,给其他需要这使用,这样直接节省了创建和销毁的时间,提升了系统的性能。
线程池实现原理
当向线程池提交一个任务之后,线程池的处理流程如下:
- 判断是否达到核心线程数,若未达到,则直接创建新的线程处理当前传入的任务,否则进入下个流程
- 线程池中的工作队列是否已满,若未满,则将任务丢入工作队列中先存着等待处理,否则进入下个流程
- 是否达到最大线程数,若未达到,则创建新的线程处理当前传入的任务,否则交给线程池中的饱和策略进行处理。
jdk中提供了线程池的具体实现,实现类是:java.util.concurrent.ThreadPoolExecutor
,主要构造方法:ThreadPoolExecutor
类型 | 类名 | 描述 |
---|---|---|
接口 | Executor | 最上层的接口,定义了执行任务的 execute |
接口 | ExecutorService | 继承Executor接口,扩展Callable,Tutrue,关闭方法 |
接口 | ScheduledExecutorService | 继承ExecutorService,增加定时任务相关方法 |
实现类 | ThreadPoolExecutor | 基础,标准的线程池实现 |
实现类 | ScheduledThreadPoolExecutor | 继承ThreadPoolExecutor,实现ScheduledExecutorService定时任务的方法 |
ExecutorService
//优雅关闭线程,之前提交的任务继续执行,但不接受新的任务
void shutdown();
//尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行任务的列表
List<Runnable> shutdownNow();
//如果关闭后所有任务都已完成,则返回true
boolean isTerminated();
//监测线程池是否关闭,直到所有任务完成执行,或超时发生,或当前线程被中断
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
//提交一个用于执行Callable返回任务,并返回一个Future,用获取Callable执行结果
<T> Future<T> submit(Callable<T> task);
//提交一个运行任务执行,并返回一个Future, 执行结果为传入result
<T> Future<T> submit(Runnable task, T result);
//提交一个运行任务执行,并返回一个Future, 执行结果null
Future<?> submit(Runnable task);
//执行给定的任务集合,执行完毕后返回结果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
//执行给定的任务集合,执行完毕或超时,返回结果,其它任务终止
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
//执行给定的任务,任意一个执行成功,则返回结果,其它终止
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
//执行给定的任务,任意一个执行成功或超时,则返回结果,其它终止
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
ScheduledExecutorService
//创建并执行一个一次性任务,过了延迟时间会被执行
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
//创建并执行一个一次性任务,过了延迟时间会被执行
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
//创建并执行一个周期性任务,过了给定初始延迟时间,会第一次被执行,执行过程发生异常,那么任务终止
//一次性任务执行超过了周期时间,下一次任务会等到该任务执行结束后,立刻执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//创建并执行一个周期性任务,过了给定初始延迟时间,第一次被执行,后续以给定的周期时间执行
//执行过程中发生异常,那么任务就停止。
//一次性任务执行超过了周期时间,下一次任务在该任务执行结束的时间基础上,计算执行延时
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
corePoolSize:核心线程大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使有其他空闲线程可以处理任务也会创新线程,等到工作的线程数大于核心线程数时就不会在创建了。如果调用了线程池的prestartAllCoreThreads
方法,线程池会提前把核心线程都创造好,并启动
maximumPoolSize:线程池允许创建的最大线程数。如果队列满了,并且以创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。如果我们使用了无界队列,那么所有的任务会加入队列,这个参数就没有什么效果了
keepAliveTime:线程池的工作线程空闲后,保持存活的时间。如果没有任务处理了,有些线程会空闲,空闲的时间超过了这个值,会被回收掉。如果任务很多,并且每个任务的执行时间比较短,避免线程重复创建和回收,可以调大这个时间,提高线程的利用率
unit:keepAliveTIme的时间单位,可以选择的单位有天、小时、分钟、毫秒、微妙、千分之一毫秒和纳秒。类型是一个枚举java.util.concurrent.TimeUnit
,这个枚举也经常使用,有兴趣的可以看一下其源码
workQueue:工作队列,用于缓存待处理任务的阻塞队列,常见的有4种,本文后面有介绍
threadFactory:线程池中创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字
handler:饱和策略,当线程池无法处理新来的任务了,那么需要提供一种策略处理提交的新任务,默认有4种策略
调用线程池的execute方法处理任务,执行execute方法的过程:
- 判断线程池中运行的线程数是否小于corepoolsize,是:则创建新的线程来处理任务,否:执行下一步
- 试图将任务添加到workQueue指定的队列中,如果无法添加到队列,进入下一步
- 判断线程池中运行的线程数是否小于
maximumPoolSize
,是:则新增线程处理当前传入的任务,否:将任务传递给handler
对象rejectedExecution
方法处理
线程池的使用步骤:
- 调用构造方法创建线程池
- 调用线程池的方法处理任务
- 关闭线程池
/**
* ThreadPoolExecutor 线程池
*
*/
public class ThreadPoolExecutorDemo {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(3,
5,
10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
for (int i=0;i<10;i++){
int j = i;
String taskName = "任务"+j;
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(j);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+taskName+"处理完毕");
});
}
executor.shutdown();
}
}
线程池中常见5种工作队列
任务太多的时候,工作队列用于暂时缓存待处理的任务,jdk中常见的5种阻塞队列:
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按照先进先出原则对元素进行排序
LinkedBlockingQueue:是一个基于链表结构的阻塞队列,此队列按照先进先出排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool
使用了这个队列。
SynchronousQueue :一个不存储元素的阻塞队列,每个插入操作必须等到另外一个线程调用移除操作,否则插入操作一直处理阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool
使用这个队列
PriorityBlockingQueue:优先级队列,进入队列的元素按照优先级会进行排序
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
System.out.println("开始");
for (int i=0;i<50;i++){
int j = i;
String taskName = "任务"+j;
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName()+" 处理 "+taskName);
try {
//模拟任务内部处理耗时
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
System.out.println("结束");
executorService.shutdown();
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
使用上面的方式创建线程池需要注意,如果需要处理的任务比较耗时,会导致新来的任务都会创建新的线程进行处理,可能会导致创建非常多的线程,最终耗尽系统资源,触发OOM。
PriorityBlockingQueue优先级队列的线程池
static class Task implements Runnable,Comparable<Task>{
private int i;
private String name;
public Task(int i,String name){
this.i = i;
this.name=name;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" 处理 "+this.name);
}
@Override
public int compareTo(Task o) {
return Integer.compare(o.i,this.i);
}
}
public static void main(String[] args) {
ExecutorService executorService = new ThreadPoolExecutor(1,1,60L,
TimeUnit.SECONDS,new PriorityBlockingQueue<>());
for (int i=0;i<10;i++){
int j = i;
String taskName = " 任务 "+j;
executorService.execute(new Task(i,taskName));
}
for (int i=100;i>=90;i--){
int j = i;
String taskName = " 任务 "+j;
executorService.execute(new Task(i,taskName));
}
executor.shutdown();
}
除了第一个任务,其他任务按照优先级高低按顺序处理。原因在于:创建线程池的时候使用了优先级队列,进入队列中的任务会进行排序,任务的先后顺序由Task中的i变量决定。向PriorityBlockingQueue
加入元素的时候,内部会调用代码中Task的compareTo
方法决定元素的先后顺序。
自定义创建线程的工厂
给线程池中线程起一个有意义的名字,在系统出现问题的时候,通过线程堆栈信息可以更容易发现系统中问题所在。自定义创建工厂需要实现java.util.concurrent.ThreadFactory
接口中的Thread newThread(Runnable r)
方法,参数为传入的任务,需要返回一个工作线程。
static AtomicInteger threadNum = new AtomicInteger(1);
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5,5,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10),
r -> {
Thread thread = new Thread(r);
thread.setName("自定义线程--"+threadNum.getAndIncrement());
return thread;
});
for (int i=0;i<5;i++){
String taskName = "任务--" + i;
executor.execute(() ->{
System.out.println(Thread.currentThread().getName()+" , 处理: "+taskName);
});
}
executor.shutdown();
}
4种常见饱和策略
当线程池中队列已满,并且线程池已达到最大线程数,线程池会将任务传递给饱和策略进行处理。这些策略都实现了RejectedExecutionHandler
接口。接口中有个方法:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
r:需要执行的任务
executor:当前线程池对象
JDK中提供了4种常见的饱和策略:
AbortPolicy:直接抛出异常
CallerRunsPolicy:在当前调用者的线程中运行任务,即随丢来的任务,由他自己去处理
DiscardOldestPolicy:丢弃队列中最老的一个任务,即丢弃队列头部的一个任务,然后执行当前传入的任务
DiscardPolicy:不处理,直接丢弃掉,方法内部为空
自定义饱和策略
需要实现RejectedExecutionHandler
接口。任务无法处理的时候,我们想记录一下日志,我们需要自定义一个饱和策略,示例代码:
static class Task implements Runnable{
private String name;
public Task(String name){
this.name = name;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+", 处理="+this.name);
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String toString() {
return "Task{name="+name+"}";
}
}
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1,1,
60L,TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
Executors.defaultThreadFactory(),
(r,rexecutor) -> {
//自定义饱和策略
//记录一下无法处理的任务
System.out.println("无法处理的任务: "+r.toString());
});
for (int i=0;i<5;i++){
executor.execute(new Task(" 任务-"+i));
}
executor.shutdown();
}
输出结果中可以看到有3个任务进入了饱和策略中,记录了任务的日志,对于无法处理多任务,我们最好能够记录一下,让开发人员能够知道。任务进入了饱和策略,说明线程池的配置可能不是太合理,或者机器的性能有限,需要做一些优化调整。
扩展线程池
ThreadPoolExecutor
内部提供了几个方法beforeExecute
、afterExecute
、terminated
,可以由开发人员自己去这些方法。看一下线程池内部的源码:
beforeExecute:任务执行之前调用的方法,有2个参数,第1个参数是执行任务的线程,第2个参数是任务
protected void beforeExecute(Thread t, Runnable r) { }
afterExecute:任务执行完成之后调用的方法,2个参数,第1个参数表示任务,第2个参数表示任务执行时的异常信息,如果无异常,第二个参数为null
protected void afterExecute(Runnable r, Throwable t) { }
terminated:线程池最终关闭之后调用的方法。所有的工作线程都退出了,最终线程池会退出,退出时调用该方法
protected void terminated() { }
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
10,60L,TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(1),
Executors.defaultThreadFactory(),
(r,executors) -> {
//自定义饱和策略
//记录一下无法处理的任务
System.out.println("无法处理的任务-" + r.toString());
}){
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println(System.currentTimeMillis()+","+t.getName()+",开始执行任务:" + r.toString());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",r任务:" + r.toString()+",执行完毕");
}
@Override
protected void terminated() {
System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName() + ",关闭线程池");
}
};
for (int i= 0;i<10;i++){
executor.execute(new Task("任务-"+i));
}
TimeUnit.SECONDS.sleep(2);
executor.shutdown();
}
JUC中的Executors框架
Excecutor框架主要包含3部分的内容:
- 任务相关的:包含被执行的任务要实现的接口:Runnable接口或Callable接口
- 任务的执行相关的:包含任务执行机制的核心接口Executor,以及继承自
Executor
的ExecutorService
接口。Executor框架中有两个关键的类实现了ExecutorService接口(ThreadPoolExecutor
和ScheduleThreadPoolExecutor
) - 异步计算结果相关的:包含接口Future和实现Future接口的FutureTask类
Executors框架包括:
- Executor
- ExecutorService
- ThreadPoolExecutor
- Executors
- Future
- Callable
- FutureTask
- CompletableFuture
- CompletionService
- ExecutorCompletionService
Executors类
Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。常用的方法有:
newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor(){}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {}
创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。内部使用了无限容量的LinkedBlockingQueue阻塞队列来缓存任务,任务如果比较多,单线程如果处理不过来,会导致队列堆满,引发OOM。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {}
创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。内部使用了无限容量的LinkedBlockingQueue阻塞队列来缓存任务,任务如果比较多,如果处理不过来,会导致队列堆满,引发OOM。
newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {}
创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。
在《阿里巴巴java开发手册》中指出了线程资源必须通过线程池提供,
使用ThreadPoolExecutor有助于大家明确线程池的运行规则,创建符合自己的业务场景需要的线程池,避免资源耗尽的风险。
Future、Callable接口
Future
接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。
Callable
接口中定义了需要有返回的任务需要实现的方法。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
比如主线程让一个子线程去执行任务,子线程可能比较耗时,启动子线程开始执行任务后,主线程就去做其他事情了,过了一会才去获取子任务的执行结果。
//获取异步任务执行结果
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(1);
Future<Integer> result = executorService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",开始");
TimeUnit.SECONDS.sleep(5);
System.out.println(System.currentTimeMillis()+","+Thread.currentThread().getName()+",结束");
return 10;
}
});
System.out.println(System.currentTimeMillis() +" "+ Thread.currentThread().getName());
try {//get()阻塞等待结果返回
System.out.println(System.currentTimeMillis() +" "+ Thread.currentThread().getName()+", result="+result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}finally {
executorService.shutdown();
}
}
代码中创建了一个线程池,调用线程池的submit
方法执行任务,submit参数为Callable
接口:表示需要执行的任务有返回值,submit方法返回一个Future
对象,Future相当于一个凭证,可以在任意时间拿着这个凭证去获取对应任务的执行结果(调用其get
方法),代码中调用了result.get()
方法之后,此方法会阻塞当前线程直到任务执行结束。
超时获取异步任务执行结果
可能任务执行比较耗时,比如耗时1分钟,我最多只能等待10秒,如果10秒还没返回,我就去做其他事情了。
刚好get有个超时的方法,声明如下:
java.util.concurrent.Future;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
Future
其他方法介绍一下
cancel:取消在执行的任务,参数表示是否对执行的任务发送中断信号,方法声明如下:
boolean cancel(boolean mayInterruptIfRunning);
isCancelled:用来判断任务是否被取消
isDone:判断任务是否执行完毕。
Future、Callable接口需要结合ExecutorService来使用,需要有线程池的支持。
FutureTask类
FutureTask除了实现Future接口,还实现了Runnable接口,因此FutureTask可以交给Executor执行,也可以交给线程执行执行(Thread有个Runnable的构造方法),FutureTask表示带返回值结果的任务。
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(System.currentTimeMillis() + "开始" + Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(3);
System.out.println(System.currentTimeMillis() + " 结束 " + Thread.currentThread().getName());
return 10;
}
});
System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName());
new Thread(futureTask).start();//线程启动Futuretask
System.out.println(System.currentTimeMillis() + " " + Thread.currentThread().getName());
System.out.println(System.currentTimeMillis() +" "+ Thread.currentThread().getName()+",result:"+futureTask.get());
}
线程池的submit方法返回的Future实际类型正是FutureTask对象
CompletionService接口
java.util.concurrent.CompletionService
CompletionService相当于一个执行任务的服务,通过submit丢任务给这个服务,服务内部去执行任务,可以通过服务提供的一些方法获取服务中已经完成的任务。
接口内的几个方法:
Future<V> submit(Callable<V> task);
用于向服务中提交有返回结果的任务,并返回Future对象
Future<V> take() throws InterruptedException;
从服务中返回并移除一个已经完成的任务,如果获取不到,会一致阻塞到有返回值为止。此方法会响应线程中断。
从服务中返回并移除一个已经完成的任务,如果获取不到,会一致阻塞到有返回值为止。此方法会响应线程中断。
Future<V> poll();
通过submit向内部提交任意多个任务,通过take方法可以获取已经执行完成的任务,如果获取不到将等待。
ExecutorCompletionService 类
ExecutorCompletionService类是CompletionService接口的具体实现;
ExecutorCompletionService创建的时候会传入一个线程池,调用submit方法传入需要执行的任务,任务由内部的线程池来处理;ExecutorCompletionService内部有个阻塞队列,任意一个任务完成之后,会将任务的执行结果(Future类型)放入阻塞队列中,然后其他线程可以调用它take、poll方法从这个阻塞队列中获取一个已经完成的任务,获取任务返回结果的顺序和任务执行完成的先后顺序一致,所以最先完成的任务会先返回。
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
构造方法需要传入一个Executor对象,这个对象表示任务执行器,所有传入的任务会被这个执行器执行。
completionQueue
是用来存储任务结果的阻塞队列,默认用采用的是LinkedBlockingQueue
,也支持开发自己设置。通过submit传入需要执行的任务,任务执行完成之后,会放入completionQueue
中。
执行一批任务,然后消费执行结果
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Callable<Integer>> list = new ArrayList<>();
int taskCount = 5;
for (int i = taskCount;i>0;i--){
int j=i*2;
list.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(j);
return j;
}
});
}
soive(executorService,list,new Consumer<Integer>(){
@Override
public void accept(Integer integer) {
System.out.println(System.currentTimeMillis()+" = "+integer);
}
});
executorService.shutdown();
}
private static <T> void soive(ExecutorService executorService, List<Callable<T>> solvers, Consumer<T> consumer) throws InterruptedException, ExecutionException {
CompletionService<T> ecs = new ExecutorCompletionService<T>(executorService);
for (Callable<T> s:solvers){
ecs.submit(s);//提交任务
}
int n = solvers.size();
for (int i = 0;i<n;i++){
T r = ecs.take().get();
if (r !=null){
consumer.accept(r);//消费任务
}
}
}