摘要:在分布式系统中,“先存数据库还是先发消息”是一个经典的架构难题。特别是在 IM 系统的多媒体消息处理场景中,如果处理顺序不当,不仅会导致对象存储(OSS)中产生无法回收的“孤儿文件”,还会引发并发重复处理的问题。本文结合代码案例,探讨如何利用 Spring 的 TransactionSynchronizationManager 实现 事务提交后触发 (Trigger After Commit) 机制,优雅解决数据库与消息队列的“双写一致性”问题。
1. 引言:一个看似简单的顺序问题
在“信令与媒体分离”的架构中,核心流程通常如下:API 服务收到消息 -> 落库(标记为 Pending) -> 异步通知 Worker 搬运文件。
这一流程涉及两个异构系统的写操作:
- DB Write:将消息元数据写入 MySQL。
- MQ Write:将搬运任务发布到 NATS。
在实际开发中,直觉性的代码编写往往会陷入以下误区:
误区一:先发消息,后入库
// ❌ 错误示范
natsPublisher.publish(task); // 1. 消息发出,Worker 开始下载转存
messageRepository.save(message); // 2. 数据库报错(如字段超长、唯一键冲突)
// 后果:DB 回滚,业务无记录,但 OSS 中产生了一个永远无法被引用的“孤儿文件”。
误区二:在事务内发消息
// ❌ 错误示范
@Transactional
public void handle() {
messageRepository.save(message);
natsPublisher.publish(task);
// 3. 代码执行完毕,但在事务提交(Commit)的一瞬间数据库连接断开
}
// 后果:Worker 收到任务并完成处理,但在回调更新状态时发现 DB 中不存在该记录。
2. 核心方案:事务提交后的“惊险一跃”
为了保证 “只有数据库确确实实持久化成功了,才去触发异步任务”,最佳实践是利用 Spring 框架提供的事务同步机制。
以下是优化后的代码实现:
2.1 主业务逻辑
// 1. 准备阶段:预生成任务(纯内存操作,无副作用)
// 此时并没有真正发送 NATS 消息,只是构建了对象
List<MediaTransferTask> mediaTasks = prepareMediaTransferTasks(msg, ids.sessionId());
// 2. 构建消息实体
WxMessage message = buildMessage(msg, accountId, ids.sessionId(), ids.senderId());
try {
// 【核心步骤 A】数据库落库 (Source of Truth)
// 这是唯一的“事实来源”。如果这里失败,后续一切都不应发生。
messageRepository.save(message);
log.info("Message saved: id={}, wxid={}", message.getId(), message.getWxid());
// 3. 发布会话更新事件 (内存事件或 MQ)
SessionUpdateEvent event = SessionUpdateEvent.builder()
.accountId(accountId)
// ... build params
.build();
sessionEventPublisher.publishSessionUpdate(event);
// 【核心步骤 B】注册事务回调
// 关键点:这里不是立即发送,而是“预约”发送
publishMediaTransferTasksAfterCommit(mediaTasks);
return ProcessResult.success();
} catch (DataIntegrityViolationException e) {
// 【并发场景的保护】
// 如果两个线程同时处理同一条消息(如网络重放或客户端重试),
// 数据库的唯一索引会抛出此异常。
// 由于消息发送逻辑在事务提交后执行,失败的线程事务回滚,
// 因此“afterCommit”钩子不会被触发,完美避免了 Worker 重复搬运文件。
log.debug("Duplicate message (concurrent): {}", msg.getMessageId());
return ProcessResult.duplicate();
}
2.2 事务同步器的实现
publishMediaTransferTasksAfterCommit 方法利用了 Spring 的 TransactionSynchronizationManager 来挂载回调。
private void publishMediaTransferTasksAfterCommit(List<MediaTransferTask> tasks) {
if (CollectionUtils.isEmpty(tasks)) {
return;
}
// 判断当前是否在事务中
if (TransactionSynchronizationManager.isActualTransactionActive()) {
// 注册同步器
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
// 【真正发送的时机】
// 只有当 DB 事务成功 Commit 后,这一行才会执行
// 此时 DB 里一定有数据,Worker 回调一定能成功
tasks.forEach(mediaTransferPublisher::publishMediaTransfer);
log.debug("Async media tasks published after commit: size={}", tasks.size());
}
});
} else {
// 如果不在事务中(比如非事务方法调用),则立即发送(降级策略)
tasks.forEach(mediaTransferPublisher::publishMediaTransfer);
}
}
3. 深度解析:方案优势
3.1 杜绝“孤儿资源”
通过 afterCommit 钩子,严格保证了因果关系:因(DB落库成功) -> 果(触发搬运)。
如果 messageRepository.save(message) 因为任何原因(业务校验失败、数据库异常)导致事务回滚,afterCommit 回调将永远不会被执行,NATS 消息也就不会发出,从而从源头上避免了 OSS 资源的浪费。
3.2 天然的幂等性防护
代码中对 DataIntegrityViolationException 的捕获处理是该方案的另一大亮点。
在分布式场景下,消息重复投递是常见现象。
- 无保护模式:若未加控制,两个线程可能都会发出 NATS 消息,导致 Worker 下载上传两次同样的图片,浪费带宽和计算资源。
-
事务同步模式:数据库的唯一约束(Unique Key)充当了“守门员”。第二个线程在
save时会因冲突被拒绝,随之事务回滚。由于事务未成功提交,其注册的afterCommit钩子自动失效。最终,只有抢锁成功的线程才会发出唯一的一条异步任务。
4. 兜底策略:应对“反向不一致”
虽然该方案解决了“有文件没记录”的问题,但理论上仍存在极低概率的“反向不一致”:DB 提交成功了,但在执行 afterCommit 发送 NATS 消息的一瞬间,服务宕机或断电。
此时,数据库中存在一条状态为 PENDING 的记录,但永远不会有 Worker 来处理它。
为了达到金融级的一致性,系统应补充一个兜底补偿机制:
- 定时任务 (Compensation Job):每隔一定周期(如 5 分钟)扫描一次消息表。
-
筛选条件:
create_time < 5分钟前ANDmedia_status = 'PENDING'。 -
补偿动作:重新构建
MediaTransferTask并补发到 NATS。
5. 总结
在处理“数据库事务”与“外部系统调用(MQ/RPC)”混合的业务场景时,“事务同步器(Transaction Synchronization)” 是 Spring 体系中解决双写一致性问题的利器。
通过这一模式的重构,系统实现了:
- 资源一致性:杜绝了 OSS 孤儿文件。
- 并发安全性:利用数据库锁自动解决并发任务重复发布问题。
- 逻辑严密性:确保状态流转严格遵循业务时序。
核心原则:先落库,再提交,回调之中发消息。
本文由mdnice多平台发布