项目中遇到一个这样的需求, 使用POS机对用户进行充值,且渠道实时知道充值结果在POS机显示. 但后端因为涉及第三方平台的调用, 第三方平台的结果异步通知, 所以后端无法给POS机同步响应结果, 解决这个问题的方案有两种.
- 后端提供两个接口, 下单接口与查询接口, POS机调用下单接口之后, 轮询调用查询接口, 查询最终结果, 若规定时间(根据业务需求而定)一直没有终态, 显示处理中, 提示最终结果会已push, 短信通知或人工处理.
- 后端只提供一个下单接口, 再后端使用CountDownLatch对异步进行同步化, 下面我们主要分析这种方法
先上代码
public CreateTradeRspDTO createTrade(CreateTradeReqDTO createTradeReqDTO) {
// 省略下单操作
// 异步同步化
final CountDownLatch countDown = new CountDownLatch(1);
// 使用线程池创建线程执行订单结果查询
// 我们服务调用使用dubbo, 超时时间设置5s, 所以CHECK_ACCOUNT_TIMES设置为8
// 最多进行查询8次, 每次0.5s, 如果8次也没有查询到最终结果, 也会执行countDown.countDown();
ThreadPoolExecutorUtil.execute(() -> {
for (int i = 0; i < CHECK_ACCOUNT_TIMES; i++) {
try {
TimeUnit.MILLISECONDS.sleep(500);
// 查询数据库中订单结果
ChannelFlowBO bo= channelFlowService.selectByOrderNo(orderNo);
// 有终态跳出循环, 否则一直循环
if(FlowStatusEnum.SUCCESS == bo.getFlowStatus || FlowStatusEnum.FALS== bo.getFlowStatus) {
// 省略设置订单状态
break;
}
} catch (Exception e) {
log.warn("**************************查询报错******************", e);
}
}
countDown.countDown();
});
try {
// 这一步需要在线程执行后面, 如果 countDown.await()放在线程执行前面, 程序会卡死
countDown.await();
} catch (Exception e) {
log.error("*****************CountDownLatch异常*****", e);
}
// 省略下单返回数据
}
经过上面的预热, 下面现在开始对CountDownLatch进行分析
CountDownLatch
CountDownLatch原理是创建是指定state的值, 然后调用await挂起线程, 后续每调用移除countDown,state-1,当state值为0时, 唤醒线程. 下面进行源码分析.
@Test
public void testCountDownLatch() throws InterruptedException {
final CountDownLatch downLatch = new CountDownLatch(2);
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
downLatch.countDown();
System.out.println("执行第一次countDown");
}).start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
downLatch.countDown();
System.out.println("执行第二次countDown");
}).start();
downLatch.await();
System.out.println("线程放行");
}
await分析
// 创建过程很简单, 就是为state设置值
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
// await开始
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// 只有当 state == 0 的时候,这个方法才会返回 1,否则返回-1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 对head进行延时初始化, 将node设置为tail
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 这下面之前分析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
创建CountDownLatch与await操作不难理解, 只要看过上一篇AbstractQueuedSynchronizer源码分析- ReentrantLock抢锁解锁就能很快理解.
countDown分析
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 对state -1 操作, 若state-1 == 0返回true,否则返回false
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// tryReleaseShared方法返回true进行下面操作
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 将ws设置为0, 然后唤醒线程h
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 对head进行延时初始化, 将node设置为tail
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//此时state为0, r为1
int r = tryAcquireShared(arg);
if (r >= 0) {
// 开始执行这里
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 唤醒线程之后会接着执行这一步
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 又是执行这个方法, 唤醒线程
doReleaseShared();
}
}
CountDownLatch的countDown不难理解,每调用一次就对state-1, 当state为0时, 换新之前挂起的线程, 源码比较难已理解的地方就是setHeadAndPropagate中为什么还要再次执行doReleaseShared(), 下面有一种Debug图解来分析一下.
如上图所示, 若多次执行await操作, 会多次执行addWaiter方法在链表中添加n+1个node,当执行count操作导致state为0是, 会依次唤醒链表中node节点的线程, 执行对应任务.
总结
1.CountDownLatch在工作中还是经常用的, 模拟多线程并发下的接口测试, 异步同步化, 一个线程完成任务同时通知多个线程, 计算汇总等待问题.
2.CyclicBarrier,Semaphore源码与CountDownLach差不多,用空对这两个类进行一下源码解读.