全局架构图
磁盘结构
记录格式
type VarInt int // 变长整型,使用Varints和ZigZag编码的整型
type RecordBatch struct {
FirstOffset int64 // 起始偏移
Length int32 // 从PartitionLeaderEpoch开始的长度
PartitionLeaderEpoch int32 // 分区Leader纪元
Magic int8 // 消息版本号,当前为2,表示V2
Crc32 int32 // crc校验和
Attributes int16 // [0-2]压缩格式, 4时间戳类型, 5是否出于事务中, 6控制消息
LastOffsetDelta int32 // 最后一个Record的offset与FirstOffset的差值,用于保证消息组装的正确性
FirstTimestamp int64 // 第一个Record的时间戳
MaxTimestamp int64 // 最后一个Record的时间戳,用于保证消息组装的正确性
ProducerID int64 // 用于支持幂等性和事务
ProducerEpoch int32 // 用于支持幂等性和事务
FirstSequence int32 // 用于支持幂等性和事务
RecordsCount int32 // RecordsCount数组元素个数
Records []Record
}
type Record struct {
Length VarInt // Record长度
Attributions int8 // 属性,暂时没用
TimestampDelta VarInt // 相对于RecordBatch的FirstTimestamp的偏移量
OffsetDelta VarInt // 相对于RecordBatch的FirstOffset的偏移量
KeyLength VarInt // key长度
Key []byte // key内容
ValueLength VarInt // value长度
Value []byte // value内容
HeadersCount VarInt // Headers数组元素个数
Headers []Header // Headers数组,用于支持应用级别扩展
}
type Header struct {
HeaderKeyLength VarInt
HeaderKey string
HeaderValueLength VarInt
HeaderValue string
}
日志文件存储
使用时间戳查找消息
- 通过时间戳日志分段索引文件名查找对应的日志分段文件
- 在该日志分段中通过二分法查找到最近的偏移量
- 通过该偏移量在偏移量日志分段索引文件中查找对应的消息位置
- 从该位置开始,向后查找,直到找到不小于指定时间戳的消息
日志清理
日志删除
- 基于时间:rog.retention.hours/minutes/ms
- 基于日志大小: log.retention.bytes
- 基于起始偏移量: DelectRecordsRequest.logStartOffset
日志压缩/合并
对于相同的key的不同value值,只保留最后一个版本。当应用仅关心消息的最新value时,可以开启日志合并功能。
- 线程会选择污浊率最高的日志文件来进行清理,污浊率=dirtyBytes/(cleanBytes+dirtyBytes)。
- log.cleaner.min.compaction.lag.ms,消息在被清理前的最小保留时间。
- 实现方式:第一次遍历日志来构建key和最后的offset的映射关系,第二次遍历判断是否需要保留,如果不需要,就删除。SkimpyOffsetMap使用md5来进计算key的哈希值,在映射时仅考虑md5,如果不同的key哈希到了同一个md5,会导致某个key对应的消息丢失,丢失率取决于md5的冲突率,冲突时用线性探测法来处理。
- 合并时,是对整个日志进行合并,所以清理之后,可能会将多个日志分段合并为一个段。
消费位移
- 保存在_comsumer_offset主题中
- 可以通过offset或者时间戳进行定位
- 利用seek功能,我们可以将消费位移保存在外部存储中
消费者重均衡
消费组分区分配策略
RangeAssignor
- 原理:对于每一个订阅的主题,按照消费者总数和主题分区数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配
- 例子:C0[T0P0,T0P1;T1P1,T1P2],C1[T0P2;T1P2]
- 缺点:在一个消费者订阅多个主题的情况下且主题分区无法整除消费者数时,会导致不均衡
- 评价:适合消费者和主题分区数能够确定且不变时,不实用,对扩容不友好,建议不要用
RoundRobinAssignor
- 原理:将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式逐个将分区依次分配给每个消费者
- 例子:C0[T0P0;T0P2;T1P1],C2[T0P1;T1P0;T1P2]
- 缺点:在消费者订阅的主题不一样时,会导致不均衡
- 评价:一般情况下同一个消费组会订阅相同的主题信息,可以使用
StickyAssignor
- 原理:很复杂
- 目的:要分区的分配要尽可能均匀;分区的分配尽可能与上次分配的保持相同。
- 评价:比上面两种都好,建议使用
自定义Assignor
- 原理:实现PartitionAssignor接口
- 评价:不建议
发生时机
- 组成员数发生变更:加入组或者离开组或者被剔出组。
- 订阅主题数发生变更:正则订阅或者手动更改订阅主题数。
- 订阅主题的分区数发生变更:分区重分配。
流程
分区重分配
基本原理:先通过控制器为每个分区添加新副本(增加副本因子),待复制完成后,将旧的副本从副本清单中删除(恢复为原先的副本因子)
事务
幂等性
实现原理:
Kafka 的幂等只能保证单个生产者会话(session)中单分区的幂等。对于每一个生产者,kafka会为其分配一个pid,每一对<pid,partiton>都对应一个序列号,在生产者发送消息的时候,序列号递增。当kafka收到新消息时,如果序列号sn<so+1,则说明发生了重复写入,则丢弃;如果序列号sn>so+1,说明出现了消息乱序,抛出异常OutOfOrderSequenceException。
事务
概念:kafka的事务可以保证应用程序将多个的消费消息、生产消息、提交消费位移当作原子操作来处理,同时成功或失败,即使该生产或消费会跨多个分区。
应用场景:Consume-Transform-Produce,以支持流失计算
- 使用transactionID获取计算获取TransactionCoordinator的broker地址。
- 使用transactionID请求得到PID信息,TC在收到该请求后会将transaction和pid保存到__transaction_state中,以进行持久化。
- 生产者使用beginTransaction()开启一个事务。
- 消费-转换-生产
- 应用程序通过消费者消费到消息,转换完成后,在生产者向新的分区写入消息之前,先通过AddPartitionsToTxnRequest将新的分区记录到__transaction_state中,包括<transactionID,pid,topic-partitions>。
- 生产者向对应的分区所在的broker发送消息,消息中会包含<pid,seq_num>,注意由于写入的消息的事务控制字段都是1,所以在read_commited级别下对应用程序是不可见的。
- 通过AddOffsetsToTxnRequest将所有要提交的分区的offset的信息和group_id写入__transaction_state中,TC可以通过对应的group_id来计算出GC,GC也会保存在__transaction_state中,从而在生产者宕机后,支持后续TC的崩溃恢复。
- 生产者通过TxnOffsetCommitRequest将所有分区的偏移量条,写入到__consumer_offsets中,注意由于写入的消息的事务控制字段都是1,所以在read_commited级别下对应用程序是不可见的。
- 生产者通过EndTxnRequest向TC提交或者中止事务,TC会将PREPARE_COMMIT或PREPARE_ABORT信息写入到__transaction_state中,然后在通过WriteTxnMarkersRequest请求向分区(GC和生产者写入的分区)写入COMMIT或ABORT消息,再之后将COMPLETE_COMMIT或COMPLETE_ABORT写入到__transaction_state中。
复制
如上一主三从
- 其中2个follower在ISR集合中,1个失效follower在OSR集合中,其中min.insync.replicas=2,当ISR集合中的broker数少于2个时,该分区将禁止写入。
- ISR集合中的所有follower中最小的LEO为HW,每次follower向leader进行fetch时,会带上自身的leo,leader会计算出hw进行更新,并返回给follower。
- 每当OSR中的一个follower追上最小的LEO即HW时,该集合将会进入ISR集合中;每次follower请求拉取到leader副本leo前最新的消息时,则认为是一次caughUp,leader副本将会更新对应follower的lastCaughUpTime时间,在每replicaMaxLagTime/2一次的isr-expiration后台周期任务中,如果检查到某个follower满足now-lastCaughUpTime>replicaMaxLagTime,则将该follower将到OSR集合中。
- 每次ISR集合的变更都会被集合到isrChangeSet中,2.5s一周期的isr-changge-propagation任务会将ISR变更信息写入ZK中的/isr_change_notification/isr_change_*中,controller会通过Watcher监听到该消息,进而更新自身的元数据并向其它broker发送更新元数据的请求,然后删除isr_change节点。
- 当producer发送消息时,携带的acks参数会告诉leader需要几个节点的确认才能响应成功,leader副本写入数据到本地日志后会hold,等待其它follower将这条消息复制走,当acks-1个follower复制后,才会解除hold,响应成功。