前言
Callable,Future,Executor都是java.util.concurrent包下的工具类,作者李二狗,为了彻底吃透它们的概念,今天就假设这些类都不存在,自己通过实际场景封装出这些工具的山寨版
需求
假设你需要写一个简单的方法,两个值求和,非常简单
public int sum(int x, int y) {
return x + y;
}
但需求增加了,需要计算的过程在一个新线程中执行,这代码该怎么写?就会出现以下两个问题:
- 怎么获取到线程执行的结果?
- 怎么知道新线程什么时候执行完?
实现
首先第一个问题,如何获取新线程结果,这个也好解决,虽然新线程里的变量我取不到,但内存是线程共享的啊,只要提前定义一个结果变量即可
private volatile int outcome;
public int sum(int x, int y) {
new Thread(()->{
outcome = x + y;
}).start();
return outcome;
}
很明显,最终结果肯定不对,因为返回的时候新线程可能都没开始运行,这就是第二个问题:怎么知道新线程什么时候执行完?
解决的方案当然也很多,只要线程之间进行通讯一下即可,我们用LockSupport方法来实现通讯
public class OperationTest {
private volatile int outcome;
private Thread waitThread;
public int sum(int x, int y) throws InterruptedException {
waitThread = Thread.currentThread();
new Thread(() -> {
outcome = x + y;
// 计算完成通知等待线程
LockSupport.unpark(waitThread);
}).start();
// 等待计算完成
LockSupport.park(this);
return outcome;
}
public static void main(String[] args) throws InterruptedException {
System.out.println(new OperationTest().sum(2,3));
}
}
此时我们就完成了这个需求,但看一下代码真的好麻烦啊,明明就是一个1+1等于几的事,写了这么多代码,如果明天再来个写减法的需求,我还要重写这么一堆
封装
其实上面解决的两个问题,完全就是通用性的问题,加法这样处理,减法一样也是这个解决思路,那么我们就可以把加法、减法等有返回值的方法用函数式接口给抽象化
// 使用泛型兼容各种类型返回
@FunctionalInterface
public interface Callable<V> {
V call();
}
接下来我们就要封装一个开启新线程执行它的工具,提供如下功能:
- 只要传入一个方法作为参数,就可以开启一个新线程去执行这个方法,并返回执行结果
- 也可以开启新线程执行普通的Runable方法
这个工具命名为ExecutorService
public interface ExecutorService {
/**
* 执行callable并返回执行结果
* @param task
* @param <T>
*/
<T> T submit(Callable<T> task);
/**
* 也可以执行Runnable
* @param runnable
*/
void execute(Runnable runnable);
}
然后开始实现这个工具,暂时叫做NewThreadExecutor
(新线程执行器)
public class NewThreadExecutor implements ExecutorService {
private volatile Object outcome;
private Thread waitThread;
@Override
public <T> T submit(Callable<T> task) {
waitThread = Thread.currentThread();
execute(()->{
outcome = task.call();
// 计算完成通知等待线程
LockSupport.unpark(waitThread);
});
// 等待计算完成
LockSupport.park(this);
return (T) outcome;
}
@Override
public void execute(Runnable runnable) {
// 执行方式就是开启一个线程去执行
new Thread(runnable).start();
}
}
这时我们再实现上面的需求就轻而易举了
int x = 2;
int y = 3;
Integer sub = new NewThreadExecutor().submit(() -> {
return x + y;
});
System.out.println(sub);
这样通过我们的逻辑和执行解耦,可以方便使用工具执行减法、乘法或其它复杂运算逻辑
多线程
再回头看一下我们这个工具,实际上非常不合理,开了一个新线程去执行函数,整个过程主线程却全程傻等
相当于一个主管带一个员工干活,而员工干活时,主管干不了别的事只能等着,那干脆主管自己干得了呗,何必聘请这么一个员工
而我们希望开启新线程后主线程可以去干别的(比如分配新任务给其它线程执行),等全分配完任务再统一获取结果,这样才算是多线程并行作业
那么如何改造代码呐?
首先,调用submit
方法不能阻塞,应该直接返回一个对象,主线程再想要获取的时候,才通过这个对象阻塞获取结果
这个对象不是运行结果,但通过它可以获得结果,他就像一个未来的约定,我们先使用代码给它抽象出来,命名为Future
public interface Future<V> {
// 是否运行完成
boolean isDone();
// 获取运行结果
V get();
}
此时我们的ExecutorService返回结果变为Future对象
public interface ExecutorService {
/**
* 执行callable并返回future
* @param task
* @param <T>
*/
<T> Future<T> submit(Callable<T> task);
/**
* 也可以执行Runnable
* @param runnable
*/
void execute(Runnable runnable);
}
那么此时如何改造NewThreadExecutor这个实现呐?
首先要实现Future抽象,这个对象可以获取到执行结果,那么它肯定可以访问到存储执行结果的对象(outcome)和等待线程对象(waitThread),那不妨就把这两个对象放入Future实现中,同时最终执行的Runable方法也要可以访问到这两个对象,那不妨就让Future的实现同时就是最终执行的Runable,即可执行的Future,取名为FutureTask
public class FutureTask<V> implements Future<V>, Runnable {
private Callable<V> callable; // 要执行的方法
private volatile Object outcome; // 执行结果
private Thread waitThread; // 等待的线程
public FutureTask(Callable<V> callable) {
this.callable = callable;
}
@Override
public boolean isDone() {
return outcome!=null;
}
@Override
public V get() {
waitThread = Thread.currentThread();
if (isDone()) { // 如果已经执行完直接返回
return (V) outcome;
}
// 否则等待
LockSupport.park(this);
return (V) outcome;
}
@Override
public void run() {
// 开始执行
outcome = callable.call();
// 计算完成通知等待线程
LockSupport.unpark(waitThread);
}
}
此时NewThreadExecutor改造如下
public class NewThreadExecutor implements ExecutorService {
@Override
public <T> Future<T> submit(Callable<T> task) {
FutureTask<T> futureTask = new FutureTask<>(task);
execute(futureTask);
return futureTask; // 直接返回
}
@Override
public void execute(Runnable runnable) {
// 执行方式就是开启一个线程去执行
new Thread(runnable).start();
}
}
这时我们就可以让两个子线程分别同时计算两个结果,最终主线程求和(真正的做到多线程计算)
Future<Integer> future1 = new NewThreadExecutor().submit(() -> {
return 3 + 4;
});
Future<Integer> future2 = new NewThreadExecutor().submit(() -> {
return 1 + 2;
});
System.out.println(future1.get()+future2.get());
扩展
以上封装的工具,达到了传入一个方法开启一个新线程计算的功能,并且使用future概念避免了阻塞
但工具还能再扩展一下,比如有一天领导让实现传入一个方法指定某一线程执行,或传入方法从几个固定线程中选一个空闲的去执行(线程池)
由于我们做到了逻辑和执行的分离解耦,所以只要重写一下execute
就可以了,而无论如何执行submit
的逻辑是不变的,我们可以继续给它抽象出来,命名为AbstractExecutorService
public abstract class AbstractExecutorService implements ExecutorService {
@Override
public <T> Future<T> submit(Callable<T> task) {
FutureTask<T> futureTask = new FutureTask<>(task);
execute(futureTask);
return futureTask; // 直接返回
}
}
此时它的继承者就可以传入方法并返回future,只需关注如何执行即可,比如我们的开启新线程执行工具
public class NewThreadExecutor extends AbstractExecutorService {
@Override
public void execute(Runnable runnable) {
// 执行方式就是开启一个线程去执行
new Thread(runnable).start();
}
}
再比如使用线程池去执行
public class ThreadPoolExecutor extends AbstractExecutorService {
@Override
public void execute(Runnable runnable) {
// 从线程池中选一个线程去执行
}
}
最后
以上代码的命名基本参照jdk的源码,可以自行对照,相信再看源码就会非常清晰,也可以结合Executor源码详解解读源码