多线程处理数据
// 方法入口
/**
 * Entry point: loads the records awaiting processing from {@code findDataDao},
 * hands them to {@code batchDeal} in batches of 1000, and logs the wall-clock
 * time spent.
 *
 * <p>If the calling thread is interrupted while waiting for the batches, the
 * interrupt flag is restored so that code further up the stack can still
 * observe the interruption; the elapsed time is logged regardless.
 */
public void updateDatasMult() {
    List<User> users = findDataDao.findAllUpLicData(); // records awaiting processing
    if (users == null || users.isEmpty()) {
        // Nothing to do; also avoids asking for a zero-sized thread pool downstream.
        logger.info("No data to process.");
        return;
    }

    long start = System.currentTimeMillis();
    try {
        batchDeal(users, 1000); // each task handles up to 1000 records
    } catch (InterruptedException e) {
        // Re-assert the interrupt status: catching InterruptedException clears it.
        Thread.currentThread().interrupt();
        logger.info("Interrupted while waiting for batch processing to finish: " + e);
    }
    long end = System.currentTimeMillis();
    logger.info("start time:" + start + "; end time:" + end + "; Run Time:" + (end - start) + "(ms)");
}
/**
 * Splits {@code data} into consecutive batches of at most {@code batchNum}
 * elements, runs one {@code ImportTask} per batch on a fixed thread pool, and
 * blocks until the shared latch has been counted down once per batch.
 *
 * <p>Each batch is a {@link List#subList(int, int)} view of {@code data}, so
 * {@code data} must not be structurally modified while the tasks are running.
 *
 * @param data     the records to process; {@code null} or empty means no work
 * @param batchNum maximum number of records per batch; must be positive
 * @throws IllegalArgumentException if {@code batchNum} is not positive
 * @throws InterruptedException     if the calling thread is interrupted while
 *                                  waiting for the batches to finish
 */
private void batchDeal(List<?> data, int batchNum) throws InterruptedException {
    if (batchNum <= 0) {
        throw new IllegalArgumentException("batchNum must be positive: " + batchNum);
    }
    if (data == null || data.isEmpty()) {
        // Executors.newFixedThreadPool(0) would throw, and there is nothing to wait for.
        return;
    }

    int totalNum = data.size();
    // Ceiling division written so it cannot overflow (totalNum >= 1 here).
    int pageNum = (totalNum - 1) / batchNum + 1;

    // Bound the number of threads: one thread per batch grows without limit as
    // the data grows. Extra batches simply queue until a worker is free.
    int maxThreads = Math.max(1, Runtime.getRuntime().availableProcessors() * 2);
    ExecutorService executor = Executors.newFixedThreadPool(Math.min(pageNum, maxThreads));
    try {
        CountDownLatch countDownLatch = new CountDownLatch(pageNum);
        for (int i = 0; i < pageNum; i++) {
            int fromIndex = i * batchNum;
            // Clamp the batch length instead of the end index to avoid int overflow
            // in fromIndex + batchNum for very large batch sizes.
            int toIndex = fromIndex + Math.min(batchNum, totalNum - fromIndex);
            List<?> subData = data.subList(fromIndex, toIndex);
            executor.execute(new ImportTask(subData, countDownLatch));
        }
        // Wait for every task to report completion.
        countDownLatch.await();
        logger.info("数据操作完成!");
    } finally {
        // Shut the pool down to release its threads.
        executor.shutdown();
    }
}
//线程方法
/**
 * Unit of work that processes one batch of records and then signals
 * completion on a shared {@link CountDownLatch}.
 */
class ImportTask implements Runnable {
    /** Batch to process; may be {@code null}, in which case nothing is processed. */
    private final List<?> list;
    /** Latch counted down exactly once each time {@link #run()} finishes. */
    private final CountDownLatch countDownLatch;

    /**
     * @param data           the batch of records this task handles; may be {@code null}
     * @param countDownLatch latch to count down when the task is done
     */
    public ImportTask(List<?> data, CountDownLatch countDownLatch) {
        this.list = data;
        this.countDownLatch = countDownLatch;
    }

    /**
     * Processes every element of the batch, then counts the latch down.
     * The count-down happens even if processing throws, so a thread blocked
     * in {@code await()} on the latch is never left waiting forever; the
     * exception itself still propagates to the executing thread.
     */
    @Override
    public void run() {
        try {
            if (list != null) {
                list.forEach(item -> {
                    // TODO: per-record processing logic
                });
            }
        } finally {
            countDownLatch.countDown();
        }
    }
}