Kafka 可靠机制

kafka 作为一种开源的分布式消息队列, 在生产环境中被大量的使用. 主要用于对系统的流量削峰填谷和做一些系统架构上的解耦.

相关名词

首先介绍一下kafka相关的名词:

  • broker: 一般一个kafka集群有多个broker节点, broker集群组成了kafka集群

  • topic: 逻辑上存在的概念, 数据投递到某个topic

  • partition: 一个topic的数据由一个或多个partition保存, 每个partition是一个有序的队列, 同时每个partition都有备份的partition, 一组partition中会选举出一个leader partition用来做读写的操作, 这个leader partition用来跟consumer 和 provider进行数据交互, 这也是leader-follower模式的好处, 能够保证数据的有序性. follower partition会去leader partition中fetch数据. 如果一旦leader partition出现故障, follower partition中还有备份数据, 可以从follower partition中再选举出一个leader partition继续提供服务.当然这里还有一个问题就是, 如果leader partition发生故障以后, 其他follower partition没能fetch到最新的数据, 有一部分数据丢失的话这怎么办? 我们先把问题抛出来, 后面在介绍.

  • segment: partition由多个segment组成. partition只是一个文件夹, 实际的数据保存在segment中. segment包含两种文件, 索引文件(.index)和数据文件(.log), 在索引文件的命名是最后一条消息的偏移量, 数据文件的命名和索引文件保持一致.

  • offset: 消息消费进度的偏移值

  • consumer: 消息消费者

  • consumer group: 消息消费者组, 一条消息可以被多个consumer group消费, 一个consumer group只能有一个consumer消费消息

  • replicas: patition的副本, 保障partition的高可用

  • leader: replicas的一个角色, 一个partition只有一个leader, 且通过这个leader partition和生产者, 消费者进行交互

  • follower: replicas中的一个角色, follower partition是 leader partition的备份, 从leader partition中拉取数据, 一旦leader partition宕机会从follower partition中选举出一个leader partition继续提供服务.

  • controller: 用来进行Leader partition的选举和各种failover

Kafka 文件存储

Partition

Topic是一个逻辑概念, 我们可以认为是一类消息的分组. Partition之于Topic是存储数据的物理存在. 一个Topic可以进行分片成多个partition, 这是为了实现负载均衡和增强可扩展性, 从存储层面来看, partition的数据是日志追加的方式. 当producer发送消息时根据一定的路由规则(随机, key-hash, 轮询算法等), 决定保存到具体的某一个partition中,并顺序的写入到磁盘中. 所以, 但对某个partition而言, 消息是有序的. partition文件包含两部分, 一部分是索引文件, 一部分是数据文件. 索引文件记录的是消息的偏移量, 能够定位到具体的某一个消息保存在数据文件的哪个位置.

partition

我们可以在server.properties中配置partition的数量, num.partition=1 .当指定了partition数量后, kafka会均匀的将partition分配到不同的broker中. partition的数量决定了这个topic的吞吐率

Segment

Partition并不是最终存储数据的最小粒度, Segement才是最终保存数据的最小单元.在server.properties中, 我们可以指定log.dirs来指定partition的存储位置.partition文件的命名规则是: topic name + 索引号, 索引范围[0, num.partitions - 1].

我们来举个例子, 下面是一个partition文件夹的示例:

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

前面我们提到segment文件包含两部分, 一部分是索引文件(.index), 一部分是数据文件(.log), 索引文件保存的是元数据. 在命名上也颇为讲究, 文件名含有消息的偏移量, 以00000000000000170410.index和00000000000000170410.log为例

segment

索引文件的元数据指向的是对应数据文件中消息的物理偏移地址, 有了消息的物理地址, 就可以访问对应的消息了. 如元数据[2, 365]为例, 表示第二个消息,也是该partition的第170410+2=170412条消息的物理偏移地址为365.举个例子, 假设我们要查找的170414条消息, 我们可以发现其落在索引文件00000000000000170410.index中, 并根据该文件中的[4, 666], 定位到数据文件666的位置读取即可. 这里又会涉及到一个问题, 如何判断完整的读完消息了, 也就是定位到一条消息的终止位置. kafka的消息具有一定的格式, 会记录消息的偏移量和消息体的长度, 所以读取的时候可以定位到消息的终止位置. kafka的消息格式:

