kafka的应用实战

消息中间件能做什么

消费中间件用来解决分布式系统之间消息传递,从而实现应用程序之间的协同如异步化处理。

Java中使用kafka进行通信

同步发送和异步发送

异步发送

当消息发送到buffer队列时立即返回;当消息被确认后发生回调。

producer.send(new ProducerRecord<>(topic, msg), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        
    }
});  
批量发送

batch.size:控制批量提交的字节数大小,当批量的消息大小超过这个大小,会统一发送。
linger.ms:控制发送时间的间隔,当批量的消息间隔超过这个值,会统一发送。
当两个都配置的话,只需要满足一个就批量发送。

同步发送

异步发送返回后阻塞,等消息完成则继续执行

RecordMetadata recordMetadata = producer.send(new ProducerRecord<>(topic, msg)).get()

基础配置分析

group.id

对消费者进行分组,同一个消费组只能有一个消费者来消费消息,不同组可以同时消费同一个消息。


image.png

enable.auto.commit

只有提交后的消息,下次才不能被接受到。
enable.auto.commit表示是否允许自动提交,若允许则可以通过auto.commit.interval.ms来控制提交频率;若不允许则可以通过consumer.commitSync()来手动提交

auto.offset.reset

针对有新groupid的消费者来消费指定的topic
auto.offset.reset=latest,新的消费者从其他消费者最后一个offset开始消费topic下的消息
auto.offset.reset= earliest:新的消费者从该topic最早的消息开始消费。

max.poll.records

限制每次调用poll返回的消息数

Topic和Partition

Topic

表示消息类别,是逻辑上的概念。

Partition

每个topic分为多个Partition,同一个topic下不同分区的消息是不同的,Offset表示消息在分区的偏移量即位置。


image.png
存储

以文件形式存储,命名规则是<topic_name>-<partition_id>


image.png

关于消息分发

自定义Partitioner

通过实现Partitioner来自定义分发策略

消息默认的分发机制

若key有值,则采用key的hash取模的分区算法;如果key为null,则metadata.max.age.ms范围内随机选择一个。这个值是默认十分钟更新一次。

消费端如何消费指定的分区

TopicPartition topicPartition=new TopicPartition(topic,0);
consumer.assign(Arrays.asList(topicPartition));

消息的消费原理

kafka消息消费原理演示

3个消费者3个分区

consumer1会消费partition0分区、consumer2会消费partition1分区、consumer3会消费 partition2分区

3个partiton对应2个consumer

consumer1会消费partition0/partition1分区、consumer2会消费partition2分区

3个partition对应4个或以上consumer

仍然只有3个consumer对应3个partition,其他的consumer无法消费消息

consumer和partition的数量建议

1 若consumer大于partition是浪费,因为一个分区只能被一个消费者消费,从而导致消费者空闲
2 若consumer小于等于partition,为了分配均匀,最好保证partition是consumer的整数倍。
3 若consumer只能保证一个partition的顺序是有序的,从多个partition因为读取数据是不一样的,从而使无序的。

什么时候会触发分区策略(rebalance)

1 消费者的数量发生变化:如新增一个消费者或去掉一个消费者
2 分区的数量发生变化

分区分配策略

定义

对同一个topic,同一个消费组,如何分配消费者和分区的对应关系,可以通过PARTITION_ASSIGNMENT_STRATEGY_CONFIG来指定分区策略

RangeAssignor(范围分区)

前提:同一个topic下
1 对分区按序号排序,对消费者按字母顺序排序
2 前m个消费者每个分配n+l个分区,后面的(消费者数量-m)个消费者每个分配n个分区:n = 分区数/消费者数量,m= 分区数%消费者数量
C1-0 将消费 0, 1, 2, 3 分区
C2-0 将消费 4, 5, 6 分区
C3-0 将消费 7, 8, 9 分区
弊端是一个topic时,c1-0多分配一个分区,若多个topic时,c1-0就会多分配一个分区。

RoundRobinAssignor(轮询分区)

1 把分区和消费者分别按照haseCode排序
2 通过轮询算法分配分区给消费者
假如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:
C1-0 将消费 T1-5, T1-2, T1-6 分区;
C1-1 将消费 T1-3, T1-1, T1-9 分区;
C2-0 将消费 T1-0, T1-4 分区;
C2-1 将消费 T1-8, T1-7 分区;

StrickyAssignor 分配策略

1 分区的分配尽可能的均匀,分区的分配尽可能和上次分配保持相同
2 当两者发生冲突时, 第 一 个目标优先于第二个目标

执行Rebalance以及管理consumer的group

coordinator(协调者)来执行对于consumer group的管理,当consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信。

如何确定coordinator

消费者向kafka集群中的任意一个broker发送一个GroupCoordinatorRequest请求,服务端会返回一个负载最小的broker节点的id,并将该broker设置为coordinator。

JoinGroup的过程

join阶段:消费者向coordinator发送joinGroup请求,coordinator选举一个cosumer为leader,并把组成员信息和订阅信息发送给消费者
1 选举算法:若消费组没有leader,则选择第一个加入消费组为消费者leader,若这是leader挂了,则随机选举一个leader。
2 每个消费者都可以设置自己的分区策略,coordinator会根据组内消费者的投票解决来实现消费组的分区分配。
Sync阶段:leader将消费者对应的partition分配方案同步给consumer group 中的所有consumer。

如何保存消费端的消费位置

什么是offset

offset表示分区中消息的唯一编号


image.png
offset在哪里维护

1 将offset信息保存在consumer_offsets_*的文件
2 通过 Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount 来计算位移在那个分区里; 由于默认情况下groupMetadataTopicPartitionCount有50个分区。

分区的副本机制

通过副本机制实现冗余,一个分区存在多个副本,leader副本实现所有请求的读写,剩余的是follower副本,主要从leader副本中同步消息日志。
可以通过replication-factor来创建副本,通过get /brokers/topics/secondTopic/partitions/1/state来获取各个分区中的leader是什么。

副本的leader选举

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350