Future/FutureTask源码分析

一. 关于Future/FutureTask

Future/FutureTask/ExecutorService相关类图:


FutureTask.png

0x01: Future

下面是Future的Doc文档

A {@code Future} represents the result of an asynchronous computation. Methods are provided to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result can only be retrieved using method {@code get} when the computation has completed, blocking if necessary until it is ready. Cancellation is performed by the {@code cancel} method. Additional methods are provided to determine if the task completed normally or was cancelled. Once a computation has completed, the computation cannot be cancelled. If you would like to use a {@code Future} for the sake of cancellability but not provide a usable result, you can declare types of the form {@code Future<?>} and return {@code null} as a result of the underlying task.

简单的说, Future表示一个异步计算的结果. 异步计算是在其他线程进行的, 因此异步计算的结果, 有可能有值, 也有可能没有值. 于是, Future就提供了一些方法来处理这种未知状态:

a. isDone() 异步任务是否完成, 即否有结果
b. get() 获取异步任务结果, 如果异步任务未完成, 此方法会一直阻塞, 直到异步方法完成 或 任务被取消 (调用了cancel()方法)
c. cancel() 取消异步任务, 如果异步任务已经完成, 那么取消失败(即cancel()方法返回false)
d. isCancelled() 查询异步任务是否已被取消

简单用法:

interface ArchiveSearcher { 
    String search(String target); 
}

class App {

  ExecutorService executor = ...
  ArchiveSearcher searcher = ...

  void showSearch(final String target) throws InterruptedException {
    Future<String> future = executor.submit(new Callable<String>() {
        public String call() {
            return searcher.search(target);
        }});

    displayOtherThings(); // do other things while searching
    try {
      displayText(future.get()); // use future
    } catch (ExecutionException ex) { 
        cleanup(); 
        return; 
    }
  }

}

用Future主要是为了获取异步计算的结果. 例如在Android中, 你会把网络请求放在子线程中去执行, 而请求的结果会拿到UI线程中来使用. 这时候就可以使用Future. 关键代码如下:

interface NetworkService {
    ModelXxx requestXxx();
}

// 注意: 这是一个阻塞的方法, 不能在UI线程中直接调用
public ModelXxx xx() {
    final NetworkService networkService = ...
    ExecutorService executorService = ...

    Future<ModelXxx> future = executorService.submit(new Callable<ModelXxx>() {
        @Override
        public ModelXxx call() throws Exception {
            return networkService.requestXxx();
        }
    });

    ModelXxx modelXxx = future.get(); //此处可能阻塞
}

void bindDataToUi(ModelXxx model) {
    // do something with model ...
}

Future使用起来并不方便, 因为异步任务什么时完成我们并不知晓, 除非你用isDone()方法去查询是否完成. 如果要异步任务完成后主动通知我们, 那么该如何做呢? 这个问题留到后面说.

0x02: FutureTask

下面是FutureTask的Doc文档:

A cancellable asynchronous computation. This class provides a base implementation of {@link Future}, with methods to start and cancel a computation, query to see if the computation is complete, and retrieve the result of the computation. The result can only be retrieved when the computation has completed; the {@code get} methods will block if the computation has not yet completed. Once the computation has completed, the computation cannot be restarted or cancelled (unless the computation is invoked using {@link #runAndReset}).

a. 简单来说: FutureTask就是一个可取消的异步任务.
b. 把FutureTask的名字拆开来看, FutureTask是Future(异步任务结果)和Task(异步任务)的集合. 因此, 我们可以直接把FutureTask扔给ExecutorService去执行, 然后又可以获取计算结果 (Future的特性FutureTask都有).
c. FutureTask的基本用法和步骤如下:
public void someBiz() {
        // 1. init services
        final String serviceUrl = ...
        final NetworkService networkService = ...
        final ExecutorService executorService = ...

        // 2. make task
        FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return networkService.request(serviceUrl);
            }
        });
        
        // 3. submit task
        executorService.submit(futureTask);
        
        // 4. get result
        String result = futureTask.get();
        
        // 5. do something with result
        // ...
    }
d. 使用FutureTask执行异步任务有个问题: 就是异步任务执行完了并不会通知调用方. 但是, FutureTask已经有支持异步任务执行完毕就立刻通知调用方的基础. FutureTask有一个done() 方法, 此方法即是异步任务调用完毕的回调方法, 这个方法执行时已经可以获取到结果数据了. 我们可以扩展FutureTask并重写done()来支持异步任务执行完成后的回调通知.

二. Future/FutureTask实现解析

