多线程——Future和Callable

在并发编程中,我们经常用到非阻塞的模型,在之前的多线程的三种实现中,不管是继承Thread类还是实现Runnable接口,都无法保证获取到之前的执行结果。但是通过实现Callable接口,并用Future可以来接收多线程执行结果。

英文含义:
callable 可收回的
future 未来
Executor [ɪɡˈzekjətər] (一哥在kei特) 执行器

Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加callable以便在任务执行成功或失败后作出相应的操作。

线程池提交实现Callable接口的线程会返回一个Future对象。

Java并发编程:Callable、Future和FutureTask

1. Callable和Runnable

Runnable是一个接口,它里面只声明了一个run方法。

public interface Runnable {
    public abstract void run();
}

但是run()方法返回值是void类型,所以在执行任务之后无法返回任何结果。

而Callable位于java.util.concurrent(并发包)包下,他也是一个接口,它里面也只声明了一个方法。

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

并且,这个接口是一个泛型接口,call方法返回类型就是传递进来的类型。

2. Future

一般情况下,Callable配合ExecutorSevice来使用,获取到的返回值就是Future类型

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,get方法会阻塞直到任务的完成。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • cancel(mayInterruptIfRunning) 取消正在运行时的线程。需要注意的时,本质上是依赖线程进程中的(sleep,wait,join-->本质上均是NEW状态)的interrupt方法抛出异常来进行线程的中断的。详情请参考 FutureTask的cancel方法真的能终止正在运行的线程吗?

  • isCanselled方法表示任务是否被取消成功,如果任务正常完成前被取消成功,则返回true。

  • isDone 方法表示任务是否已经完成,若任务完成,则返回true。

  • get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回。

  • get(long timeout,TimeUnit unit)用来获取执行结果,如果在指定时间内,还未获取到结果,那么会抛出java.util.concurrent.TimeoutException异常。

Future提供了三种功能:

  1. 判断任务是否完成;
  2. 能够中断任务;
  3. 能够获取到任务执行结果;

3. FutureTask

Future继承关系类图.png

Future同时实现了runnable接口和Future接口。故它既可以作为Runnable接口被线程执行,又可以作为Future得到Callable的返回值。

1. FutureTask的参数:Callable<V>

    @Test
    public void testTask() throws InterruptedException, ExecutionException {

        ExecutorService executorService = Executors.newSingleThreadExecutor();
        //线程池提交任务
        FutureTask<Integer> futureTask = new FutureTask<Integer>(() -> {
            System.out.println("子线程在进行计算");
            Thread.sleep(3000);
            int sum = 0;
            for (int i = 0; i < 100; i++)
                sum += i;
            return sum;
        });
        //因为FutureTask实现了Runnable接口,故可以使用Thread启用
        Thread thread=new Thread(futureTask);
        thread.start();
        //也可以使用线程池的方式使用
        executorService.submit(futureTask);
        executorService.shutdown();  //关闭线程池

        Thread.sleep(1000);
        System.out.println("task运行结果"+futureTask.get());
        System.out.println("所有任务执行完毕");
    }
}

如果为了取消性而使用Future但又不提供可用的结果,则可以声明Future<?>形式参数,并返回null作为底层任务结果~

4. 项目实战,策略模式抽取公共逻辑

目的是为了保证输入的原始数据和输出的数据保存强一致。

    /**
     * 任务的批量处理,此方法会对异常进行处理,保证输入和输出的数据是有序的。
     *
     * @param sources  源数据
     * @param function 策略方法
     * @param executor 线程池
     * @param <T>      请求数据的泛型
     * @param <R>      响应数据的泛型
     * @return 并发执行的最终结果
     */
    public static  <T, R> List<BatchFutureResult<T, R>> futureAllFunction(List<T> sources,
                                                                  Function<T, R> function,
                                                                  ExecutorService executor) {
        return futureAllFunction(sources, function, null, executor);
    }

    /**
     *
     * 任务的批量处理,此方法会对异常进行处理,保证输入和输出的数据是有序的。
     *
     * @param sources  源数据
     * @param function 策略方法
     * @param timeout  失效时间,为空即不设置
     * @param executor 线程池
     * @param <T>      请求数据的泛型
     * @param <R>      响应数据的泛型
     * @return 并发执行的最终结果
     */
    public static  <T, R> List<BatchFutureResult<T, R>> futureAllFunction(List<T> sources,
                                                                  Function<T, R> function,
                                                                  Long timeout,
                                                                  ExecutorService executor) {
        /**
         * 获取FutureTask任务,并放入到线程池中。
         */
        List<FutureTask<BatchFutureResult<T, R>>> futureTasks = sources.stream().map(t -> {

            FutureTask<BatchFutureResult<T, R>> resultFutureTask = new FutureTask<>(() -> {
                BatchFutureResult<T, R> batchFutureResult = new BatchFutureResult<>();
                R apply = null;
                Integer status = StatusEnum.SUCCESS.getStatus();
                try {
                    apply = function.apply(t);
                } catch (Exception e) {
                    log.error("批量任务异常", e);
                    batchFutureResult.setMessage(ExceptionUtil.getLogErrorMessage(e));
                    status = StatusEnum.ERROR.getStatus();
                }
                batchFutureResult.setSource(t);
                batchFutureResult.setResult(apply);
                batchFutureResult.setStatus(status);
                return batchFutureResult;
            });
            //放入线程池等待处理
            executor.submit(resultFutureTask);
            return resultFutureTask;
        }).collect(Collectors.toList());


        List<BatchFutureResult<T, R>> results = new ArrayList<>();
        //循环获取结果
        for (int i = 0; i < sources.size(); i++) {
            //获取数据
            T source = sources.get(i);
            FutureTask<BatchFutureResult<T, R>> futureTask = futureTasks.get(i);
            //抓取结果
            BatchFutureResult<T, R> batchFutureResult = new BatchFutureResult<>();
            try {
                if (timeout == null) {
                    batchFutureResult = futureTask.get();
                } else {
                    batchFutureResult = futureTask.get(timeout, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                log.error("", e);
                batchFutureResult.setStatus(StatusEnum.ERROR.getStatus());
                batchFutureResult.setMessage(ExceptionUtil.getLogErrorMessage(e));
                batchFutureResult.setSource(source);
            }
            //放入数据
            results.add(batchFutureResult);
        }
        return results;
    }
@Data
public class BatchFutureResult<T, R> {

    /**
     * 是否成功
     */
    private Integer status;

    /**
     * 异常时的原因
     */
    private String message;

    /**
     * 原数据
     */
    private T source;

    /**
     * 结果数据
     */
    private R result;

}
@Getter
public enum StatusEnum {
    SUCCESS(1, "成功"),
    ERROR(2, "失败");

    StatusEnum(Integer status, String desc) {
        this.status = status;
        this.desc = desc;
    }

    private Integer status;

    private String desc;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容