1 kafka应用系统框架图
作为一款典型的消息中间件产品,kafka系统仍然由producer、broker、consumer三部分组成。kafka涉及的几个常用概念和组件简单介绍如下:
-
message和batch
消息(message),是kafka生成(produce)、传递(deliver)、存储(store)、消费(consume)的基本数据单元。在kafka的很多场合中,消息也叫做记录(record)。因为kafka在消息存储时,只能以追加(append)的方式写入,一般都是顺序读写存储的消息,行为类似于日志(log)的读写访问,在消息存储上下文中,一条消息也称为一条日志。在kafka系统内部,一条消息一般由用户(producer和consumer)规定的key(可选,key可以为空)和消息体(message body或message value)以及kafka系统内部维护的消息元数据组成。消息的key和value虽然对用户来说有不同的编码和含义,但是对kafka来说都是一个字节数组,是透明地进行存储和传递的。
为了提升消息传输和处理的效率,kafka往往将多条消息打包成一个batch,然后在网络中传输。
batch消息的格式为:
baseOffset: int64 // batch中第一条消息的offset batchLength: int32 // batch消息的总长度 partitionLeaderEpoch: int32 magic: int8 (current magic value is 2) // 消息格式的版本,当前版本为V2 crc: int32 // 消息的crc校验值 attributes: int16 // 属性标志位,如消息是否压缩,采用的压缩算法 bit 0~2: 0: no compression 1: gzip 2: snappy 3: lz4 4: zstd bit 3: timestampType // 时间戳类型,0: producer发送消息的时间;1: 消息到达broker的时间 bit 4: isTransactional (0 means not transactional) bit 5: isControlBatch (0 means not a control batch) bit 6~15: unused lastOffsetDelta: int32 // batch中最后一条消息与baseOffset的差值 firstTimestamp: int64 // 第一条消息的时间戳 maxTimestamp: int64 // batch中消息的最大时间戳,基本上是最后一条消息的时间戳 producerId: int64 // 消息是哪个producer发送的 producerEpoch: int16 baseSequence: int32 recordsCount: int32 // batch中消息的条数 records: [Record] // batch中消息列表。如果使用了压缩,则是消息列表压缩后的数据
消息的格式为:
length: varint // 消息的长度 attributes: int8 // 当前没有使用 bit 0~7: unused timestampDelta: varlong // 消息的时间戳。如果消息封装在batch中,则是与第一条消息的时间戳差值 offsetDelta: varint // 消息的offset。如果在batch中,则是与baseOffset的差值 keyLength: varint // 消息key的长度 key: byte[] // 消息的key值 valueLen: varint // 消息内容的长度 value: byte[] // 消息的内容 Headers => [Header] // header列表
Header的格式如下,header是一个key-value值对,具体内容比较灵活,便于扩展:
headerKeyLength: varint headerKey: String headerValueLength: varint Value: byte[]
-
topic和partition
topic(主题)是一组逻辑上的消息集合,可以理解为kafka通过topic将消息进行了分类和管理,一个topic包含了同一种类型的消息,一条消息必须属于也只能属于一个唯一的topic。一个topic由一个或多个partition(分区)组成。一个partition只能在一个broker上,且同一个partition中的消息是保序的,但是同一个topic内不同的partition中的消息是不保序的,因为不同partition的消息处理是完全相互独立的。这是kafka一个非常重要的特性和承诺,在涉及消息端到端的顺序问题时必须仔细考虑。
-
offset
每一条消息写入partition时,都会为这条消息分配一个唯一的offset(偏移)。消息是以追加的方式写入partition中的,offset也是递增的。每写入一条消息就增加1,offset的初始值为0,每个partition独立的管理自己消息的offset。可以说,offset值是partition中消息的唯一标识。
-
producer
producer(生产者),也叫message writer(消息写入者),从消息的发布订阅角度来说,也叫publisher(发布者)。producer用来创建消息并写入kafka。producer与producer之间是完全解耦的,他们之间没有任何通信和关联。每个producer可以独立的根据自己的需要向一个或多个topic发送消息。
-
consumer和consumer group
consumer(消费者),也叫message reader(消息读取者),从消息的发布订阅角度来说,也叫subscriber(订阅者)。consumer group由一个或多个consumer组成。不同的consumer group独立的消费topic中的消息。但是在同一个consumer group内部,同一时刻,一个partition的消息只能被一个consumer消费。consumer是以pull的方式主动拉取topic中的消息的。
-
broker和cluster
一个独立的kafka server实例就是一个broker。broker就是消息的发布(publish)和订阅(subcribe)中心,或者说是消息的中转站。broker接收producer发送过来的消息,将消息存储到磁盘中,并接受consumer的请求,将消息发送给consumer。
kafka集群(cluster)由多个broker组成。集群将多个broker有机组合起来,除了提供单个broker以单独身份就能支持的partition消息传递和存储服务外,还提供集群层面才能支持的能力,如,topic以partition为基本单元分布到不同的borker上,partition的多副本以及高可用等
-
zookeeper
从2.8版本开始,kafka集群已经不需要zookeeper的支持了,kafka的集群特性由直接运行在broker上的基于kafka raft协议的Quorum控制器实现。2.8之前的版本,kafka集群的元数据保存在zookeeper集群中,kafka的很多集群特性(如kafka controller选举)依赖于zookeeper。zookeeper集群一般3个、5个或7个节点。
-
replia、leader replia和follower replica
为了保证消息传递的高可用,kafka的partition可以有多个副本(replica)。这些副本中有一个副本充当leader的角色,其他副本均充当follower的角色。只有leader replica才会接受producer写入消息和consumer消费消息,follower replica只会以pull的方式不断的从leader replica拉取消息,以保持与leader的同步。当leader故障时,kafka集群会在follower replica中选择一个作为新的leader,新的leader继续接受消息写入和消费服务,其他的follower则改向新的leader同步消息。因为新的leader与旧的leader在切换时消息是同步的,所以旧的leader故障不会影响kafka的正常消息服务。partition副本视情况一般2到3个就可以了,也有5个副本的。
-
ISR、LEO和HW
ISR(In Sync Replia)就是跟leader replica保持同步的follower replica。由于网络拥塞、短暂分区、follower replica所在broker负载过重或宕机后重启等各种原因,有些follower并不能时刻与leader保持同步,follower中的消息会不同程度的落后于leader。
LEO(Log End Offset)就是leader或follower副本中最后一条消息的offset。HW(High Watermark)代表的offset表示ISR中所有的副本都包含了这个offset对应的消息。HW对应的消息也叫做已提交日志(Committed Log)。consumer在消费partition的消息时,只能消费到小于或等于该partition leader replica维护的HW的消息,而不能消费大于HW且小于或等于LEO这个区间offset的消息。
rebalance
当consumer group的状态发生变化(如有consumer故障、增减consumer成员等)或consumer group消费的topic状态发生变化(如增加了partition,消费的topic发生变化),kafka集群会自动调整和重新分配consumer消费的partition,这个过程就叫做rebalance(再平衡)。
- __consumer_offsets
__consumer_offsets是kafka集群自己维护的一个特殊的topic,它里面存储的是每个consumer group已经消费了每个topic partition的offset。__consumer_offsets中offset消息的key由group id,topic name,partition id组成,格式为{topic name}-${partition id},value值就是consumer提交的已消费的topic partition offset值。__consumer_offsets的分区数和副本数分别由offsets.topic.num.partitions(默认值为50)和offsets.topic.replication.factor(默认值为1)参数配置。我们通过公式 hash(group id) % offsets.topic.num.partitions
就可以计算出指定consumer group的已提交offset存储的partition。由于consumer group提交的offset消息只有最后一条消息有意义,所以__consumer_offsets是一个compact topic,kafka集群会周期性的对__consumer_offsets执行compact操作,只保留最新的一次提交offset。
- group coordinator(协调器)
group coordinator运行在kafka某个broker上,负责consumer group内所有的consumer成员管理、所有的消费的topic的partition的消费关系分配、offset管理、触发rebalance等功能。group coordinator管理partition分配时,会指定consumer group内某个consumer作为group leader执行具体的partition分配任务。存储某个consumer group已提交offset的__consumer_offsets partition leader副本所在的broker就是该consumer group的协调器运行的broker。
2 kafka集群基本功能
2.1 kafka controller
跟大多数分布式系统一样,集群有一个master角色管理整个集群,协调集群中各个成员的行为。kafka集群中的controller就相当于其它分布式系统的master,用来负责集群topic的分区分配,分区leader选举以及维护集群的所有partition的ISR等集群协调功能。集群中哪个borker是controller也是通过一致性协议选举产生的,2.8版本之前通过zookeeper进行选主,2.8版本后通过kafka raft协议进行选举。如果controller崩溃,集群会重新选举一个broker作为新的controller,并增加controller epoch值(相当于zookeeper ZAB协议的epoch,raft协议的term值)
2.1.1 partition分配
当kafka集群新建了topic或为一个topic新增了partition,controller需要为这些新增加的partition分配到具体的broker上,并把分配结果记录下来,供producer和consumer查询获取。
因为只有partition的leader副本才会处理producer和consumer的读写请求,而partition的其他follower副本需要从相应的leader副本同步消息,为了尽量保证集群中所有broker的负载是均衡的,controller在进行集群全局partition副本分配时需要使partition的分布情况是如下这样的:
- 所有的partition副本在集群中尽量是平均分布的。也就是说每个broker上的partition副本数是一样的
- 所有的partition leader副本在集群中尽量是平均分布的。也就是说每个broker上的partition leader副本数是一样的
- 同一个topic的partition leader副本在集群中尽量是平均分布的。
- 同一个partition的多个副本不能在同一个broker上。因为同一个partition的多个副本放在一个broker上没有意义,达不到高可用的目的。但是同一个topic的不同partition的副本是可以在同一个broker上的。
- 如果集群的broker通过broker.rack参数配置了机架信息,应尽量使partition的副本在机架间平均分布,以提高集群的可用性
PS:因为不同topic的流量不同(大象流和老鼠流),同一个topic的不同partition的流量也可能不同(数据倾斜),上述分布也不能保证broker负载均衡。这些因素比较复杂,暂不考虑。如果实在需要,可以通过topic reassign功能人工进行partition分配
在默认情况下,kafka采用轮询(round-robin)的方式分配partition副本。由于partition leader副本承担的流量比follower副本大,kafka会先分配所有topic的partition leader副本,使所有partition leader副本全局尽量平衡,然后再分配各个partition的follower副本。partition第一个follower副本的位置是相应leader副本的下一个可用broker,后面的副本位置依此类推。
举例来说,假设我们有两个topic,每个topic有两个partition,每个partition有两个副本,这些副本分别标记为1-1-1,1-1-2,1-2-1,1-2-2,2-1-1,2-1-2,2-2-1,2-2-2(编码格式为topic-partition-replia,编号均从1开始,第一个replica是leader replica,其他的是follower replica)。共有四个broker,编号是1-4。我们先对broker按broker id进行排序,然后分配leader副本,最后分配foller副本。
1)没有配置broker.rack的情况
现将副本1-1-1分配到broker 1,然后1-2-1分配到broker 2,依此类推,2-2-1会分配到broker 4。partition 1-1的leader副本分配在broker 1上,那么下一个可用节点是broker 2,所以将副本1-1-2分配到broker 2上。同理,partition 1-2的leader副本分配在broker 2上,那么下一个可用节点是broker 3,所以将副本1-1-2分配到broker 3上。依此类推分配其他的副本分片。最后分配的结果如下图所示:
2)配置了broker.rack的情况
假设配置了两个rack,broker 1和broker 2属于Rack 1,broker 3和broker 4属于Rack 2。我们对rack和rack内的broker分别排序。然后先将副本1-1-1分配到Rack 1的broker 1,然后将副本1-2-1分配到下一个Rack的第一个broker,即Rack 2的broker 3。其他的parttition leader副本依此类推。然后分配follower副本,partition 1-1的leader副本1-1-1分配在Rack 1的broker上,下一个可用的broker是Rack 2的broker 3,所以分配到broker 3上,其他依此类推。最后分配的结果如下图所示:
kafka除了按照集群情况自动分配副本,也提供了reassign工具人工分配和迁移副本到指定broker,这样用户可以根据集群实际的状态和各partition的流量情况分配副本
2.1.2 partition leader选举
kafka集群controller的一项功能是在partition的副本中选择一个副本作为leader副本。在topic的partition创建时,controller首先分配的副本就是leader副本,这个副本又叫做preference leader副本。
当leader副本所在broker失效时(宕机或网络分区等),controller需要为在该broker上的有leader副本的所有partition重新选择一个leader,选择方法就是在该partition的ISR中选择第一个副本作为新的leader副本。但是,如果ISR成员只有一个,就是失效的leader自身,其余的副本都落后于leader怎么办?kafka提供了一个unclean.leader.election配置参数,它的默认值为true。当unclean.leader.election值为true时,controller还是会在非ISR副本中选择一个作为leader,但是这时候使用者需要承担数据丢失和数据不一致的风险。当unclean.leader.election值为false时,则不会选择新的leader,该partition处于不可用状态,只能恢复失效的leader使partition重新变为可用。
当preference leader失效后,controller重新选择一个新的leader,但是preference leader又恢复了,而且同步上了新的leader,是ISR的成员,这时候preference leader仍然会成为实际的leader,原先的新leader变为follower。因为在partition leader初始分配时,使按照集群副本均衡规则进行分配的,这样做可以让集群尽量保持平衡。
2.2 kafka副本复制
为了保证topic的高可用,topic的partition往往有多个副本,所有的follower副本像普通的consumer一样不断地从相应的leader副本pull消息。每个partition的leader副本会维护一个ISR列表存储到集群信息库里,follower副本成为ISR成员或者说与leader是同步的,需要满足以下条件:
1)follower副本处于活跃状态,与zookeeper(2.8之前版本)或kafka raft master之间的心跳正常
2)follower副本最近replica.lag.time.max.ms(默认是10秒)时间内从leader同步过最新消息。需要注意的是,一定要拉取到最新消息,如果最近replica.lag.time.max.ms时间内拉取过消息,但不是最新的,比如落后follower在追赶leader过程中,也不会成为ISR。
follower在同步leader过程中,follower和leader都会维护几个参数,来表示他们之间的同步情况。leader和follower都会为自己的消息队列维护LEO(Last End Offset)和HW(High Watermark)。leader还会为每一个follower维护一个LEO。LEO表示leader或follower队列写入的最后一条消息的offset。HW表示的offset对应的消息写入了所有的ISR。当leader发现所有follower的LEO的最小值大于HW时,则会增加HW值到这个最小值LEO。follower拉取leader的消息时,同时能获取到leader维护的HW值,如果follower发现自己维护的HW值小于leader发送过来的HW值,也会增加本地的HW值到leader的HW值。这样我们可以得到一个不等式:follower HW <= leader HW <= follower LEO <= leader LEO
。HW对应的log又叫做committed log,consumer消费partititon的消息时,只能消费到offset值小于或等于HW值的消息的,由于这个原因,kafka系统又称为分布式committed log消息系统。
2.3 kafka消息存储
kafka的消息内容存储在log.dirs参数配置的目录下。kafka每个partition的数据存放在本地磁盘log.dirs目录下的一个单独的目录下,目录命名规范为${topicName}-${partitionId}
,每个partition由多个LogSegment组成,每个LogSegment由一个数据文件(命名规范为:{baseOffset}.index)和一个时间戳索引文件(命名规范为:${baseOffset}.timeindex)组成,文件名的baseOffset就是相应LogSegment中第一条消息的offset。.index文件存储的是消息的offset到该消息在相应.log文件中的偏移,便于快速在.log文件中快速找到指定offset的消息。.index是一个稀疏索引,每隔一定间隔大小的offset才会建立相应的索引(比如每间隔10条消息建立一个索引)。.timeindex也是一个稀疏索引文件,这样可以根据消息的时间找到对应的消息。
可以考虑将消息日志存放到多个磁盘中,这样多个磁盘可以并发访问,增加消息读写的吞吐量。这种情况下,log.dirs配置的是一个目录列表,kafka会根据每个目录下partition的数量,将新分配的partition放到partition数最少的目录下。如果我们新增了一个磁盘,你会发现新分配的partition都出现在新增的磁盘上。
kafka提供了两个参数log.segment.bytes和log.segment.ms来控制LogSegment文件的大小。log.segment.bytes默认值是1GB,当LogSegment大小达到log.segment.bytes规定的阈值时,kafka会关闭当前LogSegment,生成一个新的LogSegment供消息写入,当前供消息写入的LogSegment称为活跃(Active)LogSegment。log.segment.ms表示最大多长时间会生成一个新的LogSegment,log.segment.ms没有默认值。当这两个参数都配置了值,kafka看哪个阈值先达到,触发生成新的LogSegment。
kafka还提供了log.retention.ms和log.retention.bytes两个参数来控制消息的保留时间。当消息的时间超过了log.retention.ms配置的阈值(默认是168小时,也就是一周),则会被认为是过期的,会被kafka自动删除。或者是partition的总的消息大小超过了log.retention.bytes配置的阈值时,最老的消息也会被kafka自动删除,使相应partition保留的总消息大小维持在log.retention.bytes阈值以下。这个地方需要注意的是,kafka并不是以消息为粒度进行删除的,而是以LogSegment为粒度删除的。也就是说,只有当一个LogSegment的最后一条消息的时间超过log.retention.ms阈值时,该LogSegment才会被删除。这两个参数都配置了值时,也是只要有一个先达到阈值,就会执行相应的删除策略
3 消息写入过程
3.1 三种消息发送模式
当我们使用KafkaProducer向kafka发送消息时非常简单,只要构造一个包含消息key、value、接收topic信息的ProducerRecord对象就可以通过KafkaProducer的send()向kafka发送消息了,而且是线程安全的。KafkaProducer支持通过三种消息发送方式
- 只发送消息,不等待消息响应,也不关心消息是否成功发送(fire-and-forget)
- 同步发送。发送后等待返回结果RecordMetadata后再发送后面的消息。RecordMetadata中包含了消息的topic、partition、offst、时间等信息
- 异步发送。发送时,提供一个Callback回调方法。无需等待消息响应就可以继续发送后续的消息。同时当消息响应时,不管是发送成功还是发生异常,都会调用发送消息时提供的Callback回调方法。在Callback中我们可以实现一些功能,比如跟踪记录成功消息信息,以用于审计、分析统计等;对消息异常进行处理,比如记录异常、重新发送等。我们一般需要兼顾消息发送吞吐量和消息可靠性,建议采用这种模式。
3.2 消息写入kafka基本过程
KafkaProducer客户端虽然使用简单,但是一条消息从客户端到topic partition的日志文件,中间需要经历许多的处理过程。KafkaProducer的内部结构如下所示:
从图中可以看出,消息的发送涉及两类线程,一类是调用KafkaProducer.send()方法的应用程序线程,因为KafkaProducer.send()是多线程安全的,所以这样的线程可以有多个;另一类是与kafka集群通信,实际将消息发送给kafka集群的Sender线程,当我们创建一个KafkaProducer实例时,会创建一个Sender线程,通过该KafkaProducer实例发送的所有消息最终通过该Sender线程发送出去。RecordAccumulator则是一个消息队列,是应用程序线程与Sender线程之间消息传递的桥梁。当我们调用KafkaProducer.send()方法时,消息并没有直接发送出去,只是写入了RecordAccumulator中相应的队列中,最终需要Sender线程在适当的时机将消息从RecordAccumulator队列取出来发送给kafka集群。
消息的发送过程如下:
-
KafkaProducer
构造ProducerRecord实例,ProducerRecord的信息包括:消息的key,消息的value,消息的topic,也可以同时指定消息的partition。
-
ProducerInterceptors
我们可以给KafkaProducer注册一组ProducerInterceptor,这些ProducerInterceptor可以拦截消息,在对消息进一步处理之前可以对消息做一些处理。
-
Serializer
将消息的key和value序列化为byte序列,因为kafka处理的消息与应用无关,这里需要将应用程序以类对象表示的数据转换为kafka理解的字节数组。Kafka producer已经贴心地提供了String、Integer等常见数据类型以及Avro对象的序列化器。当然用户也可以自定义序列化器。当序列化异常时,会抛出SerializationException。
-
获取ClusterMetadata
我们将消息发送给kafka集群,需要知道消息topic的partition所在的broker机器。消息序列化后,会检测producer是否已经获取了所需的cluster信息以及cluster信息是否已经过时了。如果没有有效的cluster信息,会触发Sender线程向kafka集群发送获取集群信息的请求,从而获取ClusterMetadata。ClusterMetadata中包含了集群所有的topic、partition、partition所在的broker、partition leader所在的broker等与消息发送相关的集群信息。除了发送消息时触发获取ClusterMetadata,Sender线程也会周期性的获取ClusterMetadata,获取周期由参数metadata.max.age.ms设置。这样正常情况下,消息发送时都已经有了可用的ClusterMetadata。ClusterMetadata请求超时时间通过参数metadata.fetch.timeout.ms设置。
-
partitioner
producer发送消息时,往往只指定了消息的topic。partitioner用来确定消息的partition。如果消息的key为空,producer默认使用轮询的方式将消息发给topic可用的partition。如果消息的key不为空,producer默认会先计算key的hash值,然后将hash值对partition数求余来确定partition。当然,用户也可以根据自己的需要自定义partition。如果producer在发送消息已经指定了partition,partitioner就无需再计算partition了。
-
RecordAccumulator
RecordAccumulator为每一个topic partition维护了一个队列,消息经过partitioner模块确定topic partition后,会被写入该partition对应的特定队列,然后KafkaProducer.send()方法就返回了,可以继续发送后续的消息。partition队列中的元素并不是一个个原始消息,而是BatchRecord。这是为了提高消息的发送效率,将多条消息打包在一起发送。BatchRecord中消息的大小由producer参数batch.size(单位是字节)决定,如果BatchRecord中累积的消息大小超过了该阈值,会创建一个新的BatchRecord供消息写入。事实上,RecordAccumulator内部维护了一个Map<TopicPartion, ArrayDeque<BatchRecord>>,这个Map的key为topic的partition,value为BatchRecord的双向队列,供应用程序写入,Sender线程读取
RecordAccumulator存储了待发送的BatchRecord,我们可以通过producer参数buffer.memory控制RecordAccumulator队列占用的总的内存空间。当RecordAccumulator占用的内存达到阈值,再写入消息时会抛出异常。同时,producer提供了另一个参数max.block.ms,可以阻塞写入线程一段时间后再抛出异常,如果max.block.ms指定的时间内RecordAccumulator有剩余的空间了,就无需抛出异常了
为了进一步提高消息的传输效率,BatchRecord中的消息支持压缩,通过producer的参数compression.type控制,compression.type有效值包括:none、snappy、gzip、lz4、zstd。
-
Sender发送消息
Sender Thread是一个单独的线程,它会检测RecordAccumulator中有哪些符合条件需要发送的RecordBatch,并把发送到同一个kafka broker的所有RecordBatch封装到一个叫做ClientRequest的对象中,这样不管发送的RecordBatch属于哪个topic partition,只要他们的leader在同一个kafka broker,都在同一个报文里发送给broker,减少了报文发送个数,报文的最大大小则通过producer配置参数max.request.size指定,默认值为1MB。同时,sender在确定RecordBatch需要发送到的broker时,会判断ClusterMetadata中相应topic的metadata是否有效,如果metadata缺失或无效,由于不知道需要发送给哪个borker,也不会发送。对从RecordAccumulator中取出的RecordBatch,sender还会判断RecordBatch中的消息是否已经超时,如果已经超时,则不会发送相应的消息,而且如果消息注册Callback方法,就会调用消息相应的Callback方法。然后sender会将ClientRequest暂存到InFlightRequests中等待kafka broker的响应,并在条件允许时发送RequestSend报文(RequestSend是发送给broker的实际报文,是ClientRequest中的一个字段)给broker。其实sender是把RequestSend写到tcp的send buffer,buffer大小可以通过producer参数send.buffer.bytes设置。
sender什么时候会构造一个ClientRequest把消息发送给kafka?是不是topic partition的消息数一定要累积到batch size才会发送?下面的几种情况都会触发ClientRequest的构造:
1)RecordAccumulator中topic partition有多个RecordBatch或者第一个的RecordBatch达到了batch.size大小
2)消息等待发送超时时间已到。由参数linger.ms参数设置,如果linger.ms为0表示立即发送,不等待
3)RecordAccumulator空间不够了,需要释放空间。
4)某个应用线程执行了producer.flush()方法
5)sender线程关闭,这时sender会尽量把消息发送出去
sender最终真正把消息发送给kafka broker,还需要满足以下几个条件:
1)sender的NetworkClient与broker的tcp连接正常
2)sender发送出去且没有收到响应的消息数小于参数max.in.flight.requests.per.connection设定的值
-
kafka写入消息
kafka broker在接收到RequestSend消息后,会解析RequestSend,取出RecordBatch,如果RecordBatch所属的topic partition的leader replica在当前broker上,则会将RecordBatch写入相应的topic partition 日志文件中,为消息分配offset值。注意,kafka这里其实只写入了操作系统缓存,并没有持久化到磁盘中,而且因为kafka日志文件本身是一个日志类型文件,也不会有其他数据库那样的WAL文件,如果broker宕机,没有持久化到磁盘的消息会消失。另外,kafka是将RecordBatch整体写入日志文件的,而不会解开RecordBatch将里面的消息一条条写入日志文件,所以kafka consumer消费时获取的也是整个RecordBatch,需要在cosnumer端解开RecordBatch才能获取到指定偏移的消息。如果RecordBatch中的消息是压缩的,也是在consumer端进行解压缩。如果RecordBatch所属的topic partition的leader replica不在当前broker上,broker则会返回一个"非首领分区"错误。
topic partition的follower replica会努力拉取leader replica最新写入的消息,leader维护topic partition的HW值,如果ISR都拉取了最新写入的消息,leader会修改HW为最新写入消息的offset,这样新写入的消息就能被consumer消费了。leader会根据producer参数acks决定什么时候回复响应消息给producer:
1)acks=0,leader将消息写入日志文件就会回复响应消息。但在producer端看来,producer将RequestSend消息发送出去就认为消息发送成功,而不会等待broker的响应
2)acks=1,leader将消息写入日志文件就会回复响应消息。producer需要收到表示成功的消息响应才会认为消息发送成功。
3)acks=all,需要所有的ISR都接收到消息,leader才会回复响应消息。由于follower
replica同步消息有快有慢,为了避免producer长久等待消息的响应,producer设置了timeout.ms参数,如果leader在timeout.ms时间内还没有收到follower replica的同步响应,leader会立即返回表示同步超时的异常响应,而不是一直等待所有ISR的消息同步完成
为了保证消息的可靠性,我们需要为partition设置多个副本,副本的多少决定了可靠性的高低。但是从上面broker消息写入过程分析可知,实际决定消息可靠性的是ISR,而不是partition副本数。因为ISR是动态维护的,即使有多个partition,ISR也可能很少,甚至只有leader自身,这时如果ISR成员同时宕机,消息仍然可能丢失(ISR成员都收到了消息,但是都没有持久化到磁盘,leader已返回写入成功响应,producer认为消息已经成功发送,但是consumer无法消费到)。为了管理这种情况,kafka设置了一个参数min.insync.replicas,它可以为某个topic单独配置,或者配置整个集群。当某个topic partition的ISR数量小于设置的min.insync.replicas时,它将拒绝服务。
kafka broker在接收到RequestSend消息时,会检查消息的大小是否超过了broker配置的message.max.bytes(默认值为1MB),如果超过了会拒绝接收,并返回消息过大的异常响应。borker还可以通过receive.buffer.size设置tcp接收缓存大小。当producer与broker间距离较远,传输时延较大时,可以适当增加receive.buffer.size大小,提高数据传输效率
-
sender处理消息响应
sender将RequestSend消息通过网络发送给kafka broker后,将RequestSend消息关联的ClientRequest暂存入了InFlightRequests,等待消息的响应返回。sender接收到ClientResponse后,会根据响应消息中的correlation_id字段找到对应的ClientRequest,然后执行注册的Callback方法,这样ClientRequest就可以释放了。
如果sender在request.timeout.ms时间内还没有收到消息的响应,或者收到的是一个异常响应,而这个异常响应是可以通过重试来解决的,比如tcp连接错误、无leader错误(kafka集群可能正在进行选主)等,且producer参数retries值大于0,则producer会自动重试retries次,两个重试之间的时间间隔由参数retry.backoff.ms设置。如果自动重试后仍然失败,或者响应错误类型是无法通过重试解决的,比如发送消息大小大于broker接收的最大消息message.max.bytes大小,producer会抛出异常(producer.send()等待Future<RecordMetadata>完成)或回调Callback(producer.send()注册了Callback),最终由应用程序决定如何处理,修改处理逻辑重试或仅记录错误事件。
4 消息读取过程
在使用KafkaConsumer实例消费kafka消息时,有一个特性我们要特别注意,就是KafkaConsumer不是多线程安全的,KafkaConsumer方法都在调用KafkaConsumer的应用程序线程中运行(除了consumer向kafka集群发送的心跳,心跳在一个专门的单独线程中发送),所以我们调用KafkaConsumer的所有方法均需要保证在同一个线程中调用,除了KafkaConsumer.wakeup()方法,它设计用来通过其它线程向consumer线程发送信号,从而终止consumer执行。
4.1 Consumer Metadata
跟producer一样,consumer要与kafka集群通信,消费kafka消息,首先需要获取消费的topic partition leader replica所在的broker地址等信息,这些信息可以通过向kafka集群任意broker发送Metadata请求消息获取。
4.2 partition分配
我们知道,一个consumer group有多个consumer,一个topic有多个partition,而且topic的partition在同一时刻只能被consumer group内的一个consumer消费,那么consumer在消费partition消息前需要先确定消费topic的哪个partition。partition的分配通过group coordinator来实现。基本过程如下:
- consumer通过向任意kafka broker发送metadata请求获取consumer group对应的group coordinator所在的borker
- consumer向group coordinator发送JoinGroupRequest,申请加入group。请求中携带了consumer的基本信息,比如消费的topic。
- group coordinator在已知的加入到group中的所有consumer中选择一个consumer作为group leader(选择第一个consumer)。group coordinator发送JoinGroupResponse响应给所有的consumer。其中只有group leader接收到的响应消息中包含所有consumer信息。
- group leader根据配置参数partition.assignment.startegy确定的partition分配策略为每个consumer分配消费的topic partiton。
- consumer向group coordinator发送SyncGroupRequest消息。group leader发送的SyncGroupRequest消息中会包含partition分配的结果
- group coordinator向consumer发送SyncGroupResponse响应,响应中携带了consumer分配的parition。这样consumer就可以向分配的partition拉取消息了
我们可以通过实现接口org.apache.kafka.clients.consumer.internals.PartitionAssignor自定义partition分配策略,但是kafka已经提供了三种分配策略可以直接使用。
-
RangeAssigor
RangeAssigor对每个topic独立进行分配。基本思路为:对每个topic,首先对topic的所有partition按partition id进行排序,然后对这个topic的所有consumer进行排序,通过公式
${partition count} / $ {consumer count}
计算每个consumer消费的partition个数,然后将连续的几个partition分别分配给各个consumer。前面的几个consumer可能会比后面的consumer多消费一个partiton,因为partiton数可能不能被consumer数整除。 -
RoundRobinAssignor
RoundRobinAssignor将所有的topic partition进行整体轮询分配。基本思路为:将所有topic的所有partition按顺序排序,将所有的cosnumer进行排序,然后按顺序逐一分别从topic partition队列和consumer队列中取出一个partition和consumer,如果该consumer消费该topic,则将该partition分配给consumer,否则轮询下一个consumer继续进行判断。
-
StickyAssignor
RangeAssigor和RoundRobinAssignor都没有考虑历史的分配情况。对consumer group第一次进行分配时没什么问题,但是在rebalance场景下,可能会重新打乱消费配对关系,造成重消费现象。StickyAssignor则会考虑当前的partition分配情况,在尽量不改变现有消费分配关系的情况下使partition分配最均衡。
4.3 consumer消费消息
partition分配完后,每个consumer知道了自己消费的topic partition,通过metadata请求可以获取相应partition的leader副本所在的broker信息,然后就可以向broker poll消息了。但是consumer从哪个offset开始poll消息?所以consumer在第一次向broker发送FetchRequest poll消息之前需要向Group Coordinator发送OffsetFetchRequest获取消费消息的起始位置。Group Coordinator会通过key {topic}-${partition}查询 __consumer_offsets topic中是否有offset的有效记录,如果存在,则将consumer所属consumer group最近已提交的offset返回给consumer。如果没有(可能是该partition是第一次分配给该consumer group消费,也可能是该partition长时间没有被该consumer group消费),则根据consumer配置参数auto.offset.reset值确定consumer消费的其实offset。如果auto.offset.reset值为latest,表示从partition的末尾开始消费,如果值为earliest,则从partition的起始位置开始消费。当然,consumer也可以随时通过KafkaConsumer.seek()方法人工设置消费的起始offset。
kafka broker在收到FetchRequest请求后,会使用请求中topic partition的offset查一个skiplist表(该表的节点key值是该partition每个LogSegment中第一条消息的offset值)确定消息所属的LogSegment,然后继续查LogSegment的稀疏索引表(存储在.index文件中),确定offset对应的消息在LogSegment文件中的位置。为了提升消息消费的效率,consumer通过参数fetch.min.bytes和max.partition.fetch.bytes告诉broker每次拉取的消息总的最小值和每个partition的最大值(consumer一次会拉取多个partition的消息)。当kafka中消息较少时,为了让broker及时将消息返回给consumer,consumer通过参数fetch.max.wait.ms告诉broker即使消息大小没有达到fetch.min.bytes值,在收到请求后最多等待fetch.max.wait.ms时间后,也将当前消息返回给consumer。fetch.min.bytes默认值为1MB,待fetch.max.wait.ms默认值为500ms。
为了提升消息的传输效率,kafka采用零拷贝技术让内核通过DMA把磁盘中的消息读出来直接发送到网络上。因为kafka写入消息时将消息写入内存中就返回了,如果consumer跟上了producer的写入速度,拉取消息时不需要读磁盘,直接从内存获取消息发送出去就可以了。
4.4 offset管理
为了避免发生再平衡后,consumer重复拉取消息,consumer需要将已经消费完的消息的offset提交给group coordinator。这样发生再平衡后,consumer可以从上次已提交offset出继续拉取消息。
kafka提供了多种offset提交方式
-
自动提交
如果参数enable.auto.commit值为true,那么consumer会自动提交offset。提交时间间隔由参数auto.commit.interval.ms设置,默认值为5s。应用程序在调用KafkaConsumer.poll()方法时,consumer在拉取消息前会检查当前时间与上次offset提交时间间隔是否大于auto.commit.interval.ms值,如果是大于,则会提交上次拉取的最后一条消息的offset。这里我们需要注意的是,自动提交也是应用程序通过调用KafkaConsumer.poll()方法触发的,如果应用程序处理消息的时间很长,实际提交offset的时间间隔可能也会比较大。
-
手动提交
有的时候应用程序需要自己控制offset提交的时机和提交的值,典型的场景是应用程序希望在可靠处理完消息后再提交offset,而不是像自动提交那样只要时间间隔到了就提交之前拉取到的所有消息。将enable.auto.commit值设为false就是手动提交
手动提交也有多重方式:
1)同步提交
应用程序调用KafkaConsumer.commitSync()方法进行同步提交。只要没有发生不可恢复的错误,commitSync()方法会一直尝试执行提交动作,直到最终提交成功才返回
2)异步提交
应用程序调用KafkaConsumer.commitAsync()方法进行异步提交。这里的异步并不是指执行提交的动作是在与consumer线程不同的线程里执行,而是与同步提交相比,异步提交即使提交失败了也不会重试。这种情况也许是问题,也许不是问题。不是问题的原因是,下一次正常提交会覆盖前面的提交,相当于恢复了正常状态。比如前一次提交offset 2000失败了,后面一次提交offset 3000成功了,kafka认为offset 3000之前的消息都提交了,跟两次提交都成功的效果是一样的。
3)同步和异步提交相结合
结合同步与异步提交的优点,正常消费时采用异步提交,提高消息消费的效率;consumer异常退出或关闭时采用同步提交,保证offset可靠提交。
4)提交指定的offset
前面介绍的commitSync()和commitAsync()方法都没有提供参数,这时提交的offset都是前一次poll的最后一条消息的offset。我们还可以给commitSync()和commitAsync()传入一个参数,指定topic partition提交的具体的offset值。
partition offset提交和管理对kafka消息系统效率来说非常关键,它直接影响了再平衡后consumer是否会重复拉取消息以及重复拉取消息的数量。如果offset提交的比较频繁,会增加consumer和kafka broker的消息处理负载,降低消息处理效率;如果offset提交的间隔比较大,再平衡后重复拉取的消息就会比较多。还有比较重要的一点是,kafka只是简单的记录每次提交的offset值,把最后一次提交的offset值作为最新的已提交offset值,作为再平衡后消息的起始offset,而什么时候提交offset,每次提交的offset值具体是多少,kafka几乎不关心(这个offset对应的消息应该存储在kafka中,否则是无效的offset),所以应用程序可以先提交3000,然后提交2000,再平衡后从2000处开始消费,决定权完全在consumer这边。
4.5 partition再平衡
kafka中的topic partition与consumer group中的consumer的消费关系其实是一种配对关系,当配对双方发生了变化时,kafka会进行再平衡,也就是重新确定这种配对关系,以提升系统效率、高可用性和伸缩性。当然,再平衡也会带来一些负面效果,比如在再平衡期间,consumer不能消费kafka消息,相当于这段时间内系统是不可用的。再平衡后,往往会出现消息的重复拉取和消费的现象。
触发再平衡的条件包括:
- topic的partition数发生了变化(增加或减少)
- consumer消费的topic发生了变化(通过KafkaConsumer.subscribe()方法更新了消费的topic,比如通过正则表达式订阅topic,kafka新增了topic,匹配上了订阅的topic)
- consumer group的consumer数发生了变化,如consumer重新恢复启动,有新的consumer加入,consumer宕机或退出等
需要注意的是,kafka集群broker的增减或者topic partition leader重新选主这类集群状态的变化并不会触发在平衡
有两种情况与日常应用开发比较关系比较密切:
-
Heartbeat
每个consumer会周期性的向所属的consumer group对应的group coordinator发送HeartbeatRequest心跳报告自己的状态,心跳的周期通过参数heartbeat.interval.ms设置,group coordinator如果在session.timeout.ms时间内都没有接收到某个consumer的心跳,则认为这个consumer退出group了,通过在回复其他consumer的HeartbeatRequest响应时,通知其他consumer发生了再平衡,进入在平衡过程。heartbeat心跳的发送和处理在一个单独的线程中进行。
-
consumer poll超时
consumer在每次poll消息前,会判断当前时间与上次poll的时间间隔是否大于参数max.poll.interval.ms设定的值,如果大于,那么consumer会主动发送LeaveGroup消息给group coordinator,group coordinator在接收到消息后会马上触发再平衡,而不是等待心跳超时。consumer应用程序需要仔细评估消息的处理时间合理设定该值,以避免无谓地触发再平衡。另外,consumer在执行close()方法时,也会主动发送LeaveGroup消息,让系统立即触发再平衡。
consumer在调用subscribe()方法时,支持传入一个ConsumerRebalanceListener监听器,ConsumerRebalanceListener提供了两个方法,onPartitionRevoked()方法在consumer停止消费之后,再平衡开始之前被执行。可以发现,这个地方是提交offset的好时机。onPartitonAssigned()方法则会在重新进行partition分配好了之后,但是新的consumer还未消费之前被执行。
5 kafka应用中的几个问题
5.1 kafka为什么这么快
我们在提到kafka时,首先想到的是它的吞吐量非常大,这也是很多人选择kafka作为消息传输组件的重要原因。
以下是保证kafka吞吐量大的一些设计考虑:
- 核心还是kafka集群对消息处理保持简单。消息的序列化/反序列化、压缩/解压缩、batch打包和解包、消息的分区、再平衡和offset的很多管理工作都交给了producer和consumer来处理。kafka在消息写入时只需要分配offset,然后将消息写入内存就完成了。消息消费时根据consumer提供的offset值读取相应的消息给consumer。为了保证消息的高可用,需要维护多个副本。这样极大的减少了kafka的计算量。
- 消息写入和消费均采用batch和压缩的方式,提升了网络传输的效率
- topic分区,多个分区可以并行写入和消费
- 消息写入时只写入内存,不进行磁盘操作,且以append only的方式添加到消息队列末尾
- 通过背景线程将日志flush到磁盘,文件顺序写,磁盘操作效率高
- 消息消费时采用零拷贝技术,或直接从内存文件系统缓存读取消息
- 充分利用文件系统缓存
但是kafka是不是总是这么快?我们同时需要看到kafka为了追求快舍弃了一些特性:
- kafka不支持事务,它只是简单的将从producer接收到的消息传递给consumer,而不考虑消息之间的关系,不保证多个消息的传递可靠性关系,比如原子地都传递成功。
- kafka只保证partition按接收的顺序将消息发送给consumer,而不保证topic级别的消息保序。而且,partition的顺序保证是从partition接收消息的角度来保证的,而不是从端到端业务的角度对消息保序。
- kafka本身并不保证消息只消费一次
所以,kafka在消息独立、允许少量消息丢失或重复、不关心消息顺序的场景下可以保证非常高的吞吐量,但是在需要考虑消息事务、严格保证消息顺序等场景下producer和consumer端需要进行复杂的考虑和处理,可能会比较大的降低kafka的吞吐量,例如对可靠性和保序要求比较高的控制类消息需要非常谨慎的权衡是否适合使用kafka。
5.2 kafka可靠数据传递考虑
我们通过producer向kafka集群发送消息,总是期望消息能被consumer成功消费到。最不能忍的是producer收到了kafka集群消息写入的正常响应,但是consumer仍然没有消费到消息。
kafka提供了一些机制来保证消息的可靠传递,但是有一些因素需要仔细权衡考虑,这些因素往往会影响kafka的吞吐量,需要在可靠性与吞吐量之间求得平衡:
- kakfa topic partiton的leader和follower副本都是把消息写入内存就返回响应了,如果partition的所有副本都同时宕机,可能会造成消息丢失。topic的副本数设置会关系到消息传递的可靠性,副本数越多,可靠性也越高。副本数一般可以设置为3个,可靠性要求更高的话可以设置为5个
- 消息写入时,有一个参数acks控制partition leader副本什么时候返回响应,如果acks等于0或1,在leader宕机时,可能造成消息丢失。如果需要较高的可靠性,acks需要设置为all,表示所有的ISR成员都写入了才认为写入成功了。
- ISR成员是动态的,如果ISR成员太少,比如就只有leader自己,这时即使topic副本数多,即使ISR成员都写入后才返回响应,还是可能丢失消息。需要设置min.insync.replicas参数为合适的值,至少要大于1。
- 当partition leader副本宕机时,kafka会进行重新选主,但是可能ISR只有leader自身,其他副本都落后于leader,这时如果还从其他副本中选主的话将造成消息丢失。需要将unclean.leader.election参数设置为false,这种情况下禁止重新选主。
- 当消息写入失败时,如果这种错误是可恢复的,比如tcp连接错误或无leader错误,producer有重试机制,由参数retries控制重试的次数,如果要求较高的可靠性,可以调大重试次数。
- 消息写入时可能会发生不可恢复的错误,比如发送的消息大小超过了kafka接收消息的大小,或者是应用程序逻辑的错误,应用程序需要监控跟踪这种类型的错误,清除引起消息发送错误的原因
5.3 kafka消息保序
kafka只保证partition消息顺序,不保证topic级别的顺序,而且保证的是partition写入顺序与读取顺序一致,不是业务端到端的保序。
如果对保序要求比较高,topic需要只设置一个partition。这时可以把参数max.in.flight.requests.per.connection设置为1,而retries设置为大于1的数。这样即使发生了可恢复型错误,仍然能保证消息顺序,但是如果发生不可恢复错误,应用层进行重试的话,就无法保序了。也可以采用同步发送的方式,但是这样也极大的降低了吞吐量。如果消息携带了表示顺序的字段,可以在接收端对消息进行重新排序以保证最终的有序。