Spring Boot 事务实战:如何优雅解决 DB 与 MQ 的“双写不一致”?

摘要:在分布式系统中,“先存数据库还是先发消息”是一个经典的架构难题。特别是在 IM 系统的多媒体消息处理场景中,如果处理顺序不当,不仅会导致对象存储(OSS)中产生无法回收的“孤儿文件”,还会引发并发重复处理的问题。本文结合代码案例,探讨如何利用 Spring 的 TransactionSynchronizationManager 实现 事务提交后触发 (Trigger After Commit) 机制,优雅解决数据库与消息队列的“双写一致性”问题。


1. 引言:一个看似简单的顺序问题

在“信令与媒体分离”的架构中,核心流程通常如下:API 服务收到消息 -> 落库(标记为 Pending) -> 异步通知 Worker 搬运文件

这一流程涉及两个异构系统的写操作:

  1. DB Write:将消息元数据写入 MySQL。
  2. MQ Write:将搬运任务发布到 NATS。

在实际开发中,直觉性的代码编写往往会陷入以下误区:

误区一:先发消息,后入库

// ❌ 错误示范
natsPublisher.publish(task); // 1. 消息发出,Worker 开始下载转存
messageRepository.save(message); // 2. 数据库报错(如字段超长、唯一键冲突)
// 后果:DB 回滚,业务无记录,但 OSS 中产生了一个永远无法被引用的“孤儿文件”。

误区二:在事务内发消息

// ❌ 错误示范
@Transactional
public void handle() {
    messageRepository.save(message);
    natsPublisher.publish(task); 
    // 3. 代码执行完毕,但在事务提交(Commit)的一瞬间数据库连接断开
}
// 后果:Worker 收到任务并完成处理,但在回调更新状态时发现 DB 中不存在该记录。

2. 核心方案:事务提交后的“惊险一跃”

为了保证 “只有数据库确确实实持久化成功了,才去触发异步任务”,最佳实践是利用 Spring 框架提供的事务同步机制。

以下是优化后的代码实现:

2.1 主业务逻辑

// 1. 准备阶段:预生成任务(纯内存操作,无副作用)
// 此时并没有真正发送 NATS 消息,只是构建了对象
List<MediaTransferTask> mediaTasks = prepareMediaTransferTasks(msg, ids.sessionId());

// 2. 构建消息实体
WxMessage message = buildMessage(msg, accountId, ids.sessionId(), ids.senderId());

try {
    // 【核心步骤 A】数据库落库 (Source of Truth)
    // 这是唯一的“事实来源”。如果这里失败,后续一切都不应发生。
    messageRepository.save(message);
    log.info("Message saved: id={}, wxid={}", message.getId(), message.getWxid());

    // 3. 发布会话更新事件 (内存事件或 MQ)
    SessionUpdateEvent event = SessionUpdateEvent.builder()
        .accountId(accountId)
        // ... build params
        .build();
    sessionEventPublisher.publishSessionUpdate(event);

    // 【核心步骤 B】注册事务回调
    // 关键点:这里不是立即发送,而是“预约”发送
    publishMediaTransferTasksAfterCommit(mediaTasks);

    return ProcessResult.success();

} catch (DataIntegrityViolationException e) {
    // 【并发场景的保护】
    // 如果两个线程同时处理同一条消息(如网络重放或客户端重试),
    // 数据库的唯一索引会抛出此异常。
    // 由于消息发送逻辑在事务提交后执行,失败的线程事务回滚,
    // 因此“afterCommit”钩子不会被触发,完美避免了 Worker 重复搬运文件。
    log.debug("Duplicate message (concurrent): {}", msg.getMessageId());
    return ProcessResult.duplicate();
}

2.2 事务同步器的实现

publishMediaTransferTasksAfterCommit 方法利用了 Spring 的 TransactionSynchronizationManager 来挂载回调。

private void publishMediaTransferTasksAfterCommit(List<MediaTransferTask> tasks) {
    if (CollectionUtils.isEmpty(tasks)) {
        return;
    }
    
    // 判断当前是否在事务中
    if (TransactionSynchronizationManager.isActualTransactionActive()) {
        // 注册同步器
        TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
            @Override
            public void afterCommit() {
                // 【真正发送的时机】
                // 只有当 DB 事务成功 Commit 后,这一行才会执行
                // 此时 DB 里一定有数据,Worker 回调一定能成功
                tasks.forEach(mediaTransferPublisher::publishMediaTransfer);
                log.debug("Async media tasks published after commit: size={}", tasks.size());
            }
        });
    } else {
        // 如果不在事务中(比如非事务方法调用),则立即发送(降级策略)
        tasks.forEach(mediaTransferPublisher::publishMediaTransfer);
    }
}

3. 深度解析:方案优势

3.1 杜绝“孤儿资源”

通过 afterCommit 钩子,严格保证了因果关系:因(DB落库成功) -> 果(触发搬运)
如果 messageRepository.save(message) 因为任何原因(业务校验失败、数据库异常)导致事务回滚,afterCommit 回调将永远不会被执行,NATS 消息也就不会发出,从而从源头上避免了 OSS 资源的浪费。

3.2 天然的幂等性防护

代码中对 DataIntegrityViolationException 的捕获处理是该方案的另一大亮点。
在分布式场景下,消息重复投递是常见现象。

  • 无保护模式:若未加控制,两个线程可能都会发出 NATS 消息,导致 Worker 下载上传两次同样的图片,浪费带宽和计算资源。
  • 事务同步模式:数据库的唯一约束(Unique Key)充当了“守门员”。第二个线程在 save 时会因冲突被拒绝,随之事务回滚。由于事务未成功提交,其注册的 afterCommit 钩子自动失效。最终,只有抢锁成功的线程才会发出唯一的一条异步任务。

4. 兜底策略:应对“反向不一致”

虽然该方案解决了“有文件没记录”的问题,但理论上仍存在极低概率的“反向不一致”:DB 提交成功了,但在执行 afterCommit 发送 NATS 消息的一瞬间,服务宕机或断电。

此时,数据库中存在一条状态为 PENDING 的记录,但永远不会有 Worker 来处理它。

为了达到金融级的一致性,系统应补充一个兜底补偿机制

  1. 定时任务 (Compensation Job):每隔一定周期(如 5 分钟)扫描一次消息表。
  2. 筛选条件create_time < 5分钟前 AND media_status = 'PENDING'
  3. 补偿动作:重新构建 MediaTransferTask 并补发到 NATS。

5. 总结

在处理“数据库事务”与“外部系统调用(MQ/RPC)”混合的业务场景时,“事务同步器(Transaction Synchronization)” 是 Spring 体系中解决双写一致性问题的利器。

通过这一模式的重构,系统实现了:

  1. 资源一致性:杜绝了 OSS 孤儿文件。
  2. 并发安全性:利用数据库锁自动解决并发任务重复发布问题。
  3. 逻辑严密性:确保状态流转严格遵循业务时序。

核心原则:先落库,再提交,回调之中发消息。

本文由mdnice多平台发布

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容