概述
多个线程请求一个资源,请求是单次触发的,且触发时机不定。为了节省io资源,CountDownLatch实现多线程堵塞,批量处理后,再唤醒原线程继续执行。
代码
模拟请求类
public class MainDispatchClass {
public static void main(String[] args) {
BatchProcess batchProcess = new BatchProcess();
for (int i = 0; i < 15; i++) {
final int inputParam = i * 5;
new Thread(() -> {
String name = Thread.currentThread().getName();
System.out.println(name + "输入:" + inputParam);
int outPut = batchProcess.process(inputParam, Thread.currentThread().getName());
System.out.println(name + "输出" + outPut);
}, "线程:" + i
).start();
}
}
}
堵塞批量处理类
public class BatchProcess {
private volatile int index = 0;
private final int batchSize = 10;
private volatile int emptyRun = 0;
private volatile BatchProcessCapacity currentCapacity;
private volatile LinkedList<BatchProcessCapacity> historyCapacity = new LinkedList<>();
private ScheduledExecutorService scheduledExecutorService;
public BatchProcess() {
scheduledExecutorService = new ScheduledThreadPoolExecutor(5);
scheduledExecutorService.scheduleAtFixedRate(this::batchProcess, 1, 5, TimeUnit.SECONDS);
}
//用于唤醒
private void batchProcess() {
while (!historyCapacity.isEmpty()) {
BatchProcessCapacity batchProcessCapacity = historyCapacity.pollFirst();
batchProcessCapacity.process();
emptyRun = 0;
}
System.out.println("empty run." + emptyRun);
emptyRun++;
if (emptyRun == 5) {
if (Objects.nonNull(currentCapacity)) {
replace();
}
emptyRun = 0;
}
}
public int process(int inputParam, String unique) {
BatchProcessCapacity capacity = getCapacity(inputParam, unique);
return capacity.countWait(unique); //堵塞等待唤醒
}
private synchronized BatchProcessCapacity getCapacity(int inputParam, String unique) {
//每10个一批
if ((index % batchSize) == 0) {
if (Objects.nonNull(currentCapacity)) {
replace();
index = 0;
}
index++;
BatchProcessCapacity batchProcessCapacity = new BatchProcessCapacity(inputParam, unique);
currentCapacity = batchProcessCapacity;
return currentCapacity;
}
index++;
currentCapacity.add(inputParam, unique);
return currentCapacity;
}
// 不断重建
private synchronized void replace() {
historyCapacity.addFirst(currentCapacity);
currentCapacity = null;
}
public class BatchProcessCapacity {
List<Integer> allInputParameter;
CountDownLatch countDownLatch;
Map<Integer, String> threadMap;
Map<String, Integer> valueMap;
public BatchProcessCapacity(Integer inputParameter, String unique) {
this.allInputParameter = new ArrayList<>();
allInputParameter.add(inputParameter);
this.countDownLatch = new CountDownLatch(1);
threadMap = new HashMap<>();
threadMap.put(inputParameter, unique);
valueMap = new HashMap<>();
}
public void add(Integer inputParameter, String unique) {
allInputParameter.add(inputParameter);
threadMap.put(inputParameter, unique);
}
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public int countWait(String unique) {
try {
this.countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
return valueMap.get(unique);
}
public void process() {
for (Integer integer : allInputParameter) {
int value = integer + 1;
valueMap.put(threadMap.get(integer), value);
}
countDownLatch.countDown();
}
public Integer getValue(String unique) {
return valueMap.get(unique);
}
}
}
后记
当前代码只是用于验证可行性,尚不能直接用于生产环境。