前言
阻塞”与"非阻塞"与"同步"与“异步"不能简单的从字面理解,提供一个从分布式系统角度的回答。
同步与异步
所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。换句话说,就是由调用者主动等待这个调用的结果,也就是必须一件一件事做,等前一件做完了才能做下一件事。
而异步则是相反,调用在发出之后,这个调用就直接返回了。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。典型的异步编程模型比如Node.js(感觉自从Node火起来之后,java中的异步也开始火起来,比如Vertx这个被称为java版的Node库)。
看过知乎上严肃关于这个问题回答的通俗的例子:你打电话问书店老板有没有《分布式系统》这本书,如果是同步通信机制,书店老板会说,你稍等,”我查一下",然后开始查啊查,等查好了(可能是5秒,也可能是一天)告诉你结果(返回结果)。
而异步通信机制,书店老板直接告诉你我查一下啊,查好了打电话给你,然后直接挂电话了(不返回结果)。然后查好了,他会主动打电话给你。在这里老板通过“回电”这种方式来回调。
阻塞与非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.
阻塞调用是指调用结果返回之前,当前线程会被挂起(线程进入非可执行状态,在这个状态下,cpu不会给线程分配时间片,即线程暂停运行)。调用线程只有在得到结果之后才会返回。非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程,可以继续执行其他任务。
还是上面的例子,你打电话问书店老板有没有《分布式系统》这本书,你如果是阻塞式调用,你会一直把自己“挂起”,直到得到这本书有没有的结果,如果是非阻塞式调用,你不管老板有没有告诉你,你自己先一边去玩了, 当然你也要偶尔过几分钟check一下老板有没有返回结果。在这里阻塞与非阻塞与是否同步异步无关,跟老板通过什么方式给你结果也无关。
先简单看下使用jdk实现异步的方式:
java future
要实现异步不然要开启新的线程,如果是靠自己去开启线程、执行、线程切换等,则效率太低,所以我们也需要使用线程池来管理线程。
- Executors创建线程池的几种常见方式
Executors是一个工厂类,提供很多静态方法,注意区分Executor接口。Executor通过Executors可以创建不同类似的线程池,常见的大概有下表几种类型,在实际应用中,主要使用newCachedThreadPool和newFixedThreadPool来创建线程池。
类名 | 说明 |
---|---|
newCachedThreadPool | 缓存型线程池,先查看池中是否有以前建立的可用的线程,如果有,就重用改线程;如果没有,就建一个新的线程加入池中使用。缓存型池子通常用于执行一些生存期很短的异步型任务。因此在一些面向连接的daemon型SERVER中用得不多。能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout为60s,超过这个IDLE时长,线程实例将被终止并移出池子。注意:放入CachedThreadPool的线程超过TIMEOUT不活动,其会自动被终止。 |
newFixedThreadPool | 和cacheThreadPool类似,有可用的线程就使用,但不能随时建新的线程。其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子。cache池和fixed池调用的是同一个底层池,只不过参数不同:fixed池线程数固定,并且是0秒IDLE(无IDLE)。所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器。cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE。 |
ScheduledThreadPool | 调度型线程池。这个池子里的线程可以按schedule依次delay执行,或周期执行。 |
SingleThreadExecutor | 单例线程,任意时间池中只能有一个线程。用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE)。 |
- Executors创建线程池
// 创建一个cache线程池
ExecutorService service = Executors.newCachedThreadPool();
// 实际返回的是ThreadPoolExecutor对象
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
从下面的继承关系可以看出,AbstractExecutorService实现了ExecutorService ,ThreadPoolExecutor继承自AbstractExecutorService
- ExecutorService(线程池)
ExecutorService继承了Executor接口,在java.util.concurrent包中,在原有execute方法的基础上新增了submit方法,可以传入Callable和Runnable类型的task,返回一个Future类型的对象,获取异步执行的结果。
submit(Callable)和submit(Runnable)类似,都会返回一个Future对象,但是除此之外,submit(Callable)接收的是一个Callable的实现,Callable接口中的call()方法有一个返回值,可以返回任务的执行结果,而Runnable接口中的run()方法是void的,没有返回值。submit(Runnable)如果任务执行完成,future.get()方法会返回一个null;submit(Callable)任务执行完成,future.get()方法会返回Callable任务的执行结果。注意,future.get()方法会产生阻塞。
- ExecutorService的关闭
当我们使用完成ExecutorService之后应该关闭它,否则它里面的线程会一直处于运行状态。
举个例子,如果的应用程序是通过main()方法启动的,在这个main()退出之后,如果应用程序中的ExecutorService没有关闭,这个应用将一直运行。之所以会出现这种情况,是因为ExecutorService中运行的线程会阻止JVM关闭。
如果要关闭ExecutorService中执行的线程,我们可以调用ExecutorService.shutdown()方法。在调用shutdown()方法之后,ExecutorService不会立即关闭,但是它不再接收新的任务,直到当前所有线程执行完成才会关闭,所有在shutdown()执行之前提交的任务都会被执行。
如果我们想立即关闭ExecutorService,我们可以调用ExecutorService.shutdownNow()方法。这个动作将跳过所有正在执行的任务和被提交还没有执行的任务。但是它并不对正在执行的任务做任何保证,有可能它们都会停止,也有可能执行完成。
- Future(获取异步计算结果)
方法 | 说明 |
---|---|
boolean cancel(boolean mayInterruptIfRunning) | 取消任务的执行,如果任务已经完成,则会取消失败;如果任务取消成功了,isDone总会返回true |
boolean isCancelled() | 任务是否已取消,任务正常完成前将其取消,返回 true |
boolean isDone() | 任务是否已完成,任务正常终止、异常或取消,返回true |
V get() | 等待任务结束,然后获取V类型的结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;任务已取消,抛出CancellationException; |
V get(long timeout, TimeUnit unit) | 获取结果,设置超时时间 |
也就是说Future提供了三种功能:
- 判断任务是否完成;
- 能够中断任务;
- 能够获取任务执行结果。
因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。
- FutureTask
可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
当FutureTask处于未启动或者是已启动状态时,此时还未得到线程执行结果,调用FutureTask.get方法会导致线程阻塞;
当FutureTask处于已完成状态时,此时已经得到线程执行结果,调用FutureTask.get方法会立即返回线程执行结果;
当FutureTask处于未启动状态时,调用FutureTask.cancel方法将会导致该task永远不会被执行;
当FutureTask处于启动状态时,调用FutureTask.cancel方法将会中断该任务的执行,至于会不会对任务产生影响由cancel方法的入参决定;
当FutureTask处于已完成状态,调用FutureTask.cancel方法返回false
FutureTask提供了2个构造器:
public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)
当构造方法传入参数为Runnable,会通过Executors.callable方法将其转换成Callable。
下面的例子觉得比较经典,引用来加深理解。
- FutureTask高并发环境下应用:
FutureTask在高并发环境下确保任务只执行一次。在很多高并发的环境下,往往我们只需要某些任务只执行一次。这种使用情景FutureTask的特性恰能胜任。举一个例子,假设有一个带key的连接池,当key存在时,即直接返回key对应的对象;当key不存在时,则创建连接。对于这样的应用场景,通常采用的方法为使用一个Map对象来存储key和连接池对应的对应关系,典型的代码如下面所示:
private Map<String, Connection> connectionPool = new HashMap<String, Connection>();
private ReentrantLock lock = new ReentrantLock();
public Connection getConnection(String key) {
try {
lock.lock();
if (connectionPool.containsKey(key)) {
return connectionPool.get(key);
} else {
//创建 Connection
Connection conn = createConnection();
connectionPool.put(key, conn);
return conn;
}
} finally {
lock.unlock();
}
}
//创建Connection
private Connection createConnection() {
return null;
}
在上面的例子中,我们通过加锁确保高并发环境下的线程安全,也确保了connection只创建一次,然而确牺牲了性能。改用ConcurrentHash的情况下,几乎可以避免加锁的操作,性能大大提高,但是在高并发的情况下有可能出现Connection被创建多次的现象。这时最需要解决的问题就是当key不存在时,创建Connection的动作能放在connectionPool之后执行,这正是FutureTask发挥作用的时机,基于ConcurrentHashMap和FutureTask的改造代码如下:
private ConcurrentHashMap<String, FutureTask<Connection>> connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>();
public Connection getConnection(String key) throws Exception {
FutureTask<Connection> connectionTask = connectionPool.get(key);
if (connectionTask != null) {
return connectionTask.get();
} else {
Callable<Connection> callable = new Callable<Connection>() {
@Override
public Connection call() throws Exception {
// TODO Auto-generated method stub
return createConnection();
}
};
FutureTask<Connection> newTask = new FutureTask<Connection>(callable);
connectionTask = connectionPool.putIfAbsent(key, newTask);
if (connectionTask == null) {
connectionTask = newTask;
// FutureTask 保证回调任务只执行一次,在这里的应用还是比较巧妙的
connectionTask.run();
}
return connectionTask.get();
}
}
//创建Connection
private Connection createConnection() {
return null;
}
经过这样的改造,可以避免由于并发带来的多次创建连接及锁的出现。
- 示例代码
// 测试代码 仅供参考
@Test
public void javaFutureTest() {
System.out.println("main start " + Thread.currentThread());
// 创建工作线程池
ExecutorService service = Executors.newCachedThreadPool();
// 子线程任务1
Future<Integer> integerTask = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("integerTask start " + Thread.currentThread());
Thread.sleep(100);
System.out.println("integerTask end " + Thread.currentThread());
return new Random().nextInt(100);
}
});
// 子线程任务2
Future<String> stringTask = service.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println("stringTask start " + Thread.currentThread());
Thread.sleep(50);
System.out.println("stringTask end " + Thread.currentThread());
return " ok";
}
});
// 主线程其他任务
for (int i = 0 ;i < 4 ;i++){
System.out.println("main other task " + i + " " + Thread.currentThread());
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (integerTask.isDone() && !integerTask.isCancelled()) {
try {
System.out.println("integerTask done " + "result:" + integerTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
if (stringTask.isDone() && !stringTask.isCancelled()) {
try {
System.out.println("stringTask done " + "result:" + stringTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
// 关闭线程池
service.shutdown();
System.out.println("main end " + Thread.currentThread());
}
输出结果
main start Thread[main,5,main]
main other task 0 Thread[main,5,main]
integerTask start Thread[pool-1-thread-1,5,main]
stringTask start Thread[pool-1-thread-2,5,main]
main other task 1 Thread[main,5,main]
stringTask end Thread[pool-1-thread-2,5,main]
integerTask end Thread[pool-1-thread-1,5,main]
stringTask done result: ok
main other task 2 Thread[main,5,main]
integerTask done result:86
stringTask done result: ok
main other task 3 Thread[main,5,main]
integerTask done result:86
stringTask done result: ok
main end Thread[main,5,main]
可以看到stringTask 和integerTask 两个子任务分别开了两个子线程执行,子线程执行过程中,主线程继续做其他任务,因为stringTask 耗时短先执行完。可以看到主线程中为了得到回调结果需要一直去检查isDone,这种写法是很不推荐的,后续会有其他方法来监听futureTask的执行结果,思想是开启一个新的异步线程去监听这个子线程的执行结果。
Guava future
ListenableFuture是可以监听的Future,它是对java原生Future的扩展增强。Future表示一个异步计算任务,当任务完成时可以得到计算结果。如果希望计算完成时马上就拿到结果展示给用户或者做另外的计算,就必须使用另一个线程不断的查询计算状态。这样做会使得代码复杂,且效率低下。如果使用ListenableFuture,Guava会帮助检测Future是否完成了,如果完成就自动调用回调函数,这样可以减少并发程序的复杂度。
- MoreExecutors
该类是final类型的工具类,提供了很多静态方法。例如listeningDecorator方法初始化ListeningExecutorService方法,使用此实例submit方法即可初始化ListenableFuture对象。
//装饰一个自己的线程池返回
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
- ListeningExecutorService
该类是对ExecutorService的扩展,重写ExecutorService类中的submit方法,返回ListenableFuture对象。
ListenableFuture<String> task =executorService .submit(() -> "hello");
- ListenableFuture
该接口扩展了Future接口,增加了addListener方法,该方法在给定的excutor上注册一个监听器,当计算完成时会马上调用该监听器。不能够确保监听器执行的顺序,但可以在计算完成时确保马上被调用。
task.addListener(() -> System.out.println("world"),executorService );
- Futures
该类提供和很多实用的静态方法以供使用。
Futures.allAsList这个方法用来把多个ListenableFuture组合成一个。
ListenableFuture<String> future1 = executorService.submit(() -> "Hello");
ListenableFuture<Integer> future2 = executorService.submit(() -> 2);
ListenableFuture<List<Object>> future = Futures.allAsList(future1, future2);
Futures.transform[Async]这个方法用于转换返回值。
ListenableFuture<String> future1 = executorService.submit(() -> "Hello");
ListenableFuture<Integer> listenableFuture = Futures.transform(future1, String::length, executorService);
System.out.println(listenableFuture.get());
这个是同步的方法,如果需要异步的执行
ListenableFuture<String> future1 = executorService.submit(() -> "Hello");
ListenableFuture<Integer> listenableFuture = Futures.transformAsync(future1, input -> Futures.immediateFuture(input.length()), executorService);
System.out.println(listenableFuture.get());
- 示例代码
// 测试代码 仅供参考
@Test
public void guavaFutureTest() {
System.out.println("main start " + Thread.currentThread());
// 创建工作线程池
ExecutorService service = Executors.newCachedThreadPool();
ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator(service);
// 子线程任务1
ListenableFuture<Integer> integerTask = listeningExecutorService.submit(() -> {
System.out.println("integerTask start " + Thread.currentThread());
Thread.sleep(100);
System.out.println("integerTask end " + Thread.currentThread());
return new Random().nextInt(100);
});
// 添加监听
Futures.addCallback(integerTask, new FutureCallback<Integer>() {
@Override
public void onSuccess(@Nullable Integer result) {
System.out.println("integerTask done " + "result:" + result + " " +Thread.currentThread());
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, listeningExecutorService);
// 子线程任务2
ListenableFuture<String> stringTask = listeningExecutorService.submit(() -> {
System.out.println("stringTask start " + Thread.currentThread());
Thread.sleep(50);
System.out.println("stringTask end " + Thread.currentThread());
return " ok";
});
// 添加监听
Futures.addCallback(stringTask, new FutureCallback<String>() {
@Override
public void onSuccess(@Nullable String result) {
System.out.println("stringTask done " + "result:" + result + " " +Thread.currentThread());
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, listeningExecutorService);
// 主线程其他任务
for (int i = 0 ;i < 4 ;i++){
System.out.println("main other task " + i + " " + Thread.currentThread());
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 关闭线程池
listeningExecutorService.shutdown();
System.out.println("main end " + Thread.currentThread());
}
输出结果:
main start Thread[main,5,main]
integerTask start Thread[pool-1-thread-1,5,main]
stringTask start Thread[pool-1-thread-2,5,main]
main other task 0 Thread[main,5,main]
main other task 1 Thread[main,5,main]
stringTask end Thread[pool-1-thread-2,5,main]
stringTask done result: ok Thread[pool-1-thread-3,5,main]
integerTask end Thread[pool-1-thread-1,5,main]
integerTask done result:63 Thread[pool-1-thread-3,5,main]
main other task 2 Thread[main,5,main]
main other task 3 Thread[main,5,main]
main end Thread[main,5,main]
可以看到结果和前面java future输出类似,stringTask 和integerTask 两个子任务分别开了两个子线程执行,子线程执行过程中,主线程继续做其他任务,因为stringTask 耗时短先执行完。可以看到主线程中为了得到回调结果不再需要一直去检查isDone,有监听函数获取子线程的执行结果Futures.addCallback,开启一个新的异步线程Thread[pool-1-thread-3,5,main]去监听这个子线程的执行结果。
Java future 和 Guava future的对比
Future 具有局限性。在实际应用中,当需要下载大量图片或视频时,可以使用多线程去下载,提交任务下载后,可以从多个Future中获取下载结果,由于Future获取任务结果是阻塞的,所以将会依次调用Future.get()方法,这样的效率会很低。很可能第一个下载速度很慢,则会拖累整个下载速度。就像前面说的,使用java future需要另起一个线程去check任务是否完成,guava相当于把这一步帮我们做掉了。
Future主要功能在于获取任务执行结果和对异步任务的控制。但如果要获取批量任务的执行结果,从上面的例子我们已经可以看到,单使用 Future 是很不方便的。其主要原因在于:一方面是没有好的方法去判断第一个完成的任务;另一方面是 Future的get方法 是阻塞的,使用不当会造成线程的浪费。第一个问题可以用 CompletionService 解决,CompletionService 提供了一个 take() 阻塞方法,用以依次获取所有已完成的任务。第二个问题可以用 Google Guava 库所提供的 ListeningExecutorService 和 ListenableFuture 来解决。除了获取批量任务执行结果时不便,Future另外一个不能做的事便是防止任务的重复提交。要做到这件事就需要 Future 最常见的一个实现类 FutureTask 了。Future只实现了异步,而没有实现回调,主线程get时会阻塞,可以轮询以便获取异步调用是否完成。
在实际的使用中建议使用Guava ListenableFuture代替JDK的 Future来实现异步非阻塞,目的就是多任务异步执行,通过回调的方方式来获取执行结果而不需轮询任务状态。
关于Guava的异步还有很多东西需要学习,比如需要异步任务的计算结果继续做计算的,异步计算同步获取结果(异步转同步)等,后续的笔记会继续展开。
参考文章:
https://blog.csdn.net/he90227/article/details/52210490
https://www.zhihu.com/question/19732473/answer/20851256
https://cloud.tencent.com/developer/article/1041329