分布式消息处理机制-kafka

安装kafka

  1. tar -zxvf
  2. 进入到config目录下修改server.properties
    broker.id
    listeners=PLAINTEXT://192.168.11.140:9092
    zookeeper.connect
  3. 启动
    sh kafka-server-start.sh -daemon ../config/server.properties
    sh kafka-server-stop.sh

zookeeper上注册的节点信息
cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, latest_producer_id_block, config

controller – 控制节点
brokers – kafka集群的broker信息 。 topic
consumer ids/owners/offsets

基本操作

image.png
image.png
image.png

http://kafka.apache.org/documentation/#quickstart

kafka的实现细节

消息是kafka中最基本的数据单元,消息由一串字节构成,其中主要由key和value构成,key和value都是byte数组。key的主要作用是根据一定的策略,将消息路由到指定的分区,这样就可以保证包含同一key的消息全部写入到同一个分区内,key可以是null。为了提高网络的存储和利用率,生产者会批量发送消息到kafka,并在发送之前消息进行压缩

topic&partitiion

  • Topic是用于存储消息的逻辑概念,可以看做一个消息集合。每个topic可以有多个生产者向其推送消息,也可以有任意多个消费者消费其中的消息
  • 每个topic可以划分多个分区(每个Topic至少有一个分区),同一topic下的不同分区包含的消息是不同的。每个消息在被添加到分区时,都会被分配一个offset(称之为偏移量),他是消息在此分区中的唯一编号,kafka通过offset保证消息在分区内的顺序,offset的顺序不跨分区,即kafka只保证在同一分区内的消息是有序的;
image.png
image.png
image.png

Partition是以文件的形式存储在文件系统中,存储在kafka-log目录下,命名规则是:<topic_name>-<partition_id>

kafka的高吞吐量的因素

  1. 顺序写的方式存储数据;
  2. 批量发送;在异步发送模式中,kafka允许进行批量发送,也就是先讲消息缓存到内存中,然后一次请求批量发送出去。这样减少了磁盘频繁IO以及网络IO造成的性能瓶颈。
    batch.size 每批次发送的数据大小
    linger.ms 间隔时间
  3. 零拷贝
    消息从发送到落地保存,broker维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过socket发送给消费者,虽然这个操作描述起来很简单,但实际上经历了很多步骤
  • 操作系统将数据从磁盘读入到内核空间的页缓存
  • 应用程序将数据从内核空间读入到用户空间缓存中
  • 应用程序将数据写回到内核空间到socket缓存中
  • 操作系统将数据从socket缓冲区复制到网卡缓冲区,以便将数据经网络发出


    image.png

通过“零拷贝”技术可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数


image.png

日志策略

日志保留策略

  • 无论消费者是否已经消费了消息,kafka都会一直保存这些消息,但并不会像数据库那样长期保存,为了避免磁盘被占满,kafka会配置相应的保留策略(retention policy),以实现周期性地删除陈旧的消息

kafka有两种“保留策略”:

  1. 根据消息保留的时间,当消息在kafka中保存的时间超过了指定时间,就可以被删除;
  2. 根据topic存储的数据大小,当topic所占的日志文件大小大于一个阈值,则可以开始删除最旧的消息

日志压缩策略

在很多场景中,消息的key与value的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心key对应的最新的value。我们可以开启日志压缩功能,kafka定期将相同key的消息进行合并,只保留最新的value值


image.png

消息可靠性机制

消息可靠性
生产者发送消息到broker,有三种确认方式(request.required.acks)

  • acks = 0:producer不会等待broker(leader)发送ack。因为发送消息网络超时或broker crash(1.Partition的Leader还没有commit消息 2. Leader与Follower数据不同步),既有可能丢失也可能会重发。
  • acks = 1:当leader接收到消息之后发送ack,丢会重发,丢的概率很小。
  • acks = -1:当所有的follower都同步消息成功后发送ack,丢失消息可能性比较低。

消息存储可靠性

  • 每一条消息被发送到 broker 中,会根据 partition 规则选择被存储到哪一个partition。如果 partition 规则设置的合理,所有消息可以均匀分布到不同的 partition 里,这样就实现了水平扩展。
  • 在创建topic时可以指定这个topic对应的 partition 的数据。在发送一条消息时,可以指定这条消息的key, producer 根据这个 key 和 partition机制来判断这个消息发送到哪个 partition。
  • kafka的高可靠性的保障来自于另一个叫副本(replication)策略,通过设置副本的相关参数,可以使kafka在性能和可靠性之间做不同的切换。

高可靠性的副本

sh kafka-topics.sh --create --zookeeper 192.168.11.140:2181 
--replication-factor 2  --partitions 3 --topic sixsix 
  • --replication-factor 表示的副本数

副本机制
ISR(副本同步队列)
维护的是有资格的 follower 节点

  1. 副本的所有节点都必须要和 zookeeper 保持连接状态
  2. 副本的最后一条消息的 offset 和 leader 副本的最后一条消息的 offset 之间的差值不能超过指定的阈值,这个阈值是可以设置的(replica.lag.max.messages)

