一. 关于Future/FutureTask
Future/FutureTask/ExecutorService相关类图:
0x01: Future
下面是Future的Doc文档
A {@code Future} represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method {@code get} when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the {@code cancel} method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a {@code Future} for the sake of cancellability but not provide a usable result, you can declare types of the form {@code Future<?>} and return {@code null} as a result of the underlying task.
简单的说, Future表示一个异步计算的结果. 异步计算是在其他线程进行的, 因此异步计算的结果, 有可能有值, 也有可能没有值. 于是, Future就提供了一些方法来处理这种未知状态:
a. isDone()
异步任务是否完成, 即否有结果
b. get()
获取异步任务结果, 如果异步任务未完成, 此方法会一直阻塞, 直到异步方法完成 或 任务被取消 (调用了cancel()
方法)
c. cancel()
取消异步任务, 如果异步任务已经完成, 那么取消失败(即cancel()
方法返回false)
d. isCancelled()
查询异步任务是否已被取消
简单用法:
interface ArchiveSearcher {
String search(String target);
}
class App {
ExecutorService executor = ...
ArchiveSearcher searcher = ...
void showSearch(final String target) throws InterruptedException {
Future<String> future = executor.submit(new Callable<String>() {
public String call() {
return searcher.search(target);
}});
displayOtherThings(); // do other things while searching
try {
displayText(future.get()); // use future
} catch (ExecutionException ex) {
cleanup();
return;
}
}
}
用Future主要是为了获取异步计算的结果. 例如在Android中, 你会把网络请求放在子线程中去执行, 而请求的结果会拿到UI线程中来使用. 这时候就可以使用Future. 关键代码如下:
interface NetworkService {
ModelXxx requestXxx();
}
// 注意: 这是一个阻塞的方法, 不能在UI线程中直接调用
public ModelXxx xx() {
final NetworkService networkService = ...
ExecutorService executorService = ...
Future<ModelXxx> future = executorService.submit(new Callable<ModelXxx>() {
@Override
public ModelXxx call() throws Exception {
return networkService.requestXxx();
}
});
ModelXxx modelXxx = future.get(); //此处可能阻塞
}
void bindDataToUi(ModelXxx model) {
// do something with model ...
}
Future使用起来并不方便, 因为异步任务什么时完成我们并不知晓, 除非你用isDone()
方法去查询是否完成. 如果要异步任务完成后主动通知我们, 那么该如何做呢? 这个问题留到后面说.
0x02: FutureTask
下面是FutureTask的Doc文档:
A cancellable asynchronous computation. This class provides a base implementation of {@link Future}, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. The result can only be retrieved when the computation has completed; the {@code get} methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using {@link #runAndReset}).
a. 简单来说: FutureTask就是一个可取消的异步任务.
b. 把FutureTask的名字拆开来看, FutureTask是Future(异步任务结果)和Task(异步任务)的集合. 因此, 我们可以直接把FutureTask扔给ExecutorService去执行, 然后又可以获取计算结果 (Future的特性FutureTask都有).
c. FutureTask的基本用法和步骤如下:
public void someBiz() {
// 1. init services
final String serviceUrl = ...
final NetworkService networkService = ...
final ExecutorService executorService = ...
// 2. make task
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
return networkService.request(serviceUrl);
}
});
// 3. submit task
executorService.submit(futureTask);
// 4. get result
String result = futureTask.get();
// 5. do something with result
// ...
}
d. 使用FutureTask
执行异步任务有个问题: 就是异步任务执行完了并不会通知调用方. 但是, FutureTask
已经有支持异步任务执行完毕就立刻通知调用方的基础. FutureTask
有一个done()
方法, 此方法即是异步任务调用完毕的回调方法, 这个方法执行时已经可以获取到结果数据了. 我们可以扩展FutureTask
并重写done()
来支持异步任务执行完成后的回调通知.
二. Future/FutureTask实现解析
a. Future的创建
先看看Future是如何产生的(java.util.concurrent.AbstractExecutorService
类中):
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
当我们提交一个异步任务 (Callable<T>
) 时, 会调用submit()
方法, submit()方法内部会调用newTask()
方法创建一个FutureTask
. 我们已经知道, FutureTask是Future的实现, 又是Runnable的实现. 因此, 它既可以执行又可以获取结果.
b. FutureTask的执行逻辑
创建FutureTask时, 会把我们提交的任务 (Callbable
)传递给FutureTask. 其实FutureTask执行时, 会委托给传进来的Callable
. 基本逻辑如下:
public class FutureTask<V> implements RunnableFuture<V> {
// 用来存储 "用户提供的有实在业务逻辑的" 任务
private Callable<V> callable;
// 用来保存异步计算的结果
private Object outcome;
public FutureTask(Callable<V> callable) {
if (callable == null) throw new NullPointerException();
// 保存外部传来的任务, 待会在run()方法中调用
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public void run() {
// 省略其他代码 ...
Callable<V> c = callable;
//FutureTak的执行逻辑委托给用户提供的真正任务
V result = c.call();
// 设置异步任务结果
set(result);
}
// 其他代码省略 ...
}
c. FutureTask是如何保存计算结果的
可以看到, FutureTask实现了RunnableFuture接口, 此接口是Runnable接口和Future接口的结合. 自然FutureTask是要被当成任务来在线程中执行的( 线程内部执行的一般都是Runnable ). FutureTask内部用了一个Object成员outcome
来存储异步任务的结果. run()
方法调用用户传过来的Callbable的call()
方法并产生一个运算结果, 此时会调用set()
方法来将计算结果存储到成员outcome
中. 下面就来看看FutureTask
是如何将计算结果设置到outcome
成员中的:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
非常简单, 直接赋值就好了. 需要注意的是, 赋值之前有一个任务状态的切换, 这个切换会同步. 因此设置完后, 其他线程就可以获取到结果了.
d. 任务执行完的收尾工作 (任务完成的回调, 资源回收等)
我们注意到, 赋值后会调用一个finishCompletion()
方法, , 那么就来看看此方法:
private void finishCompletion() {
// 省略无关代码 ...
done();
callable = null; // to reduce footprint
}
其实就是做一些收尾工作, 将callbable置null等 (callable中可能会持有一些资源).
此方法里面还调用了done()
方法, 此方法其实就是异步任务执行完的回调, 下面来看看此方法:
protected void done() { }
空的实现!! 且方法为protected
, 这就是为扩展而存在的方法.
回到前面提的问题: **异步任务执行完毕后如何主动通知调用者? **
其实我们只要扩展FutureTask
, 再给扩展对象设置一个回调对象, 然后重写done()
方法, 在done()
方法内调用回调对象就可以了.
具体我们可以来看看Guava的com.google.common.util.concurrent.ListenableFutureTask
:
public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {
//存储异步任务完成后的回调以及回调所在的Executor
private final ExecutionList executionList = new ExecutionList();
// 其他代码省略 ...
//添加异步任务完成后的回调和回调所在的Executor
@Override
public void addListener(Runnable listener, Executor exec) {
executionList.add(listener, exec);
}
@Override
protected void done() {
// 异步任务完成后回调
executionList.execute();
}
}
需要注意的是FutureTask
的done()
方法是在Worker线程中执行的, 一般我们获取结果是在其他线程, 因此需要把计算结果挪到指定的线程中去. 因此不仅需要指定任务完成的回调, 还需要指定任务完成的回调所在的线程.
e. FutureTask计算结果同步问题
既然FutureTask是在Worker线程中执行的, 那么其他线程获取计算结果, 就会存在同步问题. 那么FutureTask是如何来同步计算结果的呢?
jdk1.8 FutureTask
源码中, 保存计算结果的成员变量声明是这样的:
private Object outcome; // non-volatile, protected by state reads/writes
可以看到后面的说明, 此变量是non-volatile
的, 同步是用state
字段的读写来保证的. 由于用了不公开的API, 逻辑也比较复杂, 具体就不多说了, 自己看源码吧 --
三. 自定义FutureTask
a. 下面是对FutureTask
的简单扩展 (获取网络图片)
import android.support.annotation.NonNull;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;
public class ListenableFuture<V> extends FutureTask<V> {
AtomicReference<Listener> mListenerRef;
public ListenableFuture(@NonNull Callable<V> callable) {
super(callable);
mListenerRef = new AtomicReference<>();
}
public ListenableFuture(@NonNull Runnable runnable, V result) {
super(runnable, result);
}
@Override
protected void done() {
final Listener<V> listener = mListenerRef.get();
if(listener != null) {
try {
onSuccess(listener, get());
} catch (InterruptedException e) {
onFailed(listener, e);
} catch (ExecutionException e) {
onFailed(listener, e);
}
mListenerRef.set(null);
}
}
private void onSuccess(final Listener<V> listener, final V result) {
MainHandler.post(new Runnable() {
@Override
public void run() {
listener.onSuccess(result);
}
});
}
private void onFailed(final Listener<V> listener, final Throwable e) {
MainHandler.post(new Runnable() {
@Override
public void run() {
listener.onFailed(e);
}
});
}
public void setListener(Listener<V> listener) {
mListenerRef.compareAndSet(mListenerRef.get(), listener);
}
public interface Listener<V> {
void onSuccess(V v);
void onFailed(Throwable e);
}
}
工具类MainHandler.java
import android.os.Handler;
import android.os.Looper;
public final class MainHandler {
private static Handler mainHandler;
private MainHandler() {
//no instance
}
public static void post(Runnable task) {
getMainHandler().post(task);
}
public static Handler getMainHandler() {
if(mainHandler == null) {
mainHandler = new Handler(Looper.getMainLooper());
}
return mainHandler;
}
}
ListenableFuture
的使用方法如下:
import com.stone.demo.R;
import android.Manifest;
import android.content.pm.PackageManager;
import android.graphics.Bitmap;
import android.graphics.BitmapFactory;
import android.os.Bundle;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import android.support.v4.app.ActivityCompat;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.ImageView;
import android.widget.Toast;
import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import okhttp3.Call;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
public class MainActivity extends AppCompatActivity {
ExecutorService service;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_future_main);
service = Executors.newFixedThreadPool(4);
findViewById(R.id.btn).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
loadImage();
}
});
checkPermission();
}
private void checkPermission() {
int result = ActivityCompat.checkSelfPermission(this, Manifest.permission.WRITE_EXTERNAL_STORAGE);
if(!(result == PackageManager.PERMISSION_GRANTED)) {
ActivityCompat.requestPermissions(this, new String[] {Manifest.permission.WRITE_EXTERNAL_STORAGE}, 0x10);
}
}
@Override
public void onRequestPermissionsResult(int requestCode, @NonNull String[] permissions, @NonNull int[] grantResults) {
if(requestCode == 0x10) {
if(!(grantResults[0] == PackageManager.PERMISSION_GRANTED)) {
Toast.makeText(this, "您拒绝了访问磁盘", Toast.LENGTH_SHORT).show();
}
}
}
private void loadImage() {
ListenableFuture<Bitmap> future = new ListenableFuture<Bitmap>(new Callable<Bitmap>() {
@Override
public Bitmap call() throws Exception {
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url("http://img06.tooopen.com/images/20161214/tooopen_sy_190570171299.jpg").build();
Call call = client.newCall(request);
Response response = call.execute();
InputStream inputStream = null;
if(response != null && response.body() != null && (inputStream = response.body().byteStream()) != null) {
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
inputStream.close();
return bitmap;
}
return null;
}
});
future.setListener(new ListenableFuture.Listener<Bitmap>() {
@Override
public void onSuccess(Bitmap bitmap) {
if(bitmap != null) {
ImageView.class.cast(findViewById(R.id.img)).setImageBitmap(bitmap);
} else {
Toast.makeText(MainActivity.this, "图片加载失败", Toast.LENGTH_SHORT).show();
}
}
@Override
public void onFailed(Throwable e) {
Toast.makeText(MainActivity.this, "图片加载失败", Toast.LENGTH_SHORT).show();
}
});
service.submit(future);
}
}
布局文件非常简单, 上面一个Button, 下面一个ImageView, 就不贴出来了, 效果图如下:
如有错误之处, 欢迎指正 ~~