背景
- Kafka系列《一》-- 生产者Producer流程及Partition详解
- Kafka系列《二》-- 生产者Producer中的请求及源码详解
- Kafka系列《三》-- 生产者Producer中的幂等性
- Kafka系列《四》-- 生产者Producer中的事务性
- Kafka系列《五》-- 消费者Consumer流程概览
- Kafka系列《六》-- 消费者Consumer中的消费方案分配算法解析
- Kafka系列《七》-- 消费者Consumer中的重平衡解析
- Kafka系列《八》-- 消费者Consumer中的消费过程解析
- Kafka系列《九》-- 消费者Consumer中的消费session会话和transaction事务
与幂等性默认就支持不同,事务性是需要额外的API支持的,通常的模式是:
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "trans1");
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("kafka-k8s-test","Messages no: " + i ));
}
kafkaProducer.commitTransaction(); // kafkaProducer.abortTransaction();
事务性需要额外指定一个transactional.id
配置,然后调用事务的初始化、开始、提交/回退
整体流程
producer端的事务状态包括:UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR;
初始状态为
UNINITIALIZED
;通过initTransactions
这个API,事务状态会更新为INITIALIZING
,然后开始事务的初始化流程首先初始化
COORDINATOR
:在幂等性中介绍了producer id是从broken获取的,事务性是在幂等性基础上实现的,因此也需要一个producer id;不同的是,使用事务时,需要先与特定的broken协商事务状态,这个特定的broken就是COORDINATOR
;它是通过FIND_COORDINATOR
请求来确定的
Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=producer-trans1, correlationId=3, headerVersion=1) and timeout 30000 to node -1: FindCoordinatorRequestData(key='trans1', keyType=1, coordinatorKeys=[])
- broken收到这个请求以后,会根据配置的transactional id 来计算得到一个节点作为
COORDINATOR
,并返回这个节点的相关信息
Received FIND_COORDINATOR response from node -1 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=producer-trans1, correlationId=3, headerVersion=1): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=2, host='1.1.1.1', port=9092, coordinators=[])
- 然后初始化producer id:确定了
COORDINATOR
之后,事务相关的状态都由COORDINATOR
来维护了;最先需要确定的就是producer id了,仍然是通过INIT_PRODUCER_ID
请求由COORDINATOR
分配一个固定的producer id
;这个producer id 不会因为producer的重启而重新分配,只要transactional id不变,那么分配的producer id 就不会变;
Sending INIT_PRODUCER_ID request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-trans1, correlationId=7, headerVersion=1) and timeout 30000 to node 2: InitProducerIdRequestData(transactionalId='trans1', transactionTimeoutMs=60000, producerId=-1, producerEpoch=-1)
- 如果发生了producer重启,按照上面所说producer id是不会变的,但是epoch是会自增的;通过这种机制,能够确保每个transactional id只有一个producer处于活跃状态;对于producer id 相同但是 epoch更小的producer会自动被
COORDINATOR
淘汰;这里epoch初始仍为0,随着producer的重启每次都会自增
Received INIT_PRODUCER_ID response from node 2 for request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-trans1, correlationId=7, headerVersion=1): InitProducerIdResponseData(throttleTimeMs=0, errorCode=0, producerId=255083, producerEpoch=0)
经过上述的初始化
COORDINATOR
和producer id后,事务状态再转换为READY
;需要注意的是FIND_COORDINATOR
请求和INIT_PRODUCER_ID
请求都属于事务类型的请求,在事务类型的请求全部完成之前,是不会开始send方法发送数据的再通过
beginTransaction
API后,事务状态转换为IN_TRANSACTION
;这个状态只是producer端的状态转换,不涉及和COORDINATOR
的交互只有事务进入
IN_TRANSACTION
状态后,才可以往事务中添加topic partition;这个过程是在send方法添加批次数据到缓存中时完成的,往那些topic partition发送数据,就会将哪些topic partition添加到事务中在NIO线程每次轮询开始时,就会检测事务中有没有新添加的topic partition;如果存在新添加的topic partition,那么需要和
COORDINATOR
同步这个信息,通过ADD_PARTITIONS_TO_TXN
请求将新添加的topic partition以增量方式发送给COORDINATOR
;
Sending ADD_PARTITIONS_TO_TXN request with header RequestHeader(apiKey=ADD_PARTITIONS_TO_TXN, apiVersion=1, clientId=producer-trans1, correlationId=9, headerVersion=1) and timeout 30000 to node 2: AddPartitionsToTxnRequestData(transactions=[], v3AndBelowTransactionalId='trans1', v3AndBelowProducerId=255083, v3AndBelowProducerEpoch=0, v3AndBelowTopics=[AddPartitionsToTxnTopic(name='kafka-k8s-test', partitions=[1])])
-
ADD_PARTITIONS_TO_TXN
请求也是属于事务类型的请求,因此需要先等待这个请求完成后,NIO线程才会开始从缓存中拉去批次数据发送;正常收到这个请求的响应后,会将topic partition加入到partitionsInTransaction
集合中,同时清空新添加的topic partition;避免下次循环开始时重复检测
Received ADD_PARTITIONS_TO_TXN response from node 2 for request with header RequestHeader(apiKey=ADD_PARTITIONS_TO_TXN, apiVersion=1, clientId=producer-trans1, correlationId=9, headerVersion=1): AddPartitionsToTxnResponseData(throttleTimeMs=0, errorCode=0, resultsByTransaction=[], resultsByTopicV3AndBelow=[AddPartitionsToTxnTopicResult(name='kafka-k8s-test', resultsByPartition=[AddPartitionsToTxnPartitionResult(partitionIndex=1, partitionErrorCode=0)])])
- 在事务状态下,NIO从缓存中拉取批次数据时只会考虑那些已经加入到事务中的topic partition;即只会考虑
partitionsInTransaction
集合中的那些topic partition;不在集合中的topic partition会直接跳过
synchronized boolean isSendToPartitionAllowed(TopicPartition tp) {
return partitionsInTransaction.contains(tp);
}
- NIO线程正常拉取到批次数据后,会将是否事务批次写入到消息的固定头部中;用来标识这个批次的数据是事务中产生的,需要控制事务;正如之前所说,事务是在幂等性基础上实现的,因此也会写入producer id、epoch、序号等信息到固定头部
short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);
buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
- 其它流程和幂等性中的完全一致,需要正常结束事务时通过
commitTransaction
API完成;调用这个API后,会先将事务状态更新为COMMITTING_TRANSACTION
,然后发送END_TXN
请求到COORDINATOR
,携带信息committed=true
表示正常提交事务
Sending END_TXN request with header RequestHeader(apiKey=END_TXN, apiVersion=1, clientId=producer-trans1, correlationId=13, headerVersion=1) and timeout 30000 to node 2: EndTxnRequestData(transactionalId='trans1', producerId=255083, producerEpoch=0, committed=true)
- 收到
COORDINATOR
响应后,再将事务状态更新为READY
;再次再次开启事务时,就不需要调用initTransactions
API了,直接使用beginTransaction
这个API即可
Received END_TXN response from node 2 for request with header RequestHeader(apiKey=END_TXN, apiVersion=1, clientId=producer-trans1, correlationId=13, headerVersion=1): EndTxnResponseData(throttleTimeMs=0, errorCode=0)
事务的回退则是通过
abortTransaction
API完成,调用这个API后,先将事务状态更新为ABORTING_TRANSACTION
状态;然后也是发送END_TXN
请求到COORDINATOR
,携带信息committed=false
表示事务回退事务中的异常处理就更加苛刻了,因为必须保证事务的原子性;如果producer发送数据过程中遇到
授权、ProducerFencedException
等异常会进入FATAL_ERROR
状态,这种状态是不可恢复的,事务在这种状态下也无法正常abort;只能重新启动producer;其它的异常producer也会尽可能的重试,重试不了的会进入
ABORTABLE_ERROR
状态;这种状态是可以正常abort的,abort之后事务可以由用户重新执行而不需要重启producer
COORDINATOR的作用
从上面的流程中可以看到producer也只是保证了producer端的事务状态流转,而事务的实际实现并没有在producer端,而是在COORDINATOR
端
而COORDINATOR
如何实现事务的就属于broken的功能了,不舒服producer的范畴了;具体的可以参考这些文档: