package com.example.wxh.queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
- Created by
- 通用的多线程,算分页的逻辑 ,计算一推数据,算出分页再多线程处理
- @auth wangxinhui
- @Date 2019/11/29
*/
public class CommonCompletion {
public static void main(String args[]){
long starttime = System.currentTimeMillis();
int totals = 1000000000;
int coreThread = 5;
int nThread = 20;
LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(10); //阻塞队列
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(coreThread, nThread,
0L, TimeUnit.MILLISECONDS,
linkedBlockingQueue);
LinkedBlockingQueue<Future<Long>> QueueingFuture = new LinkedBlockingQueue<Future<Long>>(10);
CompletionService<Long> completionService = new ExecutorCompletionService(poolExecutor,QueueingFuture);
long step = totals/nThread; //每页处理的条数 : pageSize
for(int i=1;i<= nThread ;i++){
long start = (i-1)*step+1; //开启的条数
long end;
if (i==nThread){ //是否是最后一个线程
end = totals;
}else{
end = i*step;
}
completionService.submit(new Callable<Long>() {
long result = 0l;
@Override
public Long call() throws Exception {
for(long i= start ;i<= end;i++){
result+=i;
}
System.out.println(Thread.currentThread() +"--"+ poolExecutor.toString() +"------"+start + "~"+ end+",result =" +result);
return result;
}
});
}
long count = 0l;
for(int i = 1 ; i<= nThread ; i++){
try {
count += completionService.take().get();
System.out.println("i="+i+",count =" +count);
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println(" count= "+count);
System.out.println(" task done !");
System.out.println(" times ="+(System.currentTimeMillis()-starttime));
poolExecutor.shutdown();
}
}