Kafka消费者

1 消费者概念

1.1 消费者与消费者组

应用程序--->kafka--->应用程序

生产者     主题      消费者

1. 上游应用程序将数据发送到主题中再由下游应用程序读取、验证数据。

2. 出现的可能性情况:生产者生产数据的速度超过消费者验证数据的速度

这个时候就可以使用消费者组,由消费者组订阅主题,消费者组中的每个消费者分别消费部分分区的数据,实现消费端的横向扩展

3. 需要注意的地方:消费者组中的消费者数量应不超过分区数,避免造成资源浪费

1.2 消费者组和分区再均衡

1. 什么是再均衡?

    分区的所有权限由一个消费者转向另一个消费者,这种行为就是再均衡。

2. 优势?

    实现了消费者组的高可用性和伸缩性

3. 不足?

    再均衡期间,消费者无法读取消息,造成整个群组一小段时间处于不可用的状态

    当分区被重新分给另一个消费者时,消费者当前的读取状态会丢失,有可能还需要刷新缓存,从而拖慢应用程序

4. 消费者与消费者组的通信

    消费者通过向被指派为`群组协调器`的broker发送心跳来维持他们和群组的从属关系以及他们对分区的所有权关系。只要消费者以正常时间间隔发送心跳,就认为活跃,否则就会进行再均衡。

5. 如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认他死亡了才会触发再均衡。在此期间,死掉的消费者不会读取分区中的消息。在清理消费者是,消费者会通知协调器,此时会立即触发一次再均衡。

6. 如何分配分区?

    当有消费者加入群组时,他会向群组协调器发送一个JoinGroup请求。第一个加入群组的消费者成为“群主”。群主从协调器那里获得群组的成员列表,并负责给每个消费者分配分区。

2 创建消费者

2.1 创建所需的必选属性

属性描述

bootstrap.serversbroker的地址清单,地址格式:host:port

key.serializer键的序列化器

value.serializer值的序列化器

2.2 创建代码示例

producer.properties

bootstrap.servers=linux121:9092,linux122:9092,linux123:9092

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

kafka_customer

Propertiesprops=newProperties();

props.load(Demo1_Kafka_Consumer.class.getClassLoader().getResourceAsStream("consumer.properties"));

KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);

2.3 订阅主题

consumer.subscribe(Collections.singletonList("linux-test-01"));

2.4 轮询

while(true) {

ConsumerRecords<String,String>records=consumer.poll(100);

for(ConsumerRecord<String,String>record:records)

System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n",record.offset(),record.key(),record.value(),record.partition());

}

2.5 配置信息

配置描述

fetch.min.bytes指定消费者从服务器获取记录的最小字节数。broker在收到消费者的数据请求时,如果可用的数据量小于该配置指定的数据量,会等到有足够的可用数据时才会把他返回给消费者。好处:可以降低消费者和broker的负载。

fetch.max.wait.ms指定broker的等待时间。默认500ms如果没有足够的数据流入kafka,消费者获取最小数据量又无法得到满足,最导致500ms的延迟。

max.partition.fetch.bytes指定服务器从每个分区里返回给消费者的最大字节数。默认为1M,即poll方法从每个分区里返回的记录最多不超过该属性的指定值。在位消费者分配内存时,可以给他多分配一些,因为如果群组中的消费者发生崩溃,剩下的消费者需要处理更多的分区,而该值又必须大于brocker可以接受的最大消息的字节数,否则会使消费者一直处于挂起状态。另外,还需要考虑消费者处理数据的时间。消费者需要频繁调用poll方法来避免会话过期和发生分区在均衡。如果单次调用poll返回的数据太多,消费者需要用更多的时间来处理数据,可能无法及时进行下一次轮询来避免会话过期。

session.timeout.ms指定了消费者在被认为死亡之前可以与服务器断开连接的时间。需要同时修改该属性和heartbeat.interval.ms,heartbeat.interval.ms必须小于该属性,一般是该属性的1/3。该属性的值越小,可以越快的检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置的大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。