HW&LEO
关于 follower 副本同步的过程中,还有两个关键的概念,HW(HighWatermark)和 LEO(Log End Offset) 这两个参数跟 ISR 集合紧密关联。

  • HW标记了一个特殊的 offset,当消费者处理消息的时候,只能拉取到 HW 之前的消息,HW之后的消息对消费者来说是不可见的。也就是说,取 partition 对应 ISR中最小的 LEO 作为 HW,consumer 最多只能消费到 HW所在的位置。每个 replica 都有 HW,leader 和 follower 各自维护更新自己的 HW 的状态。对于leader新写入的消息,consumer不能立刻消费,leader 会等待该消息被所有ISR中的 replicas 同步更新 HW ,此时消息才能被 consumer 消费。 这样就保证了如果leader副本损坏,该消息仍然可以从新选举的leader 中获取
  • LEO是所有副本都会有的一个offset 标记,它指向追加到当前副本的最后一个消息的 offset,当生产者向 leader 副本追加消息的时候, leader 副本的LEO 标记就会递增; 当follower 副本成功从leader副本拉取消息并更新到本地的时候,follower的副本的LEO就会增加

查看kafka数据文件内容

在使用kafka的过程中有时候需要我们查看产生的消息的信息,这些都被记录在kafka的log文件中。由于log 文件的特殊格式,需要通过kafka提供的工具来查看

./bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
--files /tmp/kafka-logs*/000**.log  --print-data-log{查看消息内容}

文件存储机制

存储机制
在kafka文件存储中,同一个topic下有多个不同的partition,每个partition为一个目录,partition的名称规则为:topic名称 + 有序序号,第一个序号从0开始,最大的序号为partition数量减1,partition是实际上物理上的概念,而topic是逻辑上的概念
partition还可以细分为segment,这个segment是什么呢?假设kafka以partition为最小存储单位,那么我们可以想象当kafka的producer不断发送消息,必然会引起partition文件的无限扩张,这样对于消息文件的维护以及被消费的消息的清理带来非常大的挑战,所以kafka以segment为单位又把partition进行细分。每个partition相当于一个巨型文件被平均分配到多个大小相等的segment数据文件中(每个segment文件中的消息不一定相等)。这种特性方便已经被消费的清理,提高磁盘的利用率
segment file组成:由2大部分组成,分别为index file 和data file,此2个文件一一对应,成对出现,后缀“.index"和“.log”分别表示segment索引文件、数据文件。
segment文件命名规则:partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset。数值最大为64位long大小,19位数字字符长度,没有数字用0填充

image.png
image.png

查找方式

以上图为例,读取offset=170218的消息,首先查找segment文件,其中00000000000000000000.index为最开始的文件,第二个文件为00000000000000170410.index(起始偏移为170410+1=170411),而第三个文件为00000000000000239430.index(起始偏移为239430+1=239431),所以这个offset=170418就落到了第二个文件之中。其他后续文件可以依次类推,以其实偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置,其次根据00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置进行读取。

消息确认的几种方式

自动提交

image.png

手动提交
手动异步提交
consumer. commitASync() //手动异步ack
手动同步提交
consumer. commitSync() //手动异步ack

指定消费某个分区的消息

image.png

消息的消费原理

之前kafka存在的一个非常大的性能隐患就是利用ZK来记录各个Consumer Group的消费进度(offset)。当然JVM Client 帮我们自动做了这些事情,但是Consumer需要和ZK频繁交互,则利用ZK Client API对ZK频繁写入是一个低效的操作,并且从水平扩展性上来讲也存在问题,所以ZK抖一抖,集群吞吐量就跟着一起抖,严重的时候简直抖的停不下来。
新版的kafka已推荐将consumer的文艺信息保存在Kfaka内部的topic,即_consumer_offsets_topic.通过以下操作来看看_consumer_offsets_topic是怎么存储消费进度的,_consumer_offset_topic 默认有50个分区

  1. 计算 consumer group对应的hash值


    image.png

2.获得 consumer group 的位移信息

bin/kafka-simple-consumer-shell.sh 
--topic __consumer_offsets --partition 15 
-broker-list 192.168.11.140:9092,192.168.11.141:9092,192.168.11.138:9092   
--formatter kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter

kafka的分区分配策略

在kafka中每个topic一般会有很多个partitions。为了提高消息的消费速度,我们可能会启动多个consumer去消费;同时,kafka存在consumer group的概念,也就是group.id一样的consumer,这些consumer属于一个consumer group,组内的所有消费者协调在一起来消费订阅主题的所有分区,当然每一个分区只能由同一个消费组内的consumer来消费,那么同一个consumer group里面的consumer是怎么去分配该消费那个分区里的数据,这个就涉及到了kafka内部分区分配策略(Partition Assignment Strategy)
在 Kafka内部存在两种默认的分区分配策略:Range(默认)和 RoundRobin。通过:partition.assignment.strategy指定

consumer rebalance

当以下事件发生时,Kafka将会进行一次分区分配:

  1. 同一个consumer group内新增了消费者
  2. 消费者离开当前所属的consumer group,包括shuts down 或 crashes
  3. 订阅的主题新增分区(分区数量发生变化)
  4. 消费者主动取消对某个topic的订阅
  5. 也就是说,把分区的所有权从一个消费者移动到另外一个消费者上,这个是kafka consumer 的 rebalance机制,如何 rebalance就涉及到前面说的分区分配策略

两种分区策略
Range 策略(默认)
0 ,1 ,2 ,3 ,4,5,6,7,8,9
c0 [0,3] c1 [4,6] c2 [7,9]
10(partition num/3(consumer num) =3
roundrobin 策略
0 ,1 ,2 ,3 ,4,5,6,7,8,9
c0,c1,c2
c0 [0,3,6,9]
c1 [1,4,7]
c2 [2,5,8]
kafka 的key 为null, 是随机{一个Metadata的同步周期内,默认是10分钟}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容