a. Future的创建

先看看Future是如何产生的(java.util.concurrent.AbstractExecutorService类中):

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

当我们提交一个异步任务 (Callable<T>) 时, 会调用submit()方法, submit()方法内部会调用newTask()方法创建一个FutureTask. 我们已经知道, FutureTask是Future的实现, 又是Runnable的实现. 因此, 它既可以执行又可以获取结果.

b. FutureTask的执行逻辑

创建FutureTask时, 会把我们提交的任务 (Callbable)传递给FutureTask. 其实FutureTask执行时, 会委托给传进来的Callable. 基本逻辑如下:

public class FutureTask<V> implements RunnableFuture<V> {
    // 用来存储 "用户提供的有实在业务逻辑的" 任务
    private Callable<V> callable;
    
    // 用来保存异步计算的结果
    private Object outcome;

    public FutureTask(Callable<V> callable) {
        if (callable == null) throw new NullPointerException();
        // 保存外部传来的任务, 待会在run()方法中调用
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public void run() {
        // 省略其他代码 ...

        Callable<V> c = callable;

        //FutureTak的执行逻辑委托给用户提供的真正任务
        V result = c.call(); 

        // 设置异步任务结果
        set(result);
    }

    // 其他代码省略 ...
}
c. FutureTask是如何保存计算结果的

可以看到, FutureTask实现了RunnableFuture接口, 此接口是Runnable接口和Future接口的结合. 自然FutureTask是要被当成任务来在线程中执行的( 线程内部执行的一般都是Runnable ). FutureTask内部用了一个Object成员outcome来存储异步任务的结果. run() 方法调用用户传过来的Callbable的call()方法并产生一个运算结果, 此时会调用set()方法来将计算结果存储到成员outcome中. 下面就来看看FutureTask是如何将计算结果设置到outcome成员中的:

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

非常简单, 直接赋值就好了. 需要注意的是, 赋值之前有一个任务状态的切换, 这个切换会同步. 因此设置完后, 其他线程就可以获取到结果了.

d. 任务执行完的收尾工作 (任务完成的回调, 资源回收等)

我们注意到, 赋值后会调用一个finishCompletion()方法, , 那么就来看看此方法:

private void finishCompletion() {
        // 省略无关代码 ... 
        
        done();
        callable = null;        // to reduce footprint
    }

其实就是做一些收尾工作, 将callbable置null等 (callable中可能会持有一些资源).
此方法里面还调用了done()方法, 此方法其实就是异步任务执行完的回调, 下面来看看此方法:

protected void done() { }

空的实现!! 且方法为protected, 这就是为扩展而存在的方法.
回到前面提的问题: **异步任务执行完毕后如何主动通知调用者? **
其实我们只要扩展FutureTask, 再给扩展对象设置一个回调对象, 然后重写done()方法, 在done()方法内调用回调对象就可以了.
具体我们可以来看看Guava的com.google.common.util.concurrent.ListenableFutureTask:

public class ListenableFutureTask<V> extends FutureTask<V> implements ListenableFuture<V> {

  //存储异步任务完成后的回调以及回调所在的Executor
  private final ExecutionList executionList = new ExecutionList(); 

  // 其他代码省略 ...

  //添加异步任务完成后的回调和回调所在的Executor
  @Override
  public void addListener(Runnable listener, Executor exec) {
    executionList.add(listener, exec);
  }

  @Override
  protected void done() {
    // 异步任务完成后回调
    executionList.execute();
  }
}

需要注意的是FutureTaskdone()方法是在Worker线程中执行的, 一般我们获取结果是在其他线程, 因此需要把计算结果挪到指定的线程中去. 因此不仅需要指定任务完成的回调, 还需要指定任务完成的回调所在的线程.

e. FutureTask计算结果同步问题

既然FutureTask是在Worker线程中执行的, 那么其他线程获取计算结果, 就会存在同步问题. 那么FutureTask是如何来同步计算结果的呢?

jdk1.8 FutureTask源码中, 保存计算结果的成员变量声明是这样的:
private Object outcome; // non-volatile, protected by state reads/writes
可以看到后面的说明, 此变量是non-volatile的, 同步是用state字段的读写来保证的. 由于用了不公开的API, 逻辑也比较复杂, 具体就不多说了, 自己看源码吧 --

三. 自定义FutureTask

a. 下面是对FutureTask的简单扩展 (获取网络图片)
import android.support.annotation.NonNull;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;

public class ListenableFuture<V> extends FutureTask<V> {

    AtomicReference<Listener> mListenerRef;