baseOffset: int64 #偏移量
batchLength: int32 #消息体长度
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
    bit 0~2:
        0: no compression
        1: gzip
        2: snappy
        3: lz4
    bit 3: timestampType
    bit 4: isTransactional (0 means not transactional)
    bit 5: isControlBatch (0 means not a control batch)
    bit 6~15: unused
lastOffsetDelta: int32
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]

Kafka 存储的可靠性

为了提高可用性, kafka也采用了副本机制. 每个partition都会配置一定的副本, 我们可以在broker的server.properties文件中设置default.replication.factor=n n >= 2 来设置复制因子.基于多副本机制, kafka可实现自动故障转移.

  • Leader: 响应client端读写请求的副本, producer和consumer只和Leader交互
  • Follower: 被动地备份Leader数据, 不能响应client端读写请求, 一旦Leader挂掉, 会从他的Follower中选举一个新的Leader进行服务
  • ISR: 包含了Leader在内的其他与Leader数据保持同步的副本, ISR集合是一个动态集合, 可以将长时间(阈值)没和Leader同步数据的Follower从集合中剔除.ISR由Leader进行维护跟踪

AR & ISR & OSR

  • AR: 全部副本的集合. AR = ISR + OSR
  • ISR: Leader+可被选举成为Leader的Follower动态集合
  • OSR: 从ISR中剔除的Follower动态集合, 可以尝试加入到ISR

假设我们将复制因子设置成2, 也就是有一个leader和一个follower. 在broker启动时, 此时AR=ISR,OSR为空集合. 一旦Leader发生宕机或是服务不可用, kafka将会从ISR中选举出一个新的Leader继续提供服务. Leader partition会跟踪Follower, 如果某个Follower宕机或是落后太多, 将会被移除ISR列表.落后太多的定义有两种:

  • follower复制的消息落后于leader预设的阈值,在server.properties配置中的replica.lag.max.messages来配置
  • 超过一定时间, leader没有收到follower来pull消息的请求, 可以通过replica.lag.time.max.ms来配置.

HW & LEO

  • LEO: 日志末端位移(Log End Offset). 是收到的最新消息位移
  • HW: 水位值, 对于同一个副本而言, HW值小于等于LEO, 小于等于HW的消息都是被认为已经备份了, 对于一个partition而言, 其ISR中最小的LEO作为其HW, Consumer也最多只能消费到HW所在的位置. 每个Replica都有HW, 他们各自负责更新自己的HW.

我们来举例说明HW和LEO.

HW & LEO

Kafka的副本机制并不是完全采用同步机制和异步机制, 所谓同步机制就是指, 当Leader partition收到消息时, 需要保证所有的Follower partition都完成了复制, 才对producer响应成功, 这样就依赖最慢的副本的HW. 这种方式看起来比较可靠, 但是会降低吞吐率, 影响到系统的性能. 异步机制是指只需要保证Leader partition收到消息后, 就响应producer成功(常见的一种主从模式), 如果此时Leader partition发生宕机, 则可能发生消息丢失, 这在一些系统上是不被允许的.

消息刷盘

kafka 通过持久化消息到磁盘来保障消息存储的可靠性, 但是有个矛盾点, 频繁的刷盘会导致性能降低, 但是刷盘时间间隔过长又存在丢消息的风险. kafka提供了log.flush.interval.mslog.flush.interval.messages 两个参数来控制 Broker 的刷盘时机.

  • log.flush.interval.ms : 默认null, 单位ms, 用于控制日志刷盘的时间间隔
  • log.flush.interval.messages: 用于控制日志刷盘的消息量, 即每累计多少条消息后写到磁盘上.

推荐配置:

#每当producer写入10000条消息时,刷数据到磁盘 
log.flush.interval.messages=10000

#每间隔1秒钟时间,刷数据到磁盘
log.flush.interval.ms=1000

Kafka 消息的可靠性

kafka消息的可靠性贯穿了消息的生产, 发送, 存储, 到消费的全链路.

消息生产可靠性

要保证不丢消息先从消息的源头开始讲起. Producer发送消息有三中方式: Sync(同步), Async(异步), OneWay.通过producer.type来配置, 默认是值sync.

同步对比异步

同步

