1、功能介绍
TxnInterceptor代表了事务拦截器,有些拦截器仅仅适用于类型为ROOT的事务,比如:txnCommitter,txnMetricRecorder;除此之外的拦截器适用于ROOT或者LEAF类型的事务,比如: txnPipeliner, txnSpanRefresher,txnLockGatekeeper
TxnCommitter是一个TxnInterceptor,这个对象主要负责事务的提交和回滚的相关操作,并且这个拦截器在拦截器栈中的逻辑顺序是位于span refesh之后的。这个对象会去拦截 EndTransaction事务请求并且协调EndTransaction事务间的执行。对该对象的注释文档中,提到了事务
“并行提交”的概念,运用这个概念,从理念上来讲,可以消除整个RAFT共识往返的成本。然而,要想保证安全地执行该操作,可能会带来额外的编码复杂性。具体的做法是需要对事务的模型进行扩展,额外增加client-side的逻辑,还有对并发的控制并且还有事务恢复机制的支持。
满足并行提交的事务需要满足下面的两个条件之一:
1).如果一个事务已经记录了“COMMITED”状态,那么这个事务为“显示提交”
2).如果一个事务已经记录了“STAGING”状态并且在事务记录上,声明为“in-flight”的所有写意图(write intent)的时间戳等于或晚于事务提交的时间戳,那么这个事务为“隐式提交”
事务可以从满足隐式提交条件转换为满足显示提交条件。因为隐式事务的提交条件从一个分布式条件转移到一个本地事务的记录。所以,当以上两个条件满足其一的话,一个事务将除了会提交给自身之外,还会提交给所有一起并发的observers。txnCommitter拦截器的作用还包括了,确定在并行提交期间将执行的写操作集合,如果拦截器发现这些请求与end transaction事务请求出现在同一个batch中,拦截器会从写意图和查询意图请求中收集这个集合。此批处理中的写操作指示新的意图写操作,而查询意图请求指示先前的管道意图写操作尚未被证明是成功的。在发出批处理之前,txnCommitter将这个集合附加到最终事务请求。
当batch中的请求得到响应的时候,txnCommiter拦截器就会去收集这些结果。基于batch中得到的响应结果,拦截器会去判断这个事务是否已经满足了隐式提交条件从而完成了事务的commit操作。如果在一个batch中的事务都因为满足了隐式提交条件从而使得请求都得到了正确的响应(包括end transaction)。那么拦截器就会启动一个异步的任务,把事务的提交状态记录,从STAGING转换为COMMITTED状态。
如果一个batch中还在执行中的事务因为没有满足隐式提交条件而没有提交成功的话,这里有几个可能导致失败的原因可供参考:
1).对于写意图:由于一个类似于ConditionFailedError的错误逻辑产生导致的这些写意图请求失败。并且这些操作也可能会导致写意图是成功的但是写的时间戳不是所期望的结果因为这些写意图请求偶然写进了时间戳缓存(timestamp cache)或者事务提交了其他的值。在第一个例子中,txnCommitter拦截器将会得到一个错误结果,在第一个例子中,拦截器将会生成一个needTxnRetryAfterStaging.
2).Query intent:这些查询意图可能会失败的原因是因为它们发现了之前的写意图存在失败的,要么是因为从来没有留下一个查询意图,要么就是因为查询意图留下了一个过高的时间戳。在这个例子中,请求将会返回一个错误信息,是因为这些请求将返回一个错误,因为请求都有ErrorIfMissing选项集。它也将阻止写成功在不久的将来,这确保事务永远不会突然变得隐式提交后点由于写最终成功。
3).end txn: 这个请求产生TransactionRetryError错误的原因有很多种,例如如果一个事务的临时提交时间戳推迟了事务的读时间戳。
2、结构和名词解释
stopper: 提供了一种基于channel机制,来停止任意数量worker的对象。每一个worker通过RunWorker()方法来创建
wrapped:调用下一个txnInterceptor的SendLocked()
mu:互斥锁
3、上层入口示例
首先是调用实现了Sender接口的TxnCoordSender对象的Send方法:
方法中会涉及到把装有事务请求的batchRequest发送到拦截器栈的操作:
这里面的SendLocked方法是在lockedSender接口里面,事务的6种拦截器都实现了lockedSender接口和txnInterceptor接口,所以接下来的调用就会直接到txnCommitter拦截器代码逻辑里面
4、执行流程
方法第一步会先去判断是否有EndTransaction,如果没有则跳过这个拦截器的整体流程逻辑。
如果有的话,获取到这个EndTransaction对象。
判断我们是否可以完全省略掉EndTransaction,判断的依据是:事务是否是只读的(根据EndTransaction请求是否包含任何写操作来确定的)
如果EndTransaction的请求头还没有被设置的话,那么就分配事务的key作为它的key
判断事务的提交是否可以和batch中的其他写操作并行提交。
5.1 如果不可以的话:就会移动附加在EndTransaction请求中的in-flight事务的写操作,到IntentSpans中并且清除掉in-flight集合;
5.2可以的条件:如果EndTransaction请求中没有写操作的话,那么就会并发执行
6. 如果EndTransaction请求是一个回滚事务,就会省略掉接下来的逻辑处理
7. 将处理完逻辑的batch通过wrapped lockedSender发送。发送的时候解锁,然后重新锁定。SendLocked方法会返回一个batchResponse和一个错误对象:
7.1 如果发送过程中产生错误(错误对象不为空)
7.1.1
如果batch处理导致错误,但EndTransaction请求成功,则将事务记录暂存到进程中,并且将事务状态降级为PENDING。(因为事务并没有发生提交,因此不需要让拦截器栈中txnCommitter之上的其他拦截器知道记录正在暂存。)
8. 判断事务状态:
8.1 STAGING状态
8.2 COMMITTED状态:返回步骤7中产生的batchResponse
8.3 不是以上两种状态,报错
9. 判断事务是否需要进行重试或者进行刷新操作。如果事务需要进行重试或者刷新的话,就会返回,不执行下面的代码逻辑
10. 如果事务不需要进行重试代表着事务已经隐式提交了。把EndTransaction中的IntentSpans集合和in-flight中的写操作集合进行合并。
11. txnCommitter拦截器运行一个异步任务让事务从隐式提交(所有的写意图为STAGING状态)变为显示提交(COMMITTED状态)。
12. 设置batch响应中的事务状态为COMMITTED