Mastering the Three Delivery Semantics of the Kafka Consumer in One Article

This article uses the Kafka 0.9 client as its example and walks through how to use the Kafka client, covering the three consumer delivery semantics (at-most-once, at-least-once, and exactly-once) as well as producer usage.

(1) Create the topic

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1

(2) Producer

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.util.Properties;

public class ProducerExample {

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ProducerExample ...");
        sendMessages();
    }

    private static void sendMessages() throws InterruptedException, IOException {
        Producer<String, String> producer = createProducer();
        sendMessages(producer);
        // Allow the producer to complete sending of the messages before program exit.
        Thread.sleep(20);
    }

    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        // Controls how many bytes the sender batches up before publishing to Kafka.
        props.put("batch.size", 10);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    private static void sendMessages(Producer<String, String> producer) {
        String topic = "normal-topic";
        int partition = 0;
        long record = 1;
        for (int i = 1; i <= 10; i++) {
            producer.send(new ProducerRecord<>(topic, partition, Long.toString(record), Long.toString(record++)));
        }
    }
}

(3) Consumer

There are two ways for a consumer to register with Kafka:

subscribe: with this approach, the consumer group is rebalanced whenever topics or partitions are added, or when consumers join or leave the group.

assign: consumers registered this way do not take part in rebalancing.

Both approaches can implement all three delivery semantics; the specific API usage is shown in the sections below.
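As a quick, hedged illustration of the two registration styles (not part of the original examples), the sketch below shows both calls side by side. The class name RegistrationStyles, the group id cg-demo, and the assumption that normal-topic has the two partitions created above are illustrative only; the full, runnable examples follow in the later sections.

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Properties;

public class RegistrationStyles {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "cg-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // Style 1: subscribe() - partitions are assigned by the group coordinator and are
        // rebalanced when topics/partitions change or when consumers join or leave the group.
        KafkaConsumer<String, String> subscribed = new KafkaConsumer<>(props);
        subscribed.subscribe(Arrays.asList("normal-topic"));

        // Style 2: assign() - the application picks partitions explicitly; this consumer
        // does not participate in group rebalancing.
        KafkaConsumer<String, String> assigned = new KafkaConsumer<>(props);
        assigned.assign(Arrays.asList(new TopicPartition("normal-topic", 0),
                                      new TopicPartition("normal-topic", 1)));

        subscribed.close();
        assigned.close();
    }
}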

1. At-most-once Kafka Consumer

At-most-once is the default behavior of the Kafka consumer. The simplest way to configure a consumer for it is:

1). Set enable.auto.commit to true.

2). Set auto.commit.interval.ms to a relatively low value.

3). Do not call consumer.commitSync().

With this configuration, the Kafka client has a background thread that commits offsets at the specified interval.

However, this setup can actually lead to either of two delivery semantics:

a. At-most-once

The consumer's offset has already been committed, but the messages are still being processed when the consumer crashes. On restart, consumption resumes from the last committed offset, so the messages that were being processed at the time of the crash are partially lost.

b. At-least-once

The consumer has finished processing the messages, but the offset has not been committed yet when the consumer crashes, so the messages are consumed and processed again after a restart. Because auto.commit.interval.ms is set to a low value, the probability of this happening is reduced.

The code is as follows:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class AtMostOnceConsumer {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AtMostOnceConsumer ...");
        execute();
    }

    private static void execute() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Subscribe to all partitions in the topic. 'assign' could be used here
        // instead of 'subscribe' to attach to specific partitions.
        consumer.subscribe(Arrays.asList("normal-topic"));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Set this property if auto commit should happen.
        props.put("enable.auto.commit", "true");
        // Auto commit interval; Kafka commits the offset at this interval.
        props.put("auto.commit.interval.ms", "101");
        // This is how to control the amount of data read in each poll.
        props.put("max.partition.fetch.bytes", "135");
        // Set this if you want to always read from the beginning.
        // props.put("auto.offset.reset", "earliest");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            process();
        }
    }

    private static void process() throws InterruptedException {
        // Create some delay to simulate processing of the message.
        Thread.sleep(20);
    }
}

2. At-least-once Kafka Consumer

Implementing an at-least-once consumer is also straightforward:

1). Set enable.auto.commit to false.

2). Manually call consumer.commitSync() after the messages have been processed.

With this approach, after the messages returned by a poll have been processed, you call the synchronous commit method consumer.commitSync() yourself. It is recommended to make the consumer's processing idempotent so that reprocessed messages do not produce duplicate results. The duplicate scenario is: the consumer has finished processing the messages and written the results to the output store (or has processed only part of them), but the offset has not been committed yet; if the consumer crashes at this point, it will consume and process those messages again after a restart. Note that the sample code below leaves enable.auto.commit set to true but raises auto.commit.interval.ms so high that auto commit effectively never fires, which is an alternative to disabling it outright; the commit is still driven by consumer.commitSync().

The code is as follows:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class AtLeastOnceConsumer {

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting AutoOffsetGuaranteedAtLeastOnceConsumer ...");
        execute();
    }

    private static void execute() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Subscribe to all partitions in the topic. 'assign' could be used here
        // instead of 'subscribe' to attach to specific partitions.
        consumer.subscribe(Arrays.asList("normal-topic"));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg1";
        props.put("group.id", consumeGroup);
        // Auto commit is left enabled here, but the interval below is so large that
        // auto commit effectively never happens; the offset commit is controlled via
        // consumer.commitSync() after the messages have been processed.
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "999999999999");
        // This is how to control the amount of data read in each poll.
        props.put("max.partition.fetch.bytes", "135");
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "6001");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) throws InterruptedException {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            long lastOffset = 0;
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
                lastOffset = record.offset();
            }
            System.out.println("lastOffset read: " + lastOffset);
            process();
            // The call below is what controls the offset commit. Make it only after the
            // business processing has finished.
            consumer.commitSync();
        }
    }

    private static void process() throws InterruptedException {
        // Create some delay to simulate processing of the record.
        Thread.sleep(20);
    }
}
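Because at-least-once delivery can replay messages, the hedged sketch below illustrates one way to make the processing idempotent, as recommended above. It is not part of the original example: the class name IdempotentWriter, the table processed_messages, and the use of (topic, partition, offset) as a unique key are assumptions; any store with a unique-key or upsert capability would work similarly.

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class IdempotentWriter {

    // Hypothetical table: processed_messages(topic, part_no, msg_offset, payload)
    // with a unique constraint on (topic, part_no, msg_offset).
    private static final String UPSERT_SQL =
            "INSERT INTO processed_messages (topic, part_no, msg_offset, payload) " +
            "VALUES (?, ?, ?, ?) " +
            "ON DUPLICATE KEY UPDATE payload = VALUES(payload)";  // MySQL-style upsert

    private final Connection connection;

    public IdempotentWriter(Connection connection) {
        this.connection = connection;
    }

    // Re-processing the same record overwrites the same row instead of inserting a duplicate,
    // so replays caused by an uncommitted offset are harmless.
    public void write(ConsumerRecord<String, String> record) throws SQLException {
        try (PreparedStatement ps = connection.prepareStatement(UPSERT_SQL)) {
            ps.setString(1, record.topic());
            ps.setInt(2, record.partition());
            ps.setLong(3, record.offset());
            ps.setString(4, record.value());
            ps.executeUpdate();
        }
    }
}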

3. Exactly-once with subscribe

Implementing exactly-once with subscribe is straightforward; the idea is as follows:

1). Set enable.auto.commit to false.

2). Do not call consumer.commitSync().

3). Subscribe to the topic with subscribe().

4). Implement a ConsumerRebalanceListener and, inside the listener, call consumer.seek(topicPartition, offset) so that consumption starts from the stored offset of each topic/partition.

5). While processing messages, keep track of the offset of every message, and store the offset together with the processing result as one atomic transaction. With a traditional relational database, atomic transactions are easy to implement; for non-transactional stores such as HDFS or NoSQL databases, the usual way to achieve this is to store the offset in the same row as the processed result.

6). Implement idempotent processing as an additional safety layer.

The code is as follows:

public class ExactlyOnceDynamicConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage2");

    public static void main(String[] str) throws InterruptedException {
        System.out.println("Starting ExactlyOnceDynamicConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException {
        KafkaConsumer<String, String> consumer = createConsumer();
        // Manually control the offset, but register the consumer to the topic to get dynamically
        // assigned partitions. Inside MyConsumerRebalancerListener use
        // consumer.seek(topicPartition, offset) to control from which offset messages are read.
        consumer.subscribe(Arrays.asList("normal-topic"), new MyConsumerRebalancerListener(consumer));
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg3";
        props.put("group.id", consumeGroup);
        // The key setting below turns off auto commit.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Control the maximum data on each poll; make sure this value is bigger than the maximum
        // size of a single message.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                // Save the processed offset in external storage.
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

public class MyConsumerRebalancerListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {

    private OffsetManager offsetManager = new OffsetManager("storage2");
    private Consumer<String, String> consumer;

    public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
        }
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
        }
    }
}

/**
 * The partition offsets are stored in an external store, in this case the local file system
 * where the program runs.
 */
public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrite the offset for the topic in an external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - Offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {
        try {
            FileWriter writer = new FileWriter(storageName(topic, partition), false);
            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {
        try {
            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    private String storageName(String topic, int partition) {
        return storagePrefix + "-" + topic + "-" + partition;
    }
}
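The OffsetManager above keeps offsets in local files, so the result write and the offset write are two separate operations. Where step 5 calls for saving them atomically, a relational database makes this simple. The sketch below is a hypothetical illustration using plain JDBC, not part of the original example: the class name TransactionalOffsetStore, the tables message_results and consumer_offsets, and the MySQL-style REPLACE INTO are all assumptions.

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

public class TransactionalOffsetStore {

    private final Connection connection;

    public TransactionalOffsetStore(Connection connection) throws SQLException {
        this.connection = connection;
        connection.setAutoCommit(false);  // commit explicitly, one transaction per record
    }

    // Stores the processing result and the consumed offset in the same database transaction,
    // so either both are persisted or neither is.
    public void saveResultAndOffset(ConsumerRecord<String, String> record, String result) throws SQLException {
        try (PreparedStatement saveResult = connection.prepareStatement(
                     "INSERT INTO message_results (msg_key, result) VALUES (?, ?)");
             PreparedStatement saveOffset = connection.prepareStatement(
                     "REPLACE INTO consumer_offsets (topic, part_no, msg_offset) VALUES (?, ?, ?)")) {

            saveResult.setString(1, record.key());
            saveResult.setString(2, result);
            saveResult.executeUpdate();

            saveOffset.setString(1, record.topic());
            saveOffset.setInt(2, record.partition());
            saveOffset.setLong(3, record.offset());
            saveOffset.executeUpdate();

            connection.commit();      // both writes become visible atomically
        } catch (SQLException e) {
            connection.rollback();    // neither write survives if anything failed
            throw e;
        }
    }

    // Returns the position to resume from: the stored offset + 1, or 0 if nothing is stored.
    public long readOffset(String topic, int partition) throws SQLException {
        try (PreparedStatement ps = connection.prepareStatement(
                "SELECT msg_offset FROM consumer_offsets WHERE topic = ? AND part_no = ?")) {
            ps.setString(1, topic);
            ps.setInt(2, partition);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? rs.getLong(1) + 1 : 0L;
            }
        }
    }
}

On restart, the consumer would call readOffset() inside the rebalance listener (or after assign()) and seek to the returned position, playing the role of the file-based OffsetManager above.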

4. Exactly-once with assign

Implementing exactly-once with assign is also straightforward; the idea is as follows:

1). Set enable.auto.commit to false.

2). Do not call consumer.commitSync().

3). Register the consumer with Kafka by calling assign().

4). On startup, call consumer.seek(topicPartition, offset) to position the consumer at the stored offset.

5). While processing messages, keep track of the offset of every message, and store the offset together with the processing result as one atomic transaction. With a traditional relational database this is easy; for non-transactional stores such as HDFS or NoSQL databases, store the offset in the same row as the processed result.

6). Implement idempotent processing as an additional safety layer.

The code is as follows:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ExactlyOnceStaticConsumer {

    private static OffsetManager offsetManager = new OffsetManager("storage1");

    public static void main(String[] str) throws InterruptedException, IOException {
        System.out.println("Starting ExactlyOnceStaticConsumer ...");
        readMessages();
    }

    private static void readMessages() throws InterruptedException, IOException {
        KafkaConsumer<String, String> consumer = createConsumer();
        String topic = "normal-topic";
        int partition = 1;
        TopicPartition topicPartition = registerConsumerToSpecificPartition(consumer, topic, partition);
        // Read the offset for the topic and partition from external storage.
        long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
        // Use seek to go to the exact offset for that topic and partition.
        consumer.seek(topicPartition, offset);
        processRecords(consumer);
    }

    private static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        String consumeGroup = "cg2";
        props.put("group.id", consumeGroup);
        // The key setting below turns off auto commit.
        props.put("enable.auto.commit", "false");
        props.put("heartbeat.interval.ms", "2000");
        props.put("session.timeout.ms", "6001");
        // Control the maximum data on each poll; make sure this value is bigger than the maximum
        // size of a single message.
        props.put("max.partition.fetch.bytes", "140");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

    /**
     * Manually listen to a specific topic partition. For an example of how to dynamically listen
     * to partitions while still controlling offsets manually, see ExactlyOnceDynamicConsumer.java.
     */
    private static TopicPartition registerConsumerToSpecificPartition(
            KafkaConsumer<String, String> consumer, String topic, int partition) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        List<TopicPartition> partitions = Arrays.asList(topicPartition);
        consumer.assign(partitions);
        return topicPartition;
    }

    /**
     * Process data and store the offset in an external store. Best practice is to do these
     * operations atomically.
     */
    private static void processRecords(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
                offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
            }
        }
    }
}

[End]
