最近遇到一个需求,一批数据,要去请求A B C 多个接口。不同接口返回不同字段的值。然后设置到原来的对象中。
其中 A BC 接口每次请求都对数量有限制。
好了,使用CompleteFuture来解决
代码:
@GetMapping("/user2")
public List<User> getData() throws Exception {
List<User> userList = new ArrayList<>(2000);
for (int j = 0; j < 1000; j++) {
User u1 = new User();
u1.setId(j + "");
u1.setAddress("地址:" + j);
u1.setAge(j + "");
userList.add(u1);
}
long l = System.currentTimeMillis();
// getOtherInfo(userList);
// getUserName(userList);
// getMobile(userList);
CompletableFuture<Void> otherTask = CompletableFuture.runAsync(() -> {
getOtherInfo(userList);
}, poolExecutor);
CompletableFuture<Void> nameTask = CompletableFuture.runAsync(() -> {
getUserName(userList);
}, poolExecutor);
CompletableFuture<Void> mobileTask = CompletableFuture.runAsync(() -> {
getMobile(userList);
}, poolExecutor);
try {
CompletableFuture.allOf(nameTask, mobileTask, otherTask).join();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("================调用第三方耗时:" + (System.currentTimeMillis() - l) + " 毫秒");
return userList;
}
原始请求接口 A B C
/**
* 模拟调用第三方,获取其他信息
*
* @param users
*/
private void getOtherInfo(List<User> users) {
long beginTime = System.currentTimeMillis();
List<List<User>> subUsers = new ArrayList<>();
List<List<User>> partitionList = getPartitionList(users, subUsers, 50);
// partitionList.forEach(list -> {
// sendRequestToService(100);
// list.forEach(user -> {
// user.setBirth("生日: " + user.getId());
// user.setSex("性别:" + user.getId());
// });
// });
List<CompletableFuture> futures=new ArrayList<>();
for (int i = 0; i < partitionList.size(); i++) {
List<User> list = partitionList.get(i);
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() + " 正在执行任务");
sendRequestToService(100);
list.forEach(user -> {
user.setBirth("生日: " + user.getId());
user.setSex("性别:" + user.getId());
});
});
futures.add(future);
}
CompletableFuture allFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
allFuture.join();
// System.out.println(Thread.currentThread().getName() + " 正在执行任务");
System.out.println("获取其他信息耗时:" + (System.currentTimeMillis() - beginTime) + " 毫秒");
}
/**
* 获取用户姓名
*
* @param users
*/
private void getUserName(List<User> users) {
long beginTime = System.currentTimeMillis();
List<List<User>> subUsers = new ArrayList<>();
List<List<User>> partitionList = getPartitionList(users, subUsers, 200);
partitionList.forEach(list -> {
sendRequestToService(130);
list.forEach(user -> {
user.setUsername("姓名:" + user.getId());
});
});
System.out.println(Thread.currentThread().getName() + " 正在执行任务");
System.out.println("获取用户姓名 耗时:" + (System.currentTimeMillis() - beginTime) + " 毫秒");
}
/**
* 获取手机
* @param users
*/
private void getMobile(List<User> users) {
long beginTime = System.currentTimeMillis();
List<List<User>> subUsers = new ArrayList<>();
List<List<User>> partitionList = getPartitionList(users, subUsers, 400);
partitionList.forEach(list -> {
sendRequestToService(150);
list.forEach(user -> {
user.setMobile("手机:" + user.getId());
});
});
System.out.println(Thread.currentThread().getName() + " 正在执行任务");
System.out.println("获取用户姓名 耗时:" + (System.currentTimeMillis() - beginTime) + " 毫秒");
}
数据进行分片 和模拟时间
/**
* 根据分片来获取数据
*
* @param users
* @param subUsers
* @param count
* @return
*/
private List<List<User>> getPartitionList(List<User> users, List<List<User>> subUsers, int count) {
if (users.size() >= count) {
subUsers = Lists.partition(users, count);
} else {
subUsers.add(users);
}
return subUsers;
}
/**
* 根据传入时间来判断暂停接口多少毫秒
*
* @param i
*/
private void sendRequestToService(int count) {
try {
// 模拟请求对面接口count毫秒
TimeUnit.MILLISECONDS.sleep(count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Bean
@Qualifier(value = "MyThread")
private ThreadPoolTaskExecutor poolExecutor(){
ThreadPoolTaskExecutor executor =new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setKeepAliveSeconds(10);
executor.setThreadNamePrefix("Pool-Nexus");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
最终结果:
不使用 future
http-nio-8009-exec-1 正在执行任务
获取其他信息耗时:2012 毫秒
http-nio-8009-exec-1 正在执行任务
获取用户姓名 耗时:652 毫秒
http-nio-8009-exec-1 正在执行任务
获取用户姓名 耗时:454 毫秒
================调用第三方耗时:3118 毫秒
使用3个
Pool-Nexus3 正在执行任务
获取用户姓名 耗时:453 毫秒
Pool-Nexus2 正在执行任务
获取用户姓名 耗时:654 毫秒
Pool-Nexus1 正在执行任务
获取其他信息耗时:2011 毫秒
================调用第三方耗时:2014 毫秒
使用3个再为了获取其他信息接口再套3个
ForkJoinPool.commonPool-worker-1 正在执行任务
ForkJoinPool.commonPool-worker-2 正在执行任务
ForkJoinPool.commonPool-worker-4 正在执行任务
ForkJoinPool.commonPool-worker-3 正在执行任务
ForkJoinPool.commonPool-worker-5 正在执行任务
ForkJoinPool.commonPool-worker-3 正在执行任务
ForkJoinPool.commonPool-worker-4 正在执行任务
ForkJoinPool.commonPool-worker-2 正在执行任务
ForkJoinPool.commonPool-worker-5 正在执行任务
ForkJoinPool.commonPool-worker-1 正在执行任务
ForkJoinPool.commonPool-worker-3 正在执行任务
ForkJoinPool.commonPool-worker-4 正在执行任务
ForkJoinPool.commonPool-worker-2 正在执行任务
ForkJoinPool.commonPool-worker-1 正在执行任务
ForkJoinPool.commonPool-worker-5 正在执行任务
ForkJoinPool.commonPool-worker-4 正在执行任务
ForkJoinPool.commonPool-worker-2 正在执行任务
ForkJoinPool.commonPool-worker-3 正在执行任务
ForkJoinPool.commonPool-worker-5 正在执行任务
ForkJoinPool.commonPool-worker-1 正在执行任务
获取其他信息耗时:406 毫秒
Pool-Nexus3 正在执行任务
获取用户姓名 耗时:454 毫秒
Pool-Nexus2 正在执行任务
获取用户姓名 耗时:656 毫秒
================调用第三方耗时:660 毫秒
1个异步加多个completeFuture