目录
目录.png
分布式事务整体文章
分布式事务解决方案
2PC, 两阶段提交
- 分为准备阶段,提交/回滚阶段
- 实现:XA方案
- 优点: 尽量保证了数据的强一致,适合对数据强一致要求很高的关键领域
- 缺点:会锁数据,有性能问题
3PC,三阶段提交
-
分为: CanCommit、PreCommit、DoCommit
3PC.png - 优点:相比二阶段提交,三阶段提交降低了阻塞范围,在等待超时后协调者或参与者会中断事务。避免了协调者单点问题,阶段 3 中协调者出现问题时,参与者会继续提交事务
- 缺点:数据不一致问题依然存在,当在参与者收到 preCommit 请求后等待 doCommit 指令时,此时如果协调者请求中断事务,而协调者无法与参与者正常通信,会导致参与者继续提交事务,造成数据不一致
TCC补偿事务
- 具体可以看 分布式事务之Seata对应的TCC实现
本地消息表
- eBay 提出,核心思路是将分布式事务拆分成本地事务进行处理,在业务侧增加本地消息表,跟业务侧的本地数据放在一个事务里面,就是本地事务,后续执行rpc,mq等业务,执行完成之后将本地消息事务状态设置为成功。定时任务扫描本地消息表没执行,或者执行超时的进行补偿,也就是最终一致性
MQ事务方案(可靠消息事务)
-
RocketMq实现
事务主动方发消息.png
1:发送方向 MQ 服务端(MQ Server)发送 half 消息
2:MQ Server 将消息持久化成功之后,向发送方 ack 确认消息已经发送成。
3:发送方开始执行本地事务逻辑。
4:发送方根据本地事务执行结果向 MQ Server 提交二次确认(commit 或是 rollback)
5:MQ Server 收到 commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 rollback 状态则删除半消息,订阅方将不会接受该消息
事务主动方消息恢复.png
- MQ服务端确认超时了
2:MQ Server 对该消息发起消息回查
3:发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
4:发送方根据检查得到的本地事务的最终状态再次提交二次确认
5:MQ Server基于 commit/rollback 对消息进行投递或者删除
- 优点: 消息数据独立存储 ,降低业务系统与消息系统之间的耦合。
吞吐量好 - 缺点: 一次消息发送需要两次网络请求(half 消息 + commit/rollback 消息) 。
业务处理服务需要实现消息状态回查接口,只能用于MQ,需要MQ支持
最大努力送达
- 最大努力通知也称为定期校对
- 事务主动方尽最大努力(重试,轮询,可以通过本地消息表定时任务进行重试)将事务发送给事务接收方,但是仍然存在消息接收不到,此时需要事务被动方主动调用事务主动方的消息校对接口查询业务消息并消费,这种通知的可靠性是由事务被动方保证的
- 最大努力通知适用于业务通知类型,例如微信交易的结果,就是通过最大努力通知方式通知各个商户,既有回调通知,也有交易查询接口
SAGA事务
- 具体可以看 分布式事务之Seata对应的SAGA实现
分布式事务 servicecomb的saga原理
-
这里使用官方git的demo, booking -> car -> hotel
booking -> car -> hotel.png
最开始调用时先经过sagastart注解
- SagaStartAspect
@Around("execution(@org.apache.servicecomb.pack.omega.context.annotations.SagaStart * *(..)) && @annotation(sagaStart)")
Object advise(ProceedingJoinPoint joinPoint, SagaStart sagaStart) throws Throwable {
initializeOmegaContext();
if(context.getAlphaMetas().isAkkaEnabled() && sagaStart.timeout()>0){
SagaStartAnnotationProcessorTimeoutWrapper wrapper = new SagaStartAnnotationProcessorTimeoutWrapper(this.sagaStartAnnotationProcessor);
return wrapper.apply(joinPoint,sagaStart,context);
}else{
SagaStartAnnotationProcessorWrapper wrapper = new SagaStartAnnotationProcessorWrapper(this.sagaStartAnnotationProcessor);
return wrapper.apply(joinPoint,sagaStart,context);
}
}
private void initializeOmegaContext() {
context.setLocalTxId(context.newGlobalTxId());
}
- SagaStartAnnotationProcessor
public AlphaResponse preIntercept(int timeout) {
try {
return sender
.send(new SagaStartedEvent(omegaContext.globalTxId(), omegaContext.localTxId(), timeout));
} catch (OmegaException e) {
throw new TransactionalException(e.getMessage(), e.getCause());
}
}
SagaStartAnnotationProcessor远程调用alpha步骤1.2.3
- SagaLoadBalanceSender extends LoadBalanceSenderAdapter
@Override
public AlphaResponse send(TxEvent event) {
do {
final SagaMessageSender messageSender = pickMessageSender();
Optional<AlphaResponse> response = doGrpcSend(messageSender, event, new SenderExecutor<TxEvent>() {
@Override
public AlphaResponse apply(TxEvent event) {
return messageSender.send(event);
}
});
if (response.isPresent()) return response.get();
} while (!Thread.currentThread().isInterrupted());
throw new OmegaException("Failed to send event " + event + " due to interruption");
}
LoadBalanceSenderAdapter
// 轮询做负载
public <T> T pickMessageSender() {
return (T) senderPicker.pick(loadContext.getSenders(),
loadContext.getGrpcOnErrorHandler().getGrpcRetryContext().getDefaultMessageSender());
}
// 统一模板方法,回调子类自己的方法
public <T> Optional<AlphaResponse> doGrpcSend(MessageSender messageSender, T event, SenderExecutor<T> executor) {
AlphaResponse response = null;
try {
long startTime = System.nanoTime();
response = executor.apply(event);
loadContext.getSenders().put(messageSender, System.nanoTime() - startTime);
} catch (OmegaException e) {
throw e;
} catch (Exception e) {
LOG.error("Retry sending event {} due to failure", event, e);
loadContext.getSenders().put(messageSender, Long.MAX_VALUE);
}
return Optional.fromNullable(response);
}
booking调用car业务代码时@Compensable注解
- TransactionAspect
@Around("execution(@org.apache.servicecomb.pack.omega.transaction.annotations.Compensable * *(..)) && @annotation(compensable)")
Object advise(ProceedingJoinPoint joinPoint, Compensable compensable) throws Throwable {
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
// just check if we need to setup the transaction context information first
TransactionContext transactionContext = extractTransactionContext(joinPoint.getArgs());
if (transactionContext != null) {
populateOmegaContext(context, transactionContext);
}
// SCB-1011 Need to check if the globalTxId transaction is null to avoid the message sending failure
if (context.globalTxId() == null) {
throw new OmegaException("Cannot find the globalTxId from OmegaContext. Please using @SagaStart to start a global transaction.");
}
String localTxId = context.localTxId();
context.newLocalTxId();
LOG.debug("Updated context {} for compensable method {} ", context, method.toString());
int forwardRetries = compensable.forwardRetries();
// 可以有前向后向补偿,不过一般用default
RecoveryPolicy recoveryPolicy = RecoveryPolicyFactory.getRecoveryPolicy(forwardRetries);
try {
return recoveryPolicy.apply(joinPoint, compensable, interceptor, context, localTxId, forwardRetries);
} finally {
context.setLocalTxId(localTxId);
LOG.debug("Restored context back to {}", context);
}
}
- 其他代码类似sagastart也是需要上传给alpha, 这里了注意远程调用carbooking接口时会rpc filter拦截,header里面填充请求头加上全局事务id信息。全局事务id比如雪花算法snowflake可以生成。
alpha集群开启时
- 集群高可用保证,依赖于akka sharding 分片
-
参考文章14 15对akka描述更详细
alpha集群开启时.png
基于servicecomb的saga思考
- 高可用: 集群部署,akka fsm状态机高可用由akk保证
- 高性能: grpc通信
- 高可靠: 主要是服务于客户端的高可靠
一: 调用方a调用服务器b,此时如果b服务器宕机导致没有给事务协调器alpha发end事件如果处理?或者调用方宕机怎么办?
1. 依靠补偿接口有处理无效补偿事件的能力。执行分支事务与RPC发送结束事件本来就无法保证原子性。所以这里直接进行补偿接口调用,需要补偿接口写好。
2. 分布式事务发起方宕机没结束事件没关系,alpha扫描发现事务没结束事件可以直接考虑进行补偿了。反正本来响应也是异常返回给用户。异常事件时会等30秒左右再调用各个补偿接口防止有子事务为完成。
3. 服务发起方到事务协调器这块的网络有可能出现异常, 重试不行就抛异常,分布式事务还是使调用方耦合了事务协调器
二: 可配置化SagaStartAspect.sender(SagaMessageSender是接口有不同实现)。OmegaSpringConfig -> TransactionAspectConfig -> 此构造方法,springboot回去OmegaSpringConfig找实现SagaMessageSender的bean最终是sagaLoadBalanceSender,这样每个项目可根据需求实现不同策略
@Aspect
@Order(value = 100)
public class SagaStartAspect {
private final SagaStartAnnotationProcessor sagaStartAnnotationProcessor;
private final OmegaContext context;
public SagaStartAspect(SagaMessageSender sender, OmegaContext context) {
this.context = context;
this.sagaStartAnnotationProcessor = new SagaStartAnnotationProcessor(context, sender);
}
}
@Bean
SagaMessageSender sagaLoadBalanceSender(@Qualifier("sagaLoadContext") LoadBalanceContext loadBalanceSenderContext) {
final SagaMessageSender sagaMessageSender = new SagaLoadBalanceSender(loadBalanceSenderContext, new FastestSender());
sagaMessageSender.onConnected();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
sagaMessageSender.onDisconnected();
sagaMessageSender.close();
}
}));
return sagaMessageSender;
}
@Bean
SagaStartAspect sagaStartAspect(SagaMessageSender sender, OmegaContext context) {
return new SagaStartAspect(sender, context);
}
参考文章
- 分布式事务看这一篇就够了
- 分布式事务:2PC、3PC、SAGA、TCC
- Seata 分布式事务实践和开源详解 | GIAC 实录
- at与xa区别
- saga使用场景
- 可靠消息最终一致性(本地消息表)
- 云原生时代分布式事务
- 两天看完分布式事务
- 带你读透 SEATA 的 AT 模式
- 分布式事务 Seata AT模式原理与实战
- serviccomb-omega源码解读
- servicecomb-saga各个issue,优化
- Akka中文指南
- akka分片
- akka分片以及分片故障自愈
- servicecomb文档
- springboot事务回滚源码_分布式事务解决方案ServiceComb - 重点Alpha源码-让你找信心...
- 对比7种分布式事务方案,还是偏爱阿里开源的Seata,真香!