auto.offset.reset指定消费者在读取一个没有偏移量的分区或者偏移量无效(因消费者长时间失效,报刊偏移量的记录已经过时并被删除)的情况下该作何处理,值{latest(默认,从最近的消息开始读取),earliest(从开始读取)}

enable.auto.commit指定了消费者是否自动提交偏移量,默认为true,可以通过auto.commit.interval.ms来控制提交频率

partition.assignment.strategy设置选择分区策略。Range:把主题的若干个连续的分区分配给消费者。RoundRobin:把主题中所有分区逐个分配给消费者。

max.poll.records用于控制单词调用call()方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。

receive.buffer.bytes和send.buffer.bytes设置socket在读写数据时用到的TCP缓冲区的大小。如果为-1,则使用操作系统的默认值。如果生产者或者消费者与broker处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和较低的宽带。

2.6 提交和偏移量

2.6.1 自动提交

poll方法中返回的是生产者写进去但是还没有被消费者所消费的这部分数据

消费者发消息(包含分区的偏移量)给_consumer_offset这个主题

自动提交设置:enable.auto.commit-->true

自动提交产生的问题:当当前处理的数据偏移量大于提交的数据偏移量的话会造成数据重复

解决办法:缩短提交偏移量的时间差,auto.commit.interval.ms

2.6.2 提交当前偏移量

publicstaticvoidmain(String[]args)throwsIOException{

Propertiesprops=newProperties();

props.load(Demo1_Kafka_Consumer.class.getClassLoader().getResourceAsStream("consumer.properties"));

KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList("linux-test-01"));

while(true) {

ConsumerRecords<String,String>records=consumer.poll(100);

for(ConsumerRecord<String,String>record:records) {

System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n",record.offset(),record.key(),record.value(),record.partition());

try{

consumer.commitSync();

}catch(Exceptione) {

e.printStackTrace();

           }

       }

   }

}

不足:在broker对提交请求做出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。

2.6.3 异步提交

while(true) {

ConsumerRecords<String,String>records=consumer.poll(100);

for(ConsumerRecord<String,String>record:records) {

System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n",record.offset(),record.key(),record.value(),record.partition());

   }

// 提交最后一个偏移量,然后继续做其他的事

consumer.commitAsync();

}

在成功提交后碰到无法恢复的错误之前,commitSync()会一直重试,但是commitAsync不会,这个也是他的不足之处。


不重试的原因:他收到服务器相应的时候可能有一个更大的偏移量已经提交成功了。

commitAsync支持回调,在broker做出响应时执行回调,经常被用于记录提交错误或生成度量指标。


while(true) {

ConsumerRecords<String,String>records=consumer.poll(100);

for(ConsumerRecord<String,String>record:records) {

System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n",record.offset(),record.key(),record.value(),record.partition());

   }

// consumer.commitAsync();

consumer.commitAsync((offsets,exception)->{

if(exception!=null) {

System.out.println("Commit failed for offsets {}"+offsets+exception);

       }

   });

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容

  • 此篇开始进入kafka的另外一侧:消费者。kafka中的消费者比生产者要复杂的多,里面涉及到的消费组,偏移量等概念...
    绍圣阅读 1,915评论 0 0
  • 1、消费者和消费者组 消费者负责订阅Kafka中的主题,并从订阅的主题中拉取消息。与其他消息中间件不同的是:Kaf...
    rookie_yuqi阅读 552评论 0 0
  • 上一节讲到了如何通过构建ProducerRecord对象,选择对应的序列化方式发送数据到Kafka,这一节我们来讲...
    二向箔与歌者阅读 480评论 0 0
  • commit offset时可以附带一个string类型的metadata用于添加一些有关信息 也可以附带一个lo...
    意梦春秋阅读 958评论 0 1
  • 前言 读完本文,你将了解到如下知识点: kafka 的消费者 和 消费者组 如何正确使用kafka consume...
    zwb_jianshu阅读 856评论 0 0