一、创建线程的几种方式
1.继承thread类
public class ThreadTest {
public static void main(String[] args) {
Thread01 thread = new Thread01();
thread.start();
}
public static class Thread01 extends Thread{
@Override
public void run() {
System.out.println(""+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}
}
}
2.实现runnable接口
public class ThreadTest {
public static void main(String[] args) {
Thread thread = new Thread(new Runnable01());
thread.start();
}
public static class Runnable01 implements Runnable{
@Override
public void run() {
System.out.println(""+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}
}
}
3.实现Callable通过FutureTask创建线程
public class ThreadTest {
public static void main(String[] args) {
FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
Thread thread = new Thread(futureTask);
thread.start();
}
public static class Callable01 implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println(""+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}
}
}
4.线程池创建线程
七大参数:
corePoolSize:固定线程数
maximumPoolSize:最大线程数
keepAliveTime:存活时间
unit:时间单位
BlockingQueue<Runnable>:阻塞队列
ThreadFactory: 线程工厂
RejectedExecutionHandler:拒绝策略
疑问:拒绝策略丢弃的任务如何去解决
1.改写拒绝策略,延迟任务重新投向线程池
2.打印对应任务参数,可以做塞回数据库,或者打印出来方便排查问题
问题
Q:如何打印线程参数
A:RetryPolicy#rejectedExecution里面通过判断runnable的类型,然后进行打印相关参数
Q:有没有其他方案
A:有的,比如说有些就不用使用延迟队列,比如说我们是从数据库读取到的任务,执行成功就修改执行的标识,如果不成功或者任务被拒绝了,它下次扫描还是会继续塞回去
Q:延迟队列如果宕机的话,任务也丢失了怎么办
A:这里的打印日志就很重要了,可以记录起来,或者加个hook回调钩子,在宕机的时候将这些数据写回数据库(kill -9 pid不会调用hook~)
5.CompletableFuture异步编排
创建异步对象:
1、runXxxx 都是没有返回结果的,supplyXxx 都是可以获取返回结果的
2、可以传入自定义的线程池,否则就用默认的线程池;
列子:
CompletableFuture<Void> futureRunnable = CompletableFuture.runAsync(()->{
System.out.println("启动执行"+Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
},threadPoolExecutor);
try {
futureRunnable.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("启动执行" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, threadPoolExecutor).whenCompleteAsync((res,e)->{
System.out.println("whenCompleteAsync===>:"+res+"===>"+e);
},threadPoolExecutor);
System.out.println("返回结果===》:"+ future.get());
计算完成时回调:
whenComplete 可以处理正常和异常的计算结果,exceptionally 处理异常情况。
whenComplete 和 whenCompleteAsync 的区别:
whenComplete:是执行当前任务的线程执行继续执行 whenComplete 的任务。
whenCompleteAsync:是执行把 whenCompleteAsync 这个任务继续提交给线程池
来进行执行。
方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程
执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
列子:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("启动执行" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, threadPoolExecutor).whenCompleteAsync((res,e)->{
System.out.println("whenCompleteAsync===>:"+res+"===>"+e);
},threadPoolExecutor).exceptionally(throwable->{
return 10;
});
System.out.println("返回结果===》:"+ future.get());
handle 方法:可改变返回值
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("启动执行" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, threadPoolExecutor).handle((res,e)->{ //第一个参数 线程返回值,第二个是 异常返回
if (res != null){
return res*2;
}
if (e == null){
return 0;
}
return -1;
});
线程串行化方法
thenApply 方法:当一个线程依赖另一个线程时,获取上一个任务返回的结果,并返回当前
任务的返回值。
thenAccept 方法:消费处理结果。接收任务的处理结果,并消费处理,无返回结果。
thenRun 方法:只要上面的任务执行完成,就开始执行 thenRun,只是处理完任务后,执行
thenRun 的后续操作
带有 Async 默认是异步执行的。同之前。
以上都要前置任务成功完成。
Function<? super T,? extends U>
T:上一个任务返回结果的类型
U:当前任务的返回值类型
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, threadPoolExecutor).thenApplyAsync(res -> {
System.out.println("第二任务启动");
return "hallo"+res;
}, threadPoolExecutor);
==================================
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, threadPoolExecutor).thenAcceptAsync(res -> {
System.out.println("开启另一个任务");
}, threadPoolExecutor);
两任务组合 - 都要完成
两个任务必须都完成,触发该任务。
thenCombine:组合两个 future,获取两个 future 的返回结果,并返回当前任务的返回值
thenAcceptBoth:组合两个 future,获取两个 future 任务的返回结果,然后处理任务,没有
返回值。
runAfterBoth:组合两个 future,不需要获取 future 的结果,只需两个 future 处理完任务后,
处理该任务。
CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, threadPoolExecutor);
CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 5;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束当前线程");
return i;
}, threadPoolExecutor);
CompletableFuture<String> stringCompletableFuture = futur1.thenCombineAsync(futur2, (f1, f2) -> {
System.out.println("全部结束:");
return f1 + ":" + f2;
}, threadPoolExecutor);
两任务组合 - 一个完成
当两个任务中,任意一个 future 任务完成的时候,执行任务。
applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返
回值。
CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程futur1:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果futur1:" + i);
return i;
}, threadPoolExecutor);
CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程futur2:" + Thread.currentThread().getId());
int i = 10 / 5;
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束当前线程futur2");
return i;
}, threadPoolExecutor);
CompletableFuture<Integer> future = futur1.applyToEitherAsync(futur2, (s) -> {
System.out.println("第三个任务future");
return s;
}, threadPoolExecutor);
多任务组合
allOf:等待所有任务完成
anyOf:只要有一个任务完成
CompletableFuture<Integer> futur1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程futur1:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果futur1:" + i);
return i;
}, threadPoolExecutor);
CompletableFuture<Integer> futur2 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程futur2:" + Thread.currentThread().getId());
int i = 10 / 5;
try {
Thread.sleep(7000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("结束当前线程futur2");
return i;
}, threadPoolExecutor);
CompletableFuture<Void> futureAllOf = CompletableFuture.allOf(futur1, futur2);
System.out.println(futureAllOf.get());
CompletableFuture<Object> future = CompletableFuture.anyOf(futur1, futur2);
System.out.println(future.get());