描述
TCC是相对于AT比较偏上层。是针对接口而实现的。要求全局事务发起者接口添加@GlobalTransactional
,参与者接口prepare()添加@TwoPhaseBusinessAction
,并制定commit
方法和rollback
方法。
适合在暴露RPC接口时使用,在OutService层方法中添加@GlobalTransactional
开启全局事务。在InnerService层中方法中添加@TwoPhaseBusinessAction(name = "", commitMethod = "commit", rollbackMethod = "rollback")
注解指定commit和rollback方法
spring中配置GlobalTransactionScanner
初始化。该类实现AbstractAutoProxyCreator
、InitializingBean
。首先会执行wrapIfNecessary()
。
初始化
1.注册资源,封装TCCResource
并缓存。生成ResourceId,将ResourceId注册到TC
2.设置@TwoPhaseBusinessAction
的拦截器为TccActionInterceptor
,设置@GlobalTransactional
的拦截器为GlobalTransactionalInterceptor
一阶段
TM
TM处理流程和AT模式下一样,
1.开启全局事务,向TC注册全局事务并返回XID
2.如果业务执行成功,通知TC全局事务提交
3.如果业务执行失败,通知TC全局事务回滚
4.清除内存中XID
RM
注册分支事务,获取branchId。并将方法调用时的上下文发送给TC。执行业务逻辑
public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
Callback<Object> targetCallback) throws Throwable {
Map<String, Object> ret = new HashMap<String, Object>(16);
//TCC name
String actionName = businessAction.name();
BusinessActionContext actionContext = new BusinessActionContext();
actionContext.setXid(xid);
//set action anme
actionContext.setActionName(actionName);
//TODO services
//Creating Branch Record
String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
actionContext.setBranchId(branchId);
//set the parameter whose type is BusinessActionContext
Class<?>[] types = method.getParameterTypes();
int argIndex = 0;
for (Class<?> cls : types) {
if (cls.getName().equals(BusinessActionContext.class.getName())) {
arguments[argIndex] = actionContext;
break;
}
argIndex++;
}
//the final parameters of the try method
ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
//the final result
ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
return ret;
}
二阶段
TM通知TC全局提交/回滚。TC通知各分支事务。RM处理消息逻辑是RMHandlerTCC
里。
RM
提交和回滚逻辑一样。
1.用TC发过来的ResourceId查到TCResource,找到commit/rollback方法并执行
2.通知TC执行结果
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
String applicationData) throws TransactionException {
TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
if (tccResource == null) {
throw new ShouldNeverHappenException("TCC resource is not exist, resourceId:" + resourceId);
}
Object targetTCCBean = tccResource.getTargetBean();
Method commitMethod = tccResource.getCommitMethod();
if (targetTCCBean == null || commitMethod == null) {
throw new ShouldNeverHappenException("TCC resource is not available, resourceId:" + resourceId);
}
try {
boolean result = false;
//BusinessActionContext
BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
applicationData);
Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
LOGGER.info(
"TCC resource commit result :" + ret + ", xid:" + xid + ", branchId:" + branchId + ", resourceId:"
+ resourceId);
if (ret != null) {
if (ret instanceof TwoPhaseResult) {
result = ((TwoPhaseResult)ret).isSuccess();
} else {
result = (boolean)ret;
}
}
return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
} catch (Throwable t) {
String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
LOGGER.error(msg, t);
throw new FrameworkException(t, msg);
}
}
引用
https://zhuanlan.zhihu.com/p/61981170
https://blog.csdn.net/hosaos/article/details/89847554