Kafka系列《四》-- 生产者Producer中的事务性

背景

与幂等性默认就支持不同,事务性是需要额外的API支持的,通常的模式是:

properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "trans1");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        kafkaProducer.initTransactions();
        kafkaProducer.beginTransaction();
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("kafka-k8s-test","Messages no: " + i ));
        }
        kafkaProducer.commitTransaction();  // kafkaProducer.abortTransaction();

事务性需要额外指定一个transactional.id配置,然后调用事务的初始化、开始、提交/回退

整体流程

producer端的事务状态包括:UNINITIALIZED, INITIALIZING, READY, IN_TRANSACTION, COMMITTING_TRANSACTION, ABORTING_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR;

  • 初始状态为UNINITIALIZED;通过initTransactions这个API,事务状态会更新为INITIALIZING,然后开始事务的初始化流程

  • 首先初始化COORDINATOR:在幂等性中介绍了producer id是从broken获取的,事务性是在幂等性基础上实现的,因此也需要一个producer id;不同的是,使用事务时,需要先与特定的broken协商事务状态,这个特定的broken就是COORDINATOR;它是通过FIND_COORDINATOR请求来确定的

Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=producer-trans1, correlationId=3, headerVersion=1) and timeout 30000 to node -1: FindCoordinatorRequestData(key='trans1', keyType=1, coordinatorKeys=[])

  • broken收到这个请求以后,会根据配置的transactional id 来计算得到一个节点作为COORDINATOR,并返回这个节点的相关信息

Received FIND_COORDINATOR response from node -1 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=producer-trans1, correlationId=3, headerVersion=1): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=2, host='1.1.1.1', port=9092, coordinators=[])

  • 然后初始化producer id:确定了COORDINATOR之后,事务相关的状态都由COORDINATOR来维护了;最先需要确定的就是producer id了,仍然是通过INIT_PRODUCER_ID请求由COORDINATOR分配一个固定的producer id;这个producer id 不会因为producer的重启而重新分配,只要transactional id不变,那么分配的producer id 就不会变;

Sending INIT_PRODUCER_ID request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-trans1, correlationId=7, headerVersion=1) and timeout 30000 to node 2: InitProducerIdRequestData(transactionalId='trans1', transactionTimeoutMs=60000, producerId=-1, producerEpoch=-1)

  • 如果发生了producer重启,按照上面所说producer id是不会变的,但是epoch是会自增的;通过这种机制,能够确保每个transactional id只有一个producer处于活跃状态;对于producer id 相同但是 epoch更小的producer会自动被COORDINATOR淘汰;这里epoch初始仍为0,随着producer的重启每次都会自增

Received INIT_PRODUCER_ID response from node 2 for request with header RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-trans1, correlationId=7, headerVersion=1): InitProducerIdResponseData(throttleTimeMs=0, errorCode=0, producerId=255083, producerEpoch=0)

  • 经过上述的初始化COORDINATOR和producer id后,事务状态再转换为READY;需要注意的是FIND_COORDINATOR请求和INIT_PRODUCER_ID请求都属于事务类型的请求,在事务类型的请求全部完成之前,是不会开始send方法发送数据的

  • 再通过beginTransactionAPI后,事务状态转换为IN_TRANSACTION;这个状态只是producer端的状态转换,不涉及和COORDINATOR的交互

  • 只有事务进入IN_TRANSACTION状态后,才可以往事务中添加topic partition;这个过程是在send方法添加批次数据到缓存中时完成的,往那些topic partition发送数据,就会将哪些topic partition添加到事务中

  • 在NIO线程每次轮询开始时,就会检测事务中有没有新添加的topic partition;如果存在新添加的topic partition,那么需要和COORDINATOR同步这个信息,通过ADD_PARTITIONS_TO_TXN请求将新添加的topic partition以增量方式发送给COORDINATOR