一条消息发送到partition需要保证Leader partition收到消息, 并且所有的Follower partition完成消息同步, 能够保证不丢消息, 但是吞吐率会下降.

异步

异步发送到一个缓冲区, 缓存区的数据没发送出去就返回给client了.
异步发送的方式可以批量(batch)发送, 这样可以提高broker的性能和吞吐量, 减少网络IO和磁盘请求次数,但是可能会有丢数据的风险.

OneWay是异步方式的一种, 他不接收Broker返回的ack值, 只管发, 不管异常.

request.required.acks

我们先来看看producer发消息的流程:

  1. producer往topic中发消息的时候首先通过zk找到Leader partition
  2. Leader partition收到消息以后会先将消息写到log文件中, 数据还保存在内存中
  3. Follower partition被动从Leader partition中pull消息后也将数据写到其本地的log文件中, 这个时候数据也只保存在Follower partition的内存中, 还没写到磁盘. 为了提高性能,立刻向leader发送一个ack. leader收到所有follower的ack消息, 则认为这条消息已经commit了, 将会向producer发送一个ack消息.

上面的三个流程对应三个不同的ack时间节点, producer的ack值有三种配置方式, 我们可以通过request.required.acks属性来配置:

  • 0: producer不等待partition返回确认消息, 这样可以得到最大的吞吐率, 但是可能丢消息
  • 1: 等待Leader partition保存成功状态便返回, 但是不会等待Follower partition同步完成, 有不错的可靠性, 吞吐率也得到提升
  • -1: 等待Follower partition都收到数据, 这种方法看起来最可靠, 理论上不会丢消息

根据request.required.acks的三种设置方式, 我们发现request.required.acks=-1 看起来能够保证不丢消息, 实际上也不能避免丢消息. 假设在如下场景中,我们设置 request.required.acks=-1 , 当Leader partition收到消息后, ISR中除去Leader partition以外的其他Follower副本都因为各种原因(宕机或是同步消息滞后超过阈值), 从ISR中被剔除. 此时ISR中仅剩下Leader partition, 这种情况并不能保证消息的可靠性.

因此, 我们需要对ISR列表中的最小副本数加以约束, 通过设置参数min.insync.replicas大于等于2, 来约束需要除去Leader partition以外的至少一个Follower partition同步了消息. 否则producer将收到异常:

org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。

综上所述, 我们可以在producer中设置request.required.acks=-1broker中设置min.insync.replicas=n n >= 2, 来提高消息的在生产过程中的可靠性.

producer的其他参数

  • batch.size: 当多条消息的partition相同时, producer会尝试将他们组装成一个批量消息, 一次性的发到partition中, 这样可以减少请求次数, 有助于提高性能.需要注意的是batch.size 不建议设置的过大或过小, 过大会浪费内存, 过小这可能降低吞吐量
  • linger.ms: 在批处理的场景下, 有可能消息生产的速率快于消息被发送到partition的速率, 可以通过设置linger.ms来延迟发送消息, 提高发送消息的效率. batch.size 的优先级高于linger.ms, 一旦消息积累的大小达到了batch.size的阈值, 即便没有到linger.ms的时间消息也会被发送到partition.
  • buffer.memory: producer用来缓冲等待发送给partition的消息的总字节数, 也就是一个缓冲区. 如果缓冲区被占满, 之后producer将会被阻塞max.block.ms, 随后抛出异常. buffer.memory的大小大致与producer可使用的总内存相对应, 不是硬绑定, 因为并非producer使用的所有内存都用于缓冲.一些额外的内存用于压缩和维护请求.
  • retries: producer发送消息失败后的重试次数. 部分场景为了绝对可靠设置成Integer.MAX_VALUE
  • max.in.flight.requests.per.connection: 在批量发送的场景下, 假设有两个批量消息, 他们的目标partition相同, 顺序分别为T1, T2. 当他们先后被发送到partition时, T1由于某些原因发送失败了, 这样就可能会导致消息的乱序. 这对于一些对时序很敏感的系统中是很致命的. 为了解决这种问题, 可以通过max.in.flight.requests.per.connection来设置单个链接上发送的未确认请求的最大数目. 如果设置大于1(默认是5), 并且发送失败则存在由于重试而导致消息重新排序的风险. 所以, 在开启重试的情况下(retries > 0), 为了规避消息被重新排序的风险, 建议修改配置max.in.flight.requests.per.connection=1. 这样就能保证一个时间段内只有个批量请求发到partition.

