Java 中Future

Future> futureRed = (Future>)threadBaseService.getExecutor().submit(

() -> {

DataSourceThreadHolder.set(DataSourceType.ElasticSearch);

List redAllUserConsumer =consumerMetricsDao.getRedAllUserConsumer(startTime,endTime,1);

DataSourceThreadHolder.reset();

return redAllUserConsumer;

}

);

List consumerInfoVoList = futureRed.get();

以下内容为转载:

1. 什么是Future

Future是多线程开发中常见的一种设计模式。Future模式可以返回线程执行结果的契约,通过此契约程序可以选择在合适的时机取回执行的结果,如果取回结果时线程还没有执行完成,将会阻塞调用线程等待执行结果返回。

2. 为什么需要Future

在有些场景下,我们想使用另一个线程去执行复杂耗时的操作,此时又不想让主线程等待白白浪费CPU,此时可以让主线程先去做别的事,然后在合适的时机去通过Future契约取回线程执行的结果。

3. Java中的Future模式

Java中的Future模式主要由以上接口和类组成。

3.1 Callable & Runnable

这是我们普通的线程任务,其中Callable是带返回值(真实数据),Runnable是不带返回值的,因此在我们使用Runnable和Future时,必须传入一个Result对象,通过Future在获取结果时就是获取的该Result,核心代码如下:

