准备
保证写入时,线程安全的List和Set结构?
ConcurrentHashMap是线程安全的HashMap,CopyOnWriteArrayList是线程安全的ArrayList。
CopyOnWriteArraySet是线程安全的HashSet。考虑多线程处理任务点?
任务的类型: 计算能力复杂,IO操作;
任务是否异步: 同步,异步
每个子线程是否有依赖关系:有,没有 (例如:使用多线程处理 从1累加到1万)
如果是多个任务,每个任务使用多线程处理,主线程需要等待子线程任务执行完毕之后在进行执行: 在没有CompletableFuture 使用CountDownLatch解决
(https://blog.csdn.net/qq_38599840/article/details/120708245)
使用CompletableFuture处理依赖任务
- 线程池的方式并行获取数据弊端?
CPU资源大量浪费在阻塞等待上,导致CPU资源利用率低。在Java 8之前,一般会通过回调的方式来减少阻塞,但是大量使用回调,又引发臭名昭著的回调地狱问题,导致代码可读性和可维护性大大降低。
为了增加并发度,会引入更多额外的线程池,随着CPU调度线程数的增加,会导致更严重的资源争用,宝贵的CPU资源被损耗在上下文切换上,而且线程本身也会占用系统资源,且不能无限增加。
同步模型下,会导致硬件资源无法充分利用,系统吞吐量容易达到瓶颈。
- 保证了多线程写入时安全,咋样使用什么样的多线程呢?
使用executor.submit(() -> {})处理RPC任务;
使用future处理每一个子线程是否 终端还是跳过 逻辑
public class Test3 {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder()
.setNameFormat("coustomThread %d")
.setUncaughtExceptionHandler((t,e) -> System.out.println("UncaughtExceptionHandler捕获到:" +t.getName()+"发生异常:"+e.getMessage()))
.build());
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);//线程不安全List
List<String> test = test(list);
System.out.println("主线程:" + Thread.currentThread().getName() + ":" + test);
}
public static List<String> test(List<Integer> list) {
List<String> safeList = new CopyOnWriteArrayList();//线程安全的list
list.stream().map(i -> executor.submit(() -> {
//模拟rpc
try {
Thread.sleep(100);
if(i == 3) {
int j = i / 0;
}
safeList.add(i + "A");
System.out.println("子线程:" + Thread.currentThread().getName() + "返回参数:" + i + "A");
return i + "A";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})).collect(Collectors.toList()).forEach(future -> {
try {
if (future != null) {
Object o = future.get();
// System.out.println("子线程:" + Thread.currentThread().getName() + "返回参数:" + o);
}
} catch (InterruptedException e) {
//如果不抛出异常,那么线程执行不会终端;反之,如果抛出异常,则线程中断
// throw new RuntimeException(e);
} catch (ExecutionException e) {
// throw new RuntimeException(e);
}
});
System.out.println(123);
return safeList;
}
``
4. 使用CountDownLatch主线程需要等待子线程任务执行完毕之后在进行执行?
```java
public class Test4 {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder()
.setNameFormat("coustomThread %d")
.setUncaughtExceptionHandler((t,e) -> System.out.println("UncaughtExceptionHandler捕获到:" +t.getName()+"发生异常:"+e.getMessage()))
.build());
public static void main(String[] args) throws Exception{
System.out.println("主线程开始:" + Thread.currentThread().getName());
CountDownLatch downLatch = new CountDownLatch(2);
//任务1
AtomicReference<String> q1 = new AtomicReference<>("");//线程安全
Future<String> future1 = executor.submit(() -> {
try {
Thread.sleep(1500);
q1.set("任务1");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// downLatch.countDown();
return "任务1";
});
//任务2
AtomicReference<String> q2 = new AtomicReference<>("");//线程安全
Future<String> future2 = executor.submit(() -> {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
q2.set("任务2" + q1.get());
// downLatch.countDown();
return "任务2" + q1.get();
});
// downLatch.await();
Thread.sleep(600);
System.out.println("获取任务1返回:" + future1.get());
System.out.println("获取任务2返回:" + future2.get());
System.out.println("主线程结束:" + Thread.currentThread().getName());
}
/*
* 主线程开始:main
获取任务1返回:任务1
获取任务2返回:任务2任务1
主线程结束:main
* */
}
CompletableFuture
实例
public void testCompletableInfo() throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();
//调用用户服务获取用户基本信息
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->
//模拟查询商品耗时500毫秒
{
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "用户A";
});
//调用商品服务获取商品基本信息
CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
//模拟查询商品耗时500毫秒
{
try {
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "商品A";
});
System.out.println("获取用户信息:" + userFuture.get());
System.out.println("获取商品信息:" + goodsFuture.get());
//模拟主程序耗时时间
Thread.sleep(600);
System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
相关的方法使用
public class TestFuture1 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();
//自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
//cf1 cf2是0依赖
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "cf1", executor);
CompletableFuture<String> cf2 = CompletableFuture.completedFuture("cf2");
//CF3,CF5分别依赖于CF1和CF2,一元依赖
CompletableFuture<String> cf3 = cf1.thenApply(res1 -> "cf3");
CompletableFuture<String> cf5 = cf2.thenApply(res2 -> "cf5");
//cf4 依赖 cf1和cf2, 二元依赖
CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (res1, res2) -> "cf4");
//cf6 依赖 cf3, cf4, cf5, 多元依赖
CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);
//最终结果
CompletableFuture<String> result = cf6.thenApply(v -> "cf6");
// //模拟主程序耗时时间
// Thread.sleep(600);
// System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}
}
注意事项
- fulture需要返回值,才能获取异常信息
Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。
小伙伴们使用的时候,注意一下哈,考虑是否加try...catch...或者使用exceptionally方法。
CompletableFuture.get()方法是阻塞的(CompletableFuture.get(5, TimeUnit.SECONDS);)
CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,需要添加超时时间。不建议使用默认的线程池(ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈)。)
CompletableFuture代码中又使用了默认的「ForkJoin线程池」,处理的线程个数是电脑「CPU核数-1」。在大量请求过来的时候,处理逻辑复杂的话,响应会很慢。一般建议使用自定义线程池,优化线程池配置参数;
前面提到,异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。
当不传递线程池时,会使用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。
- 自定义线程池,注意饱和策略
CompletableFuture的get()方法是阻塞的,我们一般建议使用future.get(5, TimeUnit.SECONDS)。并且一般建议使用自定义线程池。
但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,做好线程池隔离哈。