会话更新的防抖进化 —— 填补“乱序”与“丢数据”的深坑

摘要:在上一篇文章中,我们设计了一个基于 Actor 模式的“写缓冲(Write-Behind)”防抖系统,看似美好,但是还是有消息乱序与数据丢失的隐患。本文将详细记录 V2 版本的重构思路:通过引入 阻塞背压 (Blocking Backpressure)延迟确认 (Deferred ACK)事件循环 (Event Loop),构建一个更加健壮、严谨的防抖系统。


1. 背景与挑战:从“跑通”到“跑稳”

在构建即时通讯系统的会话列表时,核心矛盾在于海量消息吞吐数据库有限写入能力之间的冲突。业务要求每条消息都能更新会话的 last_active_timedigest(摘要),但直接的数据库 UPDATE 操作会导致严重的写放大(Write Amplification)。

上期的 V1 采用了“流水线 + Actor”模式:利用内存队列暂存消息,通过 Map 进行去重合并,最后批量落库。这种方案有三个隐患:

  1. 消息乱序风险:利用 NAK(拒绝消息)进行限流,会导致 NATS 将消息重新投递。在重试过程中,旧消息可能排在新消息之后,破坏 FIFO 顺序,导致会话状态“时光倒流”。
  2. 数据丢失风险:消息一旦执行 ACK,若此时服务发生 OOM 或断电,内存中未落库的数据将永久丢失,违背了数据可靠性原则。
  3. 驱动模型冗余:依赖定时器(ScheduledExecutor)轮询,在低负载时存在空转资源浪费,在高负载时又难以做到极致的“贪婪消费”。

针对上述问题,V2 版本对核心数据流转逻辑进行了深度重构。


2. 核心重构一:流量控制 —— 弃用 NAK,拥抱阻塞 (Blocking Backpressure)

2.1 隐患分析:NAK 带来的“乱序风暴”

初始版本为了保持消费者线程的非阻塞特性,在队列满时选择直接 NAK 消息。NATS Server 收到 NAK 后会将消息重新加入待发送队列。

这引发了一个严重的逻辑漏洞:重试打破了顺序
假设用户先后发送了消息 A(内容:Hello)和消息 B(内容:Bye)。若 A 被 NAK 而 B 成功入队,A 稍后被重试投递时,将排在 B 之后。系统处理时会误认为 A 是最新状态,导致会话摘要错误地显示为 "Hello" 而非 "Bye"。此外,频繁的 NAK 还会增加中间件的负载,甚至触发最大投递次数限制导致消息被丢弃。

2.2 解决方案:基于 TCP 的自然背压

V2 版本将接收队列替换为阻塞队列 (BlockingQueue),并使用 put() 操作替代 offer()

  • 机制:设置极小的队列容量(如 Size=8)。当消费者处理速度低于生产者时,队列瞬间填满。此时,NATS 消费线程在尝试 put 时被操作系统挂起(Block)。
  • 连锁反应:消费线程停止从 Socket 读取数据 -> 系统的 TCP 接收窗口(Receive Window)被填满 -> NATS Server 感知拥塞 -> 自动降低对该客户端的推送速率。
  • 收益
    • 严格顺序:消息在服务端排队等待,进入系统的顺序永远是绝对的 FIFO。
    • 零丢包:不再有 NAK,也就消除了因“重试次数超限”而被丢弃的风险。

3. 核心重构二:数据安全 —— 延迟确认 (Deferred ACK) 与原子性提交

3.1 隐患分析:过早 ACK 导致的数据“裸奔”

初始版本遵循 接收 -> 转换 -> ACK -> 入队 的流程。ACK 意味着“责任转移完成”。但在 Write-Behind 模式下,数据此时仅仅存在于易失性的内存中。一旦发生服务宕机,这部分已确认但未持久化的数据将彻底丢失。

3.2 解决方案:持有句柄的事务性处理