消费消息可靠性

consumer

Kafka的consumer和partition存在这么一种关系, consumer数量不应该多于partition的设置, 因为多出来的consumer并不能消费partition的消息, consumer也会固定消费某个或是某几个partition的消息, 除非触发rebalance后可能会导致consumer消费的partition发生变化. 此外, 一个partition只被被固定的consumer消费. consumer和partition的关系会被保存在zookeeper的节点中, 当初触发rebalance时, 原有的关系节点将会被删除, 保存新的绑定关系节点.

offset

Kafka offset记录的是消息的消费进度, Kafka的offset有两种保存方式:

  • 通过配置参数: zookeeper.connect, 这种情况下, 消费进度会保存到zookeeper下的consumers/{group}/offsets/{topic}/{partition}目录下.

  • 通过配置参数: bootstrap.servers, 这是通过kafka默认api的消费方式, offset会保存在kafka的一个默认topic__consumer_offsets. 查看当前group的消费进度, 要依靠kafka自带的工具【kafka-consumer-offset-checker】.

    当前高级别版本的API接口都是通过第二种方式来记录偏移量的

提交方式

消息commit的方式有两种, 一种是自动提交enable.auto.commit=true, 一种是手动提交enable.auto.commit=false

自动提交

自动提交规则是, 消费者会每隔一定的时间间隔来自动提交一次该消费者进程的消费的所有partition的offset(由auto.commit.interval.ms指定).需要注意的是这种方式可能导致重复消费和丢消息的问题.

重复消费: 当consumer poll消息并消费后, 未到自动提交offset的时机,这时触发了rebalance. rebalance后这个consumer依然消费原来的partition, 将从最后一个offset消费消息, 这个时候就出现了重复消费.

丢消息: 当consumer poll消息后, 业务上并没有处理完, 但是触发了commit的时机, 提交了offset, 恰巧的是出现了crash. 当rebalance以后, 这条消息就被丢掉了, 没能成功消费.

所以, 最好还是采用手动提交的方式来避免rebalance带来的重复消费和丢消息的问题.

手动提交

刚刚分析了自动提交的问题, 手动提交也不是没有问题, 手动提交也可能导致一些异常.比如说, 一个consumer进程有两个线程t1,t2 分别消费两个partition分区p1, p2. 当t1消费完消息以后手动commit, 这个时候会提交consumer所有partition的offset, 也就是t2的offset也提交了, 但实际上t2这是可能只是poll了消息并没有消费完成, 如果consumer进程这个时候发生crash, 那么t2消费的消息也会丢到. 目前的kafka控制offset粒度还是一个进程粒度.

那咋整呢?如果consumer数量和partition数量能够一致, 那就不会有这个问题, 说白了就不会有一个consumer消费多个partition的情况. 或者, 我们可以poll消息以后放到一个队列中, 将队列中的所有消息消费掉以后提交一个批量的offset.

Kafka 全程解析

生产消息

  1. producer push消息: 基于负载均衡算法获取到目标partition后, producer先从zk中获取到该partition的Leader partition
  2. producer 将消息发给Leader partition
  3. Leader partition将消息追加到日志文件中
  4. Follower partition主动到Leader partition中poll 消息, 写入本地log, 并向Leader partition发回一个ack消息
  5. 当Follower partition完成了同步, Leader partition更新HW, 并向Producer发送ack
  6. producer接收到ack, 确认发送成功

消息存储

topic是一个逻辑概念, 而topic对应的partition是物理概念, partition并不是最小的存储单元,其实是一个文件夹, 由多个segment组成,segment才是最小的存储单元. segment文件分为 索引文件(.index)和数据文件(.log). 索引文件存储元数据用来定位消息在数据文件的具体位置.

消息消费

consumer指定topic消费, 并将消息消费的偏移量记录在一个固定的topic中. partition和consumer的关系是多对一的关系, 即一个partition只能被一个consumer消费, 一个consumer可以消费多个partition. 如果consumer的数据>partition数量, 多出来的consumer并不会消费任何的partition.

参考文章

深入浅出解读 Kafka 的可靠性机制

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容