分布式事务解决方案的理论依据
- CAP理论
- BASE理论
- 2PC协议
- 3PC协议
- Paxos算法
- Raft一致性协议
分布式事务的几种解决方案
- 基于数据库XA/JTA协议的方式;(需要数据库厂商支持;JAVA组件有atomikos等;)
- 异步校对数据的方式;(支付宝/微信支付主动查询支付状态,对账单的形式)
- 基于可靠消息(MQ)的解决方案;(异步场景;通用性较强;拓展性较高;)
- TCC编程式解决方案;(Hmily等封装的DTX框架)
整体设计思路
- 可靠生产:保证消息一定发送到MQ服务;
- 可靠消费:保证消息取出来一定正确消费掉,达到最终多放数据达到一致;
可靠消息生产
- 记录消息发送
- 为了确保数据一定成功发送到MQ:
- 在同一事务中,增加一个记录表的操作,记录每一条发往MQ的数据以及它的发送状态;
- 修改消息发送状态
- 利用MQ发布确认机制(confirm)
- 开启确认发布确认机制后,MQ准确受理消息会返回回执;
- 以RabbitMQ为例(注意:kafka不提供可靠消息机制)
# 生产者开启消息发送确认机制
spring:
rabbitmq:
publisher-confirms: true
private final @NonNull RabbitTemplate amqpTemplate;
@PostConstruct
private void confirmCallBack() {
amqpTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (log.isDebugEnabled()) {
log.debug("MQ回调==>" + correlationData);
}
// ack为true,代表mq已经准确收到消息
if (ack) {
String id = correlationData.getId();
Assert.isTrue(!StringUtils.isEmpty(id), "correlationData id can not null.");
this.getBaseRepository()
.findById(id)
.flatMap(obj -> {
obj.setStatus(1);
return this.getBaseRepository().save(obj);
}).subscribe();
}
});
}
private void sentMsg(String json) {
CorrelationData correlationData = new CorrelationData();
correlationData.setId(json.id);
amqpTemplate.convertAndSend("msg.exchange","xxx",json,correlationData);
}
- 记录定时检测
- 如果出现回执没收到,消息状态修改失败等特殊情况:
- 兜底的方案:定时检查消息表,超时没发送成功,再次重发;
可靠消息处理
- 正常情况:
- 开启手动ACK模式,由消费者控制消息的重发/清除/丢弃;
- 幂等性;防止重复处理,一次用户操作,只对应一次数据处理;
- 幂等:根据ID或者业务数据,判断数据是否重复,重复就忽略;
- 消息重发:
- 消费者处理失败,需要MQ再次重发给消费者;
- 出现异常一般会重试几次,由消费者自身记录重试次数,并进行次数控制;
- 消息丢弃:
- 消费者处理失败,直接丢弃或者转移到死信队列(DLQ)
- 重试次数过多,消息内容格式错误等情况,通过线上预警机制通知运维人员-人工干涉;
# 消费者开启手动ACK,控制消息在MQ中的删除,重发...
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: MANUAL
try{
// 执行业务操作,同一个数据不能处理两次,根据业务情况去重,保证幂等行==>redis记录处理情况
channel.basicAck(tag,false);
}catch(Exception e){
// 异常情况: 根据需要去:重发/丢弃
// 重发一定次数后,丢弃,日志警告==>防止重发多次,导致死循环
channel.basicNack(tag,false,false);
// 系统关键数据,永远是有人工干预
}
总结
- 优点:
- 通用性强
- 拓展性强
- 方案成熟
- 缺点
- 基于消息中间件,只适合异步场景;
- 消息处理会有延迟,需要业务上能够容忍;
建议
- 不要使用分布式事务(尽量避免分布式事务,尽量将非核心事务做成异步)