为了实现 At-Least-Once(至少一次) 的投递保证,ACK 的时机必须后移至数据库事务提交之后。这要求内存中的防抖容器(Map)不仅要存储业务数据,还必须持有原始消息的引用(Handle)。

  • 数据结构升级:Map 的 Value 包装为 MessageWrapper,包含 SessionUpdateEvent(业务对象)和 Message(NATS 原生句柄)。
  • 智能合并策略
    • 覆盖场景:当新消息更新了同一个 Session,旧消息的数据价值失效。此时应立即 ACK 旧消息,仅保留新消息在 Map 中。
    • 持久化场景:仅当 flushAll() 方法成功执行完数据库 batchUpdate 后,才遍历当前批次的所有 Message 执行 ack()
    • 异常兜底:若入库失败,则对该批次所有 Message 执行 nak(),触发 NATS 重发,等待下一次处理。

4. 核心重构三:驱动模型 —— 单线程事件循环 (Event Loop)

4.1 隐患分析:定时器的局限性

使用 ScheduledExecutorService 存在天然的滞后性。即使队列瞬间被打满,消费者也必须等待定时器触发。且此前尝试的虚拟线程模型并不适合这种“单例、长期驻留、CPU 密集型(Map 操作)”的任务。

4.2 解决方案:贪婪的单线程 Loop

V2 版本采用了类似 Redis 或 Node.js 的 Single Thread Event Loop 模型。创建一个长期驻留的平台线程,运行一个永不停止的 while 循环。

  • 贪婪消费:使用 queue.poll(timeout)。一旦有数据,立即唤醒处理;处理完一条后,继续循环尝试获取,最大化吞吐。
  • 自带心跳poll 的超时时间(如 500ms)即为“最大落库延迟”。如果系统空闲,线程休眠;一旦超时醒来,强制检查是否需要刷盘。
  • 无锁设计:由于数据读取、Map 合并、数据库写入均在同一个线程内串行执行,彻底消除了并发竞争,既安全又高效。

5. 核心代码实现 (V2)

Talk is cheap. 下面是经过 V2 重构后的核心代码实现。请注意代码是如何通过 BlockingQueueLinkedHashMap 的组合来实现上述流控与安全逻辑的。

5.1 整体结构与双队列定义

我们摒弃了复杂的第三方缓存组件,直接使用 JVM 内部数据结构,减少网络开销。

@Slf4j
@Component
public class SessionUpdateListener implements NatsConsumer {

  // 【Queue A】接收队列:容量极小 (8),核心作用是建立"背压"
  // 当消费者处理不过来时,这个队列会满,从而阻塞 NATS 客户端线程
  private static final int RECEIVE_QUEUE_SIZE = 8;
  private final BlockingQueue<Message> receiveQueue = new ArrayBlockingQueue<>(RECEIVE_QUEUE_SIZE);

  // 【Queue B】防抖容器:LRU 模式,用于合并重复的 Session 更新
  // accessOrder=true 确保我们能在容量满时挤出"最老"的数据
  private static final int PENDING_MAP_SIZE = 1000;
  private final LinkedHashMap<String, MessageWrapper> pendingMap =
    new LinkedHashMap<>(16, 0.75f, true);

  // 单线程处理器:系统的"心脏"
  private final Thread processingThread;

  public SessionUpdateListener(SessionUpdateService sessionUpdateService) {
    this.sessionUpdateService = sessionUpdateService;
    // 启动一个守护线程,专门负责"消费 -> 合并 -> 落库"的全流程
    processingThread = new Thread(this::processLoop, "session-update-processor");
    processingThread.setDaemon(true);
    processingThread.start();
  }
  
  // 包装类:持有 NATS 原始句柄,直到入库成功才 ACK
  private record MessageWrapper(Message message, SessionUpdateEvent event) {}
}

5.2 生产者:优雅的阻塞背压

这是 V2 版本最大的改变。我们不再使用 offer + NAK,而是直接使用 put

  @SneakyThrows
  @Override
  public void onMessage(Message msg) {
    // 【关键点】阻塞式写入
    // 如果 receiveQueue 满了,当前线程(NATS Client 线程)会在此挂起 (WAITING)
    // 这会导致 TCP 接收窗口填满,进而让 NATS Server 自动降低推送速率
    // 保证了所有消息严格 FIFO,不乱序,不丢弃
    receiveQueue.put(msg);
  }

5.3 消费者:贪婪的事件循环 (Event Loop)

