异步、线程池与CompletableFuture异步编排

一、创建线程的几种方式

1.继承thread类

public class ThreadTest {
    public static void main(String[] args) {
        Thread01 thread = new Thread01();
        thread.start();
    }
    public static class Thread01 extends Thread{
        @Override
        public void run() {
            System.out.println(""+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }
    }
}

2.实现runnable接口

public class ThreadTest {
    public static void main(String[] args) {
        Thread thread = new Thread(new Runnable01());
        thread.start();
    }

    public static class Runnable01 implements Runnable{
        @Override
        public void run() {
            System.out.println(""+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        }
    }

}


3.实现Callable通过FutureTask创建线程

public class ThreadTest {
    public static void main(String[] args) {

        FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
        Thread thread = new Thread(futureTask);
        thread.start();
    }
    public static class Callable01 implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            System.out.println(""+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }
    }
}

4.线程池创建线程

七大参数:
corePoolSize:固定线程数
maximumPoolSize:最大线程数
keepAliveTime:存活时间
unit:时间单位
BlockingQueue<Runnable>:阻塞队列
ThreadFactory: 线程工厂
RejectedExecutionHandler:拒绝策略

疑问:拒绝策略丢弃的任务如何去解决

1.改写拒绝策略,延迟任务重新投向线程池
2.打印对应任务参数,可以做塞回数据库,或者打印出来方便排查问题
问题
Q:如何打印线程参数
A:RetryPolicy#rejectedExecution里面通过判断runnable的类型,然后进行打印相关参数

Q:有没有其他方案
A:有的,比如说有些就不用使用延迟队列,比如说我们是从数据库读取到的任务,执行成功就修改执行的标识,如果不成功或者任务被拒绝了,它下次扫描还是会继续塞回去

Q:延迟队列如果宕机的话,任务也丢失了怎么办
A:这里的打印日志就很重要了,可以记录起来,或者加个hook回调钩子,在宕机的时候将这些数据写回数据库(kill -9 pid不会调用hook~)

5.CompletableFuture异步编排

创建异步对象:

image.png

1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
2、可以传入自定义的线程池,否则就用默认的线程池;
列子:

CompletableFuture<Void> futureRunnable = CompletableFuture.runAsync(()->{
            System.out.println("启动执行"+Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
        },threadPoolExecutor);
        try {
            futureRunnable.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("启动执行" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("运行结果:" + i);
            return i;
        }, threadPoolExecutor).whenCompleteAsync((res,e)->{
            System.out.println("whenCompleteAsync===>:"+res+"===>"+e);
        },threadPoolExecutor);
        System.out.println("返回结果===》:"+ future.get());

计算完成时回调:

image.png

whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。
whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池
来进行执行。
方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程
执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
列子:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("启动执行" + Thread.currentThread().getId());
            int i = 10 / 0;
            System.out.println("运行结果:" + i);
            return i;
        }, threadPoolExecutor).whenCompleteAsync((res,e)->{
            System.out.println("whenCompleteAsync===>:"+res+"===>"+e);
        },threadPoolExecutor).exceptionally(throwable->{
            return 10;
        });
        System.out.println("返回结果===》:"+ future.get());

handle 方法:可改变返回值

image.png
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("启动执行" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, threadPoolExecutor).handle((res,e)->{ //第一个参数 线程返回值,第二个是 异常返回
            if (res != null){
                return res*2;
            }
            if (e == null){
                return 0;
            }
            return -1;
        });

线程串行化方法

image.png

thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前
任务的返回值。
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行
thenRun 的后续操作
带有 Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, threadPoolExecutor).thenApplyAsync(res -> {
            System.out.println("第二任务启动");
            return "hallo"+res;
        }, threadPoolExecutor);
==================================

        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, threadPoolExecutor).thenAcceptAsync(res -> {
            System.out.println("开启另一个任务");
        }, threadPoolExecutor);

两任务组合 - 都要完成

image.png

image.png

两个任务必须都完成,触发该任务。
thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有
返回值。
runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,
处理该任务。

        CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果:" + i);
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程:" + Thread.currentThread().getId());
            int i = 10 / 5;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("结束当前线程");
            return i;
        }, threadPoolExecutor);
        CompletableFuture<String> stringCompletableFuture = futur1.thenCombineAsync(futur2, (f1, f2) -> {
            System.out.println("全部结束:");
            return f1 + ":" + f2;
        }, threadPoolExecutor);

两任务组合 - 一个完成

image.png

image.png

当两个任务中,任意一个 future 任务完成的时候,执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返
回值。

        CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程futur1:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果futur1:" + i);
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程futur2:" + Thread.currentThread().getId());
            int i = 10 / 5;
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("结束当前线程futur2");
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Integer> future = futur1.applyToEitherAsync(futur2, (s) -> {
            System.out.println("第三个任务future");
            return s;
        }, threadPoolExecutor);

多任务组合

image.png

allOf:等待所有任务完成
anyOf:只要有一个任务完成

        CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程futur1:" + Thread.currentThread().getId());
            int i = 10 / 2;
            System.out.println("运行结果futur1:" + i);
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("当前线程futur2:" + Thread.currentThread().getId());
            int i = 10 / 5;
            try {
                Thread.sleep(7000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("结束当前线程futur2");
            return i;
        }, threadPoolExecutor);
        CompletableFuture<Void> futureAllOf = CompletableFuture.allOf(futur1, futur2);
        System.out.println(futureAllOf.get());
        CompletableFuture<Object> future = CompletableFuture.anyOf(futur1, futur2);
        System.out.println(future.get());
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容