    public ListenableFuture(@NonNull Callable<V> callable) {
        super(callable);
        mListenerRef = new AtomicReference<>();
    }

    public ListenableFuture(@NonNull Runnable runnable, V result) {
        super(runnable, result);
    }


    @Override
    protected void done() {
        final Listener<V> listener = mListenerRef.get();
        if(listener != null) {
            try {
                onSuccess(listener, get());
            } catch (InterruptedException e) {
                onFailed(listener, e);
            } catch (ExecutionException e) {
                onFailed(listener, e);
            }
            mListenerRef.set(null);
        }
    }

    private void onSuccess(final Listener<V> listener, final V result) {
        MainHandler.post(new Runnable() {
            @Override
            public void run() {
                listener.onSuccess(result);
            }
        });
    }

    private void onFailed(final Listener<V> listener, final Throwable e) {
        MainHandler.post(new Runnable() {
            @Override
            public void run() {
                listener.onFailed(e);
            }
        });
    }

    public void setListener(Listener<V> listener) {
        mListenerRef.compareAndSet(mListenerRef.get(), listener);
    }

    public interface Listener<V> {
        void onSuccess(V v);
        void onFailed(Throwable e);
    }
}

工具类MainHandler.java

import android.os.Handler;
import android.os.Looper;

public final class MainHandler {

    private static Handler mainHandler;

    private MainHandler() {
        //no instance
    }

    public static void post(Runnable task) {
        getMainHandler().post(task);
    }

    public static Handler getMainHandler() {
        if(mainHandler == null) {
            mainHandler = new Handler(Looper.getMainLooper());
        }
        return mainHandler;
    }
}

ListenableFuture的使用方法如下:

import com.stone.demo.R;

import android.Manifest;
import android.content.pm.PackageManager;
import android.graphics.Bitmap;
import android.graphics.BitmapFactory;
import android.os.Bundle;
import android.support.annotation.NonNull;
import android.support.annotation.Nullable;
import android.support.v4.app.ActivityCompat;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.ImageView;
import android.widget.Toast;

import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import okhttp3.Call;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class MainActivity extends AppCompatActivity {
    ExecutorService service;

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_future_main);

        service = Executors.newFixedThreadPool(4);

        findViewById(R.id.btn).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                loadImage();
            }
        });
        checkPermission();
    }

    private void checkPermission() {
        int result = ActivityCompat.checkSelfPermission(this, Manifest.permission.WRITE_EXTERNAL_STORAGE);
        if(!(result == PackageManager.PERMISSION_GRANTED)) {
            ActivityCompat.requestPermissions(this, new String[] {Manifest.permission.WRITE_EXTERNAL_STORAGE}, 0x10);
        }
    }


    @Override
    public void onRequestPermissionsResult(int requestCode, @NonNull String[] permissions, @NonNull int[] grantResults) {
        if(requestCode == 0x10) {
            if(!(grantResults[0] == PackageManager.PERMISSION_GRANTED)) {
                Toast.makeText(this, "您拒绝了访问磁盘", Toast.LENGTH_SHORT).show();
            }
        }
    }

    private void loadImage() {
        ListenableFuture<Bitmap> future = new ListenableFuture<Bitmap>(new Callable<Bitmap>() {
            @Override
            public Bitmap call() throws Exception {
                OkHttpClient client = new OkHttpClient();
                Request request = new Request.Builder().url("http://img06.tooopen.com/images/20161214/tooopen_sy_190570171299.jpg").build();
                Call call = client.newCall(request);
                Response response = call.execute();
                InputStream inputStream = null;
                if(response != null && response.body() != null && (inputStream = response.body().byteStream()) != null) {
                    Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                    inputStream.close();
                    return bitmap;
                }
                return null;
            }
        });

        future.setListener(new ListenableFuture.Listener<Bitmap>() {
            @Override
            public void onSuccess(Bitmap bitmap) {
                if(bitmap != null) {
                    ImageView.class.cast(findViewById(R.id.img)).setImageBitmap(bitmap);
                } else {
                    Toast.makeText(MainActivity.this, "图片加载失败", Toast.LENGTH_SHORT).show();
                }
            }

            @Override
            public void onFailed(Throwable e) {
                Toast.makeText(MainActivity.this, "图片加载失败", Toast.LENGTH_SHORT).show();
            }
        });

        service.submit(future);
    }
}

布局文件非常简单, 上面一个Button, 下面一个ImageView, 就不贴出来了, 效果图如下:

screen.png

如有错误之处, 欢迎指正 ~~

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容