我们移除了 ScheduledExecutor,改用 poll(timeout) 机制。这既保证了高负载下的实时消费,又充当了低负载下的心跳机制。

  private void processLoop() {
    while (true) {
      try {
        // 1. 贪婪获取:带 500ms 超时
        // 有数据 -> 立即返回,微秒级延迟
        // 无数据 -> 等待 500ms,相当于心跳间隔
        Message msg = receiveQueue.poll(CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
        
        if (msg != null) {
          addToPendingMap(msg); // 进货:放入 Map 合并
        } else {
          flushAll(); // 心跳:超时没数据,强制刷盘,防止数据滞留
        }
      } catch (Exception e) {
         if (e instanceof InterruptedException) {
          throw (RuntimeException) e;
        }
        log.error("Error processing queue", e);
      }
    }
  }

5.4 核心逻辑:智能合并与延迟 ACK

这是解决“乱序”与“丢数据”的关键逻辑。注意我们是如何处理 ACK 的:只 ACK 被覆盖的旧消息,保留最新消息的句柄直到落库。

  private void addToPendingMap(Message msg) {
    // ... 解析代码略 (ConvertUtils) ...

    // 智能合并逻辑 (compute)
    pendingMap.compute(sessionKey, (key, existing) -> {
      if (existing != null) {
        if (existing.event().getTimestamp() > event.getTimestamp()) {
          // 【场景 A:乱序】
          // 内存里的消息比新来的更"新",说明新消息是迟到的旧状态
          // 策略:直接 ACK 新消息(丢弃),保留内存里的现有值
          msg.ack();
          return existing;
        } else {
          // 【场景 B:正常更新/覆盖】
          // 新消息是最新的,旧消息已经没用了
          // 策略:ACK 旧消息(它完成了历史使命),将新消息放入 Map
          existing.message().ack();
          return new MessageWrapper(msg, event);
        }
      }
      // 【场景 C:新值】直接存入
      return new MessageWrapper(msg, event);
    });
    
    // 兜底策略:如果 Map 满了,主动挤出最老的数据单独落库
    if (pendingMap.size() >= PENDING_MAP_SIZE && !pendingMap.containsKey(sessionKey)) {
        evictOldest();
    }
  }

5.5 提交阶段:原子性批量落库

最后,在刷盘阶段,我们严格遵循 先写库,后 ACK 的原则,实现了 At-Least-Once 语义。

  private void flushAll() {
    if (pendingMap.isEmpty()) return;

    try {
      // 1. 数据库批量写入 (Batch Insert/Update)
      int updated = sessionUpdateService.batchCreateOrUpdateSessions(requests);
      
      // 2. 只有入库成功,才对这批消息进行 ACK
      // 如果 DB 报错,这里不会执行 ACK,NATS 会在超时后自动重发所有消息
      for (MessageWrapper wrapper : pendingMap.values()) {
        wrapper.message().ack();
      }
      pendingMap.clear();
      log.debug("Batch updated {} sessions", updated);
    } catch (Exception e) {
      // 异常处理:日志记录
      // 注意:这里没有显式调用 NAK,而是依靠 NATS 的 AckWait 机制自动重试
      // 或者也可以在这里显式调用 nak() 加速重试
      log.error("Batch update failed...", e);
    }
  }

6. 总结

从 V1 到 V2 的演进,体现了架构设计中从“功能实现”到“生产高可用”的思维转变:

  1. 一致性优先:在消息顺序和数据不丢面前,非阻塞的极致吞吐量是可以被权衡的。阻塞队列提供了系统自我保护的底线,防止了雪崩和乱序。
  2. 事务延伸:通过延迟 ACK,将中间件的消息生命周期与数据库事务强绑定,实现了跨系统的最终一致性。
  3. 架构极简单线程模型在 I/O 密集型(数据库写)与计算密集型(Map 操作)混合的场景下,通过串行化设计去除了复杂的锁机制,反而提升了系统的稳定性和可维护性。

通过这次优化,系统成功填补了高并发下的并发陷阱,不仅解决了写放大问题,更确保了会话数据的精准与安全。

本文由mdnice多平台发布

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容