摘要:在上一篇文章中,我们设计了一个基于 Actor 模式的“写缓冲(Write-Behind)”防抖系统,看似美好,但是还是有消息乱序与数据丢失的隐患。本文将详细记录 V2 版本的重构思路:通过引入 阻塞背压 (Blocking Backpressure)、延迟确认 (Deferred ACK) 和 事件循环 (Event Loop),构建一个更加健壮、严谨的防抖系统。
1. 背景与挑战:从“跑通”到“跑稳”
在构建即时通讯系统的会话列表时,核心矛盾在于海量消息吞吐与数据库有限写入能力之间的冲突。业务要求每条消息都能更新会话的 last_active_time 和 digest(摘要),但直接的数据库 UPDATE 操作会导致严重的写放大(Write Amplification)。
上期的 V1 采用了“流水线 + Actor”模式:利用内存队列暂存消息,通过 Map 进行去重合并,最后批量落库。这种方案有三个隐患:
-
消息乱序风险:利用
NAK(拒绝消息)进行限流,会导致 NATS 将消息重新投递。在重试过程中,旧消息可能排在新消息之后,破坏 FIFO 顺序,导致会话状态“时光倒流”。 -
数据丢失风险:消息一旦执行
ACK,若此时服务发生 OOM 或断电,内存中未落库的数据将永久丢失,违背了数据可靠性原则。 - 驱动模型冗余:依赖定时器(ScheduledExecutor)轮询,在低负载时存在空转资源浪费,在高负载时又难以做到极致的“贪婪消费”。
针对上述问题,V2 版本对核心数据流转逻辑进行了深度重构。
2. 核心重构一:流量控制 —— 弃用 NAK,拥抱阻塞 (Blocking Backpressure)
2.1 隐患分析:NAK 带来的“乱序风暴”
初始版本为了保持消费者线程的非阻塞特性,在队列满时选择直接 NAK 消息。NATS Server 收到 NAK 后会将消息重新加入待发送队列。
这引发了一个严重的逻辑漏洞:重试打破了顺序。
假设用户先后发送了消息 A(内容:Hello)和消息 B(内容:Bye)。若 A 被 NAK 而 B 成功入队,A 稍后被重试投递时,将排在 B 之后。系统处理时会误认为 A 是最新状态,导致会话摘要错误地显示为 "Hello" 而非 "Bye"。此外,频繁的 NAK 还会增加中间件的负载,甚至触发最大投递次数限制导致消息被丢弃。
2.2 解决方案:基于 TCP 的自然背压
V2 版本将接收队列替换为阻塞队列 (BlockingQueue),并使用 put() 操作替代 offer()。
-
机制:设置极小的队列容量(如 Size=8)。当消费者处理速度低于生产者时,队列瞬间填满。此时,NATS 消费线程在尝试
put时被操作系统挂起(Block)。 - 连锁反应:消费线程停止从 Socket 读取数据 -> 系统的 TCP 接收窗口(Receive Window)被填满 -> NATS Server 感知拥塞 -> 自动降低对该客户端的推送速率。
-
收益:
- 严格顺序:消息在服务端排队等待,进入系统的顺序永远是绝对的 FIFO。
- 零丢包:不再有 NAK,也就消除了因“重试次数超限”而被丢弃的风险。
3. 核心重构二:数据安全 —— 延迟确认 (Deferred ACK) 与原子性提交
3.1 隐患分析:过早 ACK 导致的数据“裸奔”
初始版本遵循 接收 -> 转换 -> ACK -> 入队 的流程。ACK 意味着“责任转移完成”。但在 Write-Behind 模式下,数据此时仅仅存在于易失性的内存中。一旦发生服务宕机,这部分已确认但未持久化的数据将彻底丢失。
3.2 解决方案:持有句柄的事务性处理
为了实现 At-Least-Once(至少一次) 的投递保证,ACK 的时机必须后移至数据库事务提交之后。这要求内存中的防抖容器(Map)不仅要存储业务数据,还必须持有原始消息的引用(Handle)。
-
数据结构升级:Map 的 Value 包装为
MessageWrapper,包含SessionUpdateEvent(业务对象)和Message(NATS 原生句柄)。 -
智能合并策略:
- 覆盖场景:当新消息更新了同一个 Session,旧消息的数据价值失效。此时应立即 ACK 旧消息,仅保留新消息在 Map 中。
-
持久化场景:仅当
flushAll()方法成功执行完数据库batchUpdate后,才遍历当前批次的所有 Message 执行ack()。 -
异常兜底:若入库失败,则对该批次所有 Message 执行
nak(),触发 NATS 重发,等待下一次处理。
4. 核心重构三:驱动模型 —— 单线程事件循环 (Event Loop)
4.1 隐患分析:定时器的局限性
使用 ScheduledExecutorService 存在天然的滞后性。即使队列瞬间被打满,消费者也必须等待定时器触发。且此前尝试的虚拟线程模型并不适合这种“单例、长期驻留、CPU 密集型(Map 操作)”的任务。
4.2 解决方案:贪婪的单线程 Loop
V2 版本采用了类似 Redis 或 Node.js 的 Single Thread Event Loop 模型。创建一个长期驻留的平台线程,运行一个永不停止的 while 循环。
-
贪婪消费:使用
queue.poll(timeout)。一旦有数据,立即唤醒处理;处理完一条后,继续循环尝试获取,最大化吞吐。 -
自带心跳:
poll的超时时间(如 500ms)即为“最大落库延迟”。如果系统空闲,线程休眠;一旦超时醒来,强制检查是否需要刷盘。 - 无锁设计:由于数据读取、Map 合并、数据库写入均在同一个线程内串行执行,彻底消除了并发竞争,既安全又高效。
5. 核心代码实现 (V2)
Talk is cheap. 下面是经过 V2 重构后的核心代码实现。请注意代码是如何通过 BlockingQueue 和 LinkedHashMap 的组合来实现上述流控与安全逻辑的。
5.1 整体结构与双队列定义
我们摒弃了复杂的第三方缓存组件,直接使用 JVM 内部数据结构,减少网络开销。
@Slf4j
@Component
public class SessionUpdateListener implements NatsConsumer {
// 【Queue A】接收队列:容量极小 (8),核心作用是建立"背压"
// 当消费者处理不过来时,这个队列会满,从而阻塞 NATS 客户端线程
private static final int RECEIVE_QUEUE_SIZE = 8;
private final BlockingQueue<Message> receiveQueue = new ArrayBlockingQueue<>(RECEIVE_QUEUE_SIZE);
// 【Queue B】防抖容器:LRU 模式,用于合并重复的 Session 更新
// accessOrder=true 确保我们能在容量满时挤出"最老"的数据
private static final int PENDING_MAP_SIZE = 1000;
private final LinkedHashMap<String, MessageWrapper> pendingMap =
new LinkedHashMap<>(16, 0.75f, true);
// 单线程处理器:系统的"心脏"
private final Thread processingThread;
public SessionUpdateListener(SessionUpdateService sessionUpdateService) {
this.sessionUpdateService = sessionUpdateService;
// 启动一个守护线程,专门负责"消费 -> 合并 -> 落库"的全流程
processingThread = new Thread(this::processLoop, "session-update-processor");
processingThread.setDaemon(true);
processingThread.start();
}
// 包装类:持有 NATS 原始句柄,直到入库成功才 ACK
private record MessageWrapper(Message message, SessionUpdateEvent event) {}
}
5.2 生产者:优雅的阻塞背压
这是 V2 版本最大的改变。我们不再使用 offer + NAK,而是直接使用 put。
@SneakyThrows
@Override
public void onMessage(Message msg) {
// 【关键点】阻塞式写入
// 如果 receiveQueue 满了,当前线程(NATS Client 线程)会在此挂起 (WAITING)
// 这会导致 TCP 接收窗口填满,进而让 NATS Server 自动降低推送速率
// 保证了所有消息严格 FIFO,不乱序,不丢弃
receiveQueue.put(msg);
}
5.3 消费者:贪婪的事件循环 (Event Loop)
我们移除了 ScheduledExecutor,改用 poll(timeout) 机制。这既保证了高负载下的实时消费,又充当了低负载下的心跳机制。
private void processLoop() {
while (true) {
try {
// 1. 贪婪获取:带 500ms 超时
// 有数据 -> 立即返回,微秒级延迟
// 无数据 -> 等待 500ms,相当于心跳间隔
Message msg = receiveQueue.poll(CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
if (msg != null) {
addToPendingMap(msg); // 进货:放入 Map 合并
} else {
flushAll(); // 心跳:超时没数据,强制刷盘,防止数据滞留
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
throw (RuntimeException) e;
}
log.error("Error processing queue", e);
}
}
}
5.4 核心逻辑:智能合并与延迟 ACK
这是解决“乱序”与“丢数据”的关键逻辑。注意我们是如何处理 ACK 的:只 ACK 被覆盖的旧消息,保留最新消息的句柄直到落库。
private void addToPendingMap(Message msg) {
// ... 解析代码略 (ConvertUtils) ...
// 智能合并逻辑 (compute)
pendingMap.compute(sessionKey, (key, existing) -> {
if (existing != null) {
if (existing.event().getTimestamp() > event.getTimestamp()) {
// 【场景 A:乱序】
// 内存里的消息比新来的更"新",说明新消息是迟到的旧状态
// 策略:直接 ACK 新消息(丢弃),保留内存里的现有值
msg.ack();
return existing;
} else {
// 【场景 B:正常更新/覆盖】
// 新消息是最新的,旧消息已经没用了
// 策略:ACK 旧消息(它完成了历史使命),将新消息放入 Map
existing.message().ack();
return new MessageWrapper(msg, event);
}
}
// 【场景 C:新值】直接存入
return new MessageWrapper(msg, event);
});
// 兜底策略:如果 Map 满了,主动挤出最老的数据单独落库
if (pendingMap.size() >= PENDING_MAP_SIZE && !pendingMap.containsKey(sessionKey)) {
evictOldest();
}
}
5.5 提交阶段:原子性批量落库
最后,在刷盘阶段,我们严格遵循 先写库,后 ACK 的原则,实现了 At-Least-Once 语义。
private void flushAll() {
if (pendingMap.isEmpty()) return;
try {
// 1. 数据库批量写入 (Batch Insert/Update)
int updated = sessionUpdateService.batchCreateOrUpdateSessions(requests);
// 2. 只有入库成功,才对这批消息进行 ACK
// 如果 DB 报错,这里不会执行 ACK,NATS 会在超时后自动重发所有消息
for (MessageWrapper wrapper : pendingMap.values()) {
wrapper.message().ack();
}
pendingMap.clear();
log.debug("Batch updated {} sessions", updated);
} catch (Exception e) {
// 异常处理:日志记录
// 注意:这里没有显式调用 NAK,而是依靠 NATS 的 AckWait 机制自动重试
// 或者也可以在这里显式调用 nak() 加速重试
log.error("Batch update failed...", e);
}
}
6. 总结
从 V1 到 V2 的演进,体现了架构设计中从“功能实现”到“生产高可用”的思维转变:
- 一致性优先:在消息顺序和数据不丢面前,非阻塞的极致吞吐量是可以被权衡的。阻塞队列提供了系统自我保护的底线,防止了雪崩和乱序。
- 事务延伸:通过延迟 ACK,将中间件的消息生命周期与数据库事务强绑定,实现了跨系统的最终一致性。
- 架构极简:单线程模型在 I/O 密集型(数据库写)与计算密集型(Map 操作)混合的场景下,通过串行化设计去除了复杂的锁机制,反而提升了系统的稳定性和可维护性。
通过这次优化,系统成功填补了高并发下的并发陷阱,不仅解决了写放大问题,更确保了会话数据的精准与安全。
本文由mdnice多平台发布