Sending ADD_PARTITIONS_TO_TXN request with header RequestHeader(apiKey=ADD_PARTITIONS_TO_TXN, apiVersion=1, clientId=producer-trans1, correlationId=9, headerVersion=1) and timeout 30000 to node 2: AddPartitionsToTxnRequestData(transactions=[], v3AndBelowTransactionalId='trans1', v3AndBelowProducerId=255083, v3AndBelowProducerEpoch=0, v3AndBelowTopics=[AddPartitionsToTxnTopic(name='kafka-k8s-test', partitions=[1])])

  • ADD_PARTITIONS_TO_TXN请求也是属于事务类型的请求,因此需要先等待这个请求完成后,NIO线程才会开始从缓存中拉去批次数据发送;正常收到这个请求的响应后,会将topic partition加入到partitionsInTransaction集合中,同时清空新添加的topic partition;避免下次循环开始时重复检测

Received ADD_PARTITIONS_TO_TXN response from node 2 for request with header RequestHeader(apiKey=ADD_PARTITIONS_TO_TXN, apiVersion=1, clientId=producer-trans1, correlationId=9, headerVersion=1): AddPartitionsToTxnResponseData(throttleTimeMs=0, errorCode=0, resultsByTransaction=[], resultsByTopicV3AndBelow=[AddPartitionsToTxnTopicResult(name='kafka-k8s-test', resultsByPartition=[AddPartitionsToTxnPartitionResult(partitionIndex=1, partitionErrorCode=0)])])

  • 在事务状态下,NIO从缓存中拉取批次数据时只会考虑那些已经加入到事务中的topic partition;即只会考虑partitionsInTransaction集合中的那些topic partition;不在集合中的topic partition会直接跳过
synchronized boolean isSendToPartitionAllowed(TopicPartition tp) {
        return partitionsInTransaction.contains(tp);
    }
  • NIO线程正常拉取到批次数据后,会将是否事务批次写入到消息的固定头部中;用来标识这个批次的数据是事务中产生的,需要控制事务;正如之前所说,事务是在幂等性基础上实现的,因此也会写入producer id、epoch、序号等信息到固定头部
short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch, isDeleteHorizonSet);
buffer.putShort(position + ATTRIBUTES_OFFSET, attributes);
  • 其它流程和幂等性中的完全一致,需要正常结束事务时通过commitTransactionAPI完成;调用这个API后,会先将事务状态更新为COMMITTING_TRANSACTION,然后发送END_TXN请求到COORDINATOR,携带信息committed=true表示正常提交事务

Sending END_TXN request with header RequestHeader(apiKey=END_TXN, apiVersion=1, clientId=producer-trans1, correlationId=13, headerVersion=1) and timeout 30000 to node 2: EndTxnRequestData(transactionalId='trans1', producerId=255083, producerEpoch=0, committed=true)

  • 收到COORDINATOR响应后,再将事务状态更新为READY;再次再次开启事务时,就不需要调用initTransactionsAPI了,直接使用beginTransaction这个API即可

Received END_TXN response from node 2 for request with header RequestHeader(apiKey=END_TXN, apiVersion=1, clientId=producer-trans1, correlationId=13, headerVersion=1): EndTxnResponseData(throttleTimeMs=0, errorCode=0)

  • 事务的回退则是通过abortTransactionAPI完成,调用这个API后,先将事务状态更新为ABORTING_TRANSACTION状态;然后也是发送END_TXN请求到COORDINATOR,携带信息committed=false表示事务回退

  • 事务中的异常处理就更加苛刻了,因为必须保证事务的原子性;如果producer发送数据过程中遇到授权、ProducerFencedException等异常会进入FATAL_ERROR状态,这种状态是不可恢复的,事务在这种状态下也无法正常abort;只能重新启动producer;

  • 其它的异常producer也会尽可能的重试,重试不了的会进入ABORTABLE_ERROR状态;这种状态是可以正常abort的,abort之后事务可以由用户重新执行而不需要重启producer

COORDINATOR的作用

从上面的流程中可以看到producer也只是保证了producer端的事务状态流转,而事务的实际实现并没有在producer端,而是在COORDINATOR

COORDINATOR如何实现事务的就属于broken的功能了,不舒服producer的范畴了;具体的可以参考这些文档:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350