前言:
在传统的数据库应用中,数据库事务与消息的发送是独立的两个操作,如果数据库事务因为某种原因失败了,消息可能已经被发送出去了,这就导致了数据的不一致性。
方案1:下游服务过滤异常消息
在第3步中,如果发现分发的消息的业务唯一id在数据库不存在,就不执行业务操作
注意:这个并没有解决数据库事务和消息的一致性,只是从业务影响上,忽视程序异常(鸵鸟政策)
方案2:数据库事务提交之后,通过发布/订阅模式将消息发送给消息队列
private final ApplicationEventPublisher applicationEventPublisher;
/**
* 保存单据、并发送消息
*/
@Transactional(rollbackFor = Exception.class)
public void saveOrderAndPushMQ() {
//业务省略
expectedPayBillRepository.save(expectedPayBill);
//这里发送事件里的方法并不是立刻执行的
applicationEventPublisher.publishEvent(new BusinessTaskEvent("发送BusinessTask消息", expectedPayBill));
}
public class BusinessTaskEvent extends ApplicationEvent {
private static final long serialVersionUID = -6206752641309458207L;
public BusinessTask getBusinessTask() {
return businessTask;
}
private ExpectedPayBill expectedPayBill;
public BusinessTaskEvent(Object source, ExpectedPayBill expectedPayBill) {
super(source);
this.expectedPayBill = expectedPayBill;
}
}
真正发送事件
@Component
@RequiredArgsConstructor
@Slf4j
public class TaskListener {
/**
* 阿里云kafka
*/
private final AliYunKafkaProducer aliYunKafkaProducer;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void createPayableStatementTask(BusinessTaskEvent businessTaskEvent) {
//业务省略
ExpectedPayBill expectedPayBill = businessTaskEvent.getExpectedPayBill();
// 发送消息触发任务执行
aliYunKafkaProducer.send(getMessageTopic(), JSON.toJSONString(expectedPayBill));
}
}
核心注解 @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
方案3:监听数据库binlog变更,再发送事件
方案 | 实现复杂度 | 优劣 |
---|---|---|
下游服务过滤异常消息 | 简单 | 这个并没有解决数据库事务和消息的一致性,容易给下游造成误导 |
数据库事务提交之后,通过发布/订阅模式将消息发送给消息队列 | 简单 | 需要依赖事务框架 |
监听数据库binlog变更,再发送事件 | 复杂 | 引入中间件,增加系统复杂度,但是业务和事件彻底解耦 |