publicFutureTask(Runnable runnable,Vresult){this.callable=Executors.callable(runnable,result);this.state=NEW;// ensure visibility of callable}publicstatic<T>Callable<T>callable(Runnable task,Tresult){if(task==null)thrownewNullPointerException();returnnewRunnableAdapter<T>(task,result);}staticfinalclassRunnableAdapter<T>implementsCallable<T>{final Runnable task;finalTresult;RunnableAdapter(Runnable task,Tresult){this.task=task;this.result=result;}publicTcall(){task.run();returnresult;}}

复制

Callable目前只能搭配线程池或者Future来使用,不能直接和new Thread()搭配使用,Runnable可以搭配线程池和new Thread()使用,在配合Future使用时本质上是对其进行了适配,也就是上述代码中的RunnableAdapter。

3.2 Future

publicinterfaceFuture<V>{booleancancel(boolean mayInterruptIfRunning);booleanisCancelled();booleanisDone();Vget()throws InterruptedException,ExecutionException;Vget(long timeout,TimeUnit unit)throws InterruptedException,ExecutionException,TimeoutException;}

复制

Future是线程的契约,通过其get()方法我们可以获取线程执行的结果,当然Future也提供了其他三个方法,分别是:

cancel:取消任务

isCancelled:任务是否已经取消

isDone:任务是否完成

3.3 RunnableFuture

publicinterfaceRunnableFuture<V>extendsRunnable,Future<V>{voidrun();}

复制

RunnableFuture接口继承自Runnable和Future,表明RunnableFuture可以被线程执行并且可以通过契约获取到线程的执行结果。

4. FutureTask

4.1 属性

// 执行任务privateCallable<V>callable;// 任务的实际执行结果privateObject outcome;// 执行任务的线程privatevolatile Thread runner;// 等待结果的线程栈privatevolatile WaitNode waiters;

复制

4.2 状态

privatevolatile int state;privatestaticfinal intNEW=0;privatestaticfinal intCOMPLETING=1;privatestaticfinal intNORMAL=2;privatestaticfinal intEXCEPTIONAL=3;privatestaticfinal intCANCELLED=4;privatestaticfinal intINTERRUPTING=5;privatestaticfinal intINTERRUPTED=6;

复制

FutureTask除了4.1中的属性外,还有一个重要的属性就是state,FutureTask中的状态大约有7种:

NEW:任务的初始状态

COMPLETING:正在设置任务结果

NORMAL:任务执行完毕

EXCEPTIONAL:任务发行异常

CANCELLED:任务被取消

INTERRUPTING:正在中断任务

INTERRUPTED:任务被中断

4.3 run()方法

任务执行的时候实际就是执行run方法,源码如下:

publicvoidrun(){if(state!=NEW||!UNSAFE.compareAndSwapObject(this,runnerOffset,null,Thread.currentThread()))return;try{Callable<V>c=callable;if(c!=null&&state==NEW){Vresult;boolean ran;try{result=c.call();ran=true;}catch(Throwable ex){result=null;ran=false;setException(ex);}if(ran)set(result);}}finally{// runner must be non-null until state is settled to// prevent concurrent calls to run()runner=null;// state must be re-read after nulling runner to prevent// leaked interruptsint s=state;if(s>=INTERRUPTING)handlePossibleCancellationInterrupt(s);}}protectedvoidset(Vv){if(UNSAFE.compareAndSwapInt(this,stateOffset,NEW,COMPLETING)){outcome=v;UNSAFE.putOrderedInt(this,stateOffset,NORMAL);// final statefinishCompletion();}}privatevoidhandlePossibleCancellationInterrupt(int s){// It is possible for our interrupter to stall before getting a// chance to interrupt us.  Let's spin-wait patiently.if(s==INTERRUPTING)while(state==INTERRUPTING)Thread.yield();// wait out pending interrupt}

复制

run方法的大致流程如下:

校验任务的状态是否是NEW和当前是否无执行线程,如果校验通过,则获取任务执行

调用任务的call方法

如果执行异常,设置结果,状态修改为EXCEPTIONAL,并将任务结果设置为异常

如果正常执行,调用set(V v)设置结果,状态修改为NORMAL,结果设置为执行结果,并且唤醒等待结果的线程

最后在finally块中,我们将runner属性置为null,并且检查有没有遗漏的中断,如果发现s >= INTERRUPTING, 说明执行任务的线程有可能被中断了,因为s >= INTERRUPTING 只有两种可能,state状态为INTERRUPTING和INTERRUPTED。

4.3 get()方法

当我们需要去获取FutureTask的结果时,我们需要调用get方法获取结果。

publicVget()throws InterruptedException,ExecutionException{int s=state;if(s<=COMPLETING)s=awaitDone(false,0L);returnreport(s);}@SuppressWarnings("unchecked")privateVreport(int s)throws ExecutionException{Object x=outcome;if(s==NORMAL)return(V)x;if(s>=CANCELLED)thrownewCancellationException();thrownewExecutionException((Throwable)x);}privateintawaitDone(boolean timed,long nanos)throws InterruptedException{final long deadline=timed?System.nanoTime()+nanos:0L;WaitNode q=null;boolean queued=false;for(;;){if(Thread.interrupted()){removeWaiter(q);thrownewInterruptedException();}int s=state;if(s>COMPLETING){if(q!=null)q.thread=null;returns;}elseif(s==COMPLETING)// cannot time out yetThread.yield();elseif(q==null)q=newWaitNode();elseif(!queued)queued=UNSAFE.compareAndSwapObject(this,waitersOffset,q.next=waiters,q);elseif(timed){nanos=deadline-System.nanoTime();if(nanos<=0L){removeWaiter(q);returnstate;}LockSupport.parkNanos(this,nanos);}elseLockSupport.park(this);}}

复制

获取结果的大致步骤如下:

检测任务状态是否是NEW或者COMPLETING,如果不是,说明已经执行成功或失败,返回结果

否则就阻塞等待,阻塞等待的步骤如下

检测当前线程是否被中断,如果是就将其从等待线程中移除

再次检测任务状态,如果是异常、中断或者执行完成状态,则直接返回结果。

如果任务是COMPLETING状态,说明任务已经执行完成正在设置结果,此时让获取结果的线程短暂让出CPU继续等待

如果等待结果的线程栈为null,说明还没有生成,则生成等待结果的线程栈

如果queued为false,说明等待结果的线程还没入栈,所以将其入栈

最后看是否是是超时等待,根据是否超时,选择将等待结果的线程永久挂起(等待唤醒)还是具有超时时间的挂起

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容