Promise 是异步编程中的一种解决方案,该方案的设计表示在执行某任务的时候会立即返回一个承诺,然后在任务执行完之后返回给你结果。Promise可以将你的异步代码给拉平,给人的感觉就好象在写同步代码一样,顺序由上而下,从左到右。
Promise主要用来解决异步编程中的回调问题——回调地狱
promise有三种状态:pending(等待态),fulfiled(成功态),rejected(失败态);状态一旦改变,就不会再变。
在安卓中,经常会用到这样的场景,在工作线程处理数据然后到UI线程中显示数据,可能之后有需要到工作线程中去处理数据...
在这里我们需要干两件事:
- 设置回调接口,等待工作线程中处理完后返回结果
- 发送Handler消息在UI线程中更新UI。
这里我们需要在工作线程和UI线程中来回切换,还需要发送一些Handler事件去更新UI。当然一个异步任务处理数据然后简单的用Handler包装一下去更新UI,你可能觉得没啥。好像也体现不出多复杂的感觉,但是如果任务一多,那么代码就会变得非常晦涩,有的时候不得不从Handler的处理函数中去寻找一丝蛛丝马迹。如果用上Promise,代码就会变得非常整洁,可读性直线提升。
此处基于Promise设计方案,并结合在安卓中的使用,加入了线程切换的功能,方便任意时刻切换线程指定工作方式,此篇文章中我的Promise名字为HandlerFuture
Promise标准仅描述了then
方法的行为,规范中规定then
方法用来注册promise
对象状态改变时的回调,且返回一个新的promise
对象。这里先演示Function
范畴的then
。
public <V>HandlerFuture<V> applyThen(Function<T,V> fun,Function<Throwable,? extends V> except){
Objects.requireNonNull(fun);
Objects.requireNonNull(except);
//判断IO模式
IO workIs=io==IO.WORK?IO.WORK:IO.UI;
//每次then的时候都需要创建下一个新的HandlerFutrure对象并返回
HandlerFuture<V> future=new HandlerFuture<>(null,workIs,stop);
//构建当前任务
FunctionTask<V> task=new FunctionTask<>(future,val->{
try {
//任务执行期间捕获异常
return fun.apply(val);
}catch (Exception e){
return except.apply(e);
}
});
//每次then都会立即或者延迟执行任务
if(value!=null){
//如果当前有值则直接执行任务
executeTask(task);
}else {
//否则延期直到可以执行任务的时候在执行
pending=task;
}
return future;
}
上面代码中applyThen
函数清楚的描述了Promise的then行为:
-
applyThen
方法返回一个新的HandlerFuture
对象。 -
executeTask
函数执行当前HandlerFuture
中的任务 - 如果当前
HandlerFuture
没有值,则延期直到调用complete
函数时再执行。 - 在函数式编程中异常是副作用,所以不应该出现它,这里捕获了异常并返回异常结果。
public void complete(T value){
if(stop.stop||value==null){
return;
}
//有值之后开始执行任务
this.value=value;
if(pending!=null){
executeTask(pending);
}
}
complete
函数用来标记告诉HandlerFuture
可以开始执行任务了。
在Android中有UI线程和工作线程之分,因此这里HandlerFuture
中的任务其实实在Handler中执行的,这样我们就可以轻松的在线程中来回切换,因为每次在执行任务的时候会判断IO
模式。
private void executeTask(Handler.Callback callback){
if(io==IO.WORK&&workThread==null){
Log.i(TAG,"启动工作线程");
workThread=new HandlerThread("asyncHandler");
workThread.start();
}
//创建Handler并发送消息执行任务
Handler handler=new Handler(io==IO.UI?Looper.getMainLooper():workThread.getLooper(),callback);
Message message=new Message();
message.setTarget(handler);
message.sendToTarget();
}
private class FunctionTask<V> implements Handler.Callback {
private Function<T,V> fun;
private HandlerFuture<V> chain;
FunctionTask(HandlerFuture<V> chain,Function<T, V> fun) {
this.chain=chain;
this.fun = fun;
}
@Override
public boolean handleMessage(Message msg) {
Log.i(TAG,Thread.currentThread().getName());
if(!stop.stop){
V obj=fun.apply(value);
//现在告诉下一个HandlerFuture,执行它的任务
chain.complete(obj);
}
closeThreadIfExist();
return true;
}
}
此外,在安卓中对于Handler
的使用稍不小心就会出现内存泄漏的风险。因此这里采用的StopWrap的方式,包装了每个执行的任务,方便在使用的过程中可以随时掐断任务链。这样就避免了内存泄露或者空指针的问题。
下面是完整的源码
import android.os.Handler;
import android.os.HandlerThread;
import android.os.Looper;
import android.os.Message;
import android.util.Log;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
public class HandlerFuture<T>{
private static final String TAG="Handler";
private HandlerThread workThread;
private T value;
private IO io;
private Handler.Callback pending;
private final Stop stop;
private HandlerFuture(T value,IO io,Stop stop){
this.value=value;
this.io=io;
this.stop=stop;
}
public static <V>HandlerFuture<V> ofUI(V value){
return new HandlerFuture<>(value,IO.UI,new Stop());
}
public static <V>HandlerFuture<V> ofWork(V value){
return new HandlerFuture<>(value,IO.WORK,new Stop());
}
public HandlerFuture<T> runOn(IO io){
this.io=io;
return this;
}
public <V>HandlerFuture<V> applyThen(Function<T,V> fun){
return applyThen(fun,throwable -> {
throwable.printStackTrace();
return null;
});
}
public <V>HandlerFuture<V> applyThen(Function<T,V> fun,Function<Throwable,? extends V> except){
Objects.requireNonNull(fun);
Objects.requireNonNull(except);
IO workIs=io==IO.WORK?IO.WORK:IO.UI;
HandlerFuture<V> future=new HandlerFuture<>(null,workIs,stop);
//构建一个任务
FunctionTask<V> task=new FunctionTask<>(future,val->{
try {
return fun.apply(val);
}catch (Exception e){
return except.apply(e);
}
});
if(value!=null){
//如果当前有值则直接执行任务
executeTask(task);
}else {
//否则延期直到可以执行任务的时候在执行
pending=task;
}
return future;
}
public HandlerFuture<Void> applyThen(Consumer<T> fun){
return applyThen(fun,throwable -> {
throwable.printStackTrace();
return null;
});
}
public HandlerFuture<Void> applyThen(Consumer<T> fun,Function<Throwable,? extends Void> except){
Objects.requireNonNull(fun);
Objects.requireNonNull(except);
IO workIs=io==IO.WORK?IO.WORK:IO.UI;
HandlerFuture<Void> future=new HandlerFuture<>(null,workIs,stop);
//构建一个任务
ConsumerTask task=new ConsumerTask(future,val->{
try {
fun.accept(val);
}catch (Exception e){
except.apply(e);
}
});
if(value!=null){
//如果当前有值则直接执行任务
executeTask(task);
}else {
//否则延期直到可以执行任务的时候在执行
pending=task;
}
return future;
}
public <V>HandlerFuture<V> applyThen(Supplier<V> fun){
return applyThen(fun,throwable -> {
throwable.printStackTrace();
return null;
});
}
public <V>HandlerFuture<V> applyThen(Supplier<V> fun,Function<Throwable,? extends V> except){
Objects.requireNonNull(fun);
Objects.requireNonNull(except);
IO workIs=io==IO.WORK?IO.WORK:IO.UI;
HandlerFuture<V> future=new HandlerFuture<>(null,workIs,stop);
//构建一个任务
SupplierTask<V> task=new SupplierTask<>(future,()->{
try {
return fun.get();
}catch (Exception e){
return except.apply(e);
}
});
if(value!=null){
//如果当前有值则直接执行任务
executeTask(task);
}else {
//否则延期直到可以执行任务的时候在执行
pending=task;
}
return future;
}
public void exits(){
stop.stop=true;
}
public void complete(T value){
if(stop.stop||value==null){
return;
}
this.value=value;
if(pending!=null){
executeTask(pending);
}
}
private void closeThreadIfExist(){
if(workThread!=null&&workThread.isAlive()){
workThread.quit();
workThread=null;
}
}
private void executeTask(Handler.Callback callback){
if(io==IO.WORK&&workThread==null){
Log.i(TAG,"启动工作线程");
workThread=new HandlerThread("asyncHandler");
workThread.start();
}
Handler handler=new Handler(io==IO.UI?Looper.getMainLooper():workThread.getLooper(),callback);
Message message=new Message();
message.setTarget(handler);
message.sendToTarget();
}
private class FunctionTask<V> implements Handler.Callback {
private Function<T,V> fun;
private HandlerFuture<V> chain;
FunctionTask(HandlerFuture<V> chain,Function<T, V> fun) {
this.chain=chain;
this.fun = fun;
}
@Override
public boolean handleMessage(Message msg) {
Log.i(TAG,Thread.currentThread().getName());
if(!stop.stop){
V obj=fun.apply(value);
//现在可以执行下一个任务
chain.complete(obj);
}
closeThreadIfExist();
return true;
}
}
private class ConsumerTask implements Handler.Callback {
private Consumer<T> fun;
private HandlerFuture<Void> chain;
ConsumerTask( HandlerFuture<Void> after,Consumer<T> fun) {
this.chain=after;
this.fun=fun;
}
@Override
public boolean handleMessage(Message msg) {
Log.i(TAG,Thread.currentThread().getName());
if(!stop.stop){
fun.accept(value);
//现在可以执行下一个任务
chain.complete(Void.TYPE.cast(null));
}
closeThreadIfExist();
return true;
}
}
private class SupplierTask<V> implements Handler.Callback {
private Supplier<V> fun;
private HandlerFuture<V> chain;
SupplierTask(HandlerFuture<V> chain,Supplier<V> fun) {
this.chain=chain;
this.fun = fun;
}
@Override
public boolean handleMessage(Message msg) {
Log.i(TAG,Thread.currentThread().getName());
if(!stop.stop){
V obj=fun.get();
//现在可以执行下一个任务
chain.complete(obj);
}
closeThreadIfExist();
return true;
}
}
private static class Stop{
private boolean stop;
}
}
IO源码:
public enum IO {
/**
* 使任务运行在工作线程,不会堵塞UI线程
*/
WORK,
/**
* 使任务运行在UI线程
*/
UI
}
使用方式:
HandlerFuture.ofWork("ggx")
.applyThen(str->{
//todo 处理耗时操作
return str
})
.runOn(IO.UI)
.applyThen(str—>{
//todo 更新UI操作
});