Kafka特性
- 顺序读写的方式访问磁盘,从而避免随机读写磁盘导致的性能瓶颈
2.支持批量读写消息,并且会对消息进行批量压缩
3.支持分区,每个分区的消息保证顺序传输,而分区之间可以并发操作
4.支持在线增加分区,支持在线水平扩展
5.支持每个分区创建多个副本,其中只有一个Leader副本负责读写,其他副本只负责与Leader副本进行同步从而提高容灾能力
核心概念
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
Partition
parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件
Producer
负责发布消息到Kafka broker
Consumer
消费消息。每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。
ConsumerGroup
提及Consumer Group,最先想到的就是Group与Consumer Client的关联关系:
1.Consumer Group用group.id(String)作为全局唯一标识符
2.每个Group可以有零个、一个或多个Consumer Client
3.每个Group可以管理零个、一个或多个Topic
4.Group下每个Consumer Client可同时订阅Topic的一个或多个Partition
5.Group下同一个Partition只能被一个Client订阅,多Group下的Client订阅不受影响
Consumer Group的作用主要有:管理Partition的Offset信息;管理Consumer Client与Partition的分配。正因为所有Partition的Offset信息是由Group统一管理,所以如果一个Partition有多个Consumer,那么每个Consumer在该Partition上的Offset很可能会不一致,这样会导致在Rebalance后赋值处理的Client的消费起点发生混乱;与此同时,这种场景也不符合Kafka中Partition消息消费的一致性;因此在同一Group下一个Partition只能对应一个Consumer Client。
Group Coordinator
Group Coordinator是一个服务,每个Broker在启动的时候都会启动一个该服务。Group Coordinator的作用是用来存储Group的相关Meta信息,并将对应Partition的Offset信息记录到Kafka内置Topic(__consumer_offsets)中。Kafka在0.9之前是基于Zookeeper来存储Partition的Offset信息(consumers/{group}/offsets/{topic}/{partition}),因为ZK并不适用于频繁的写操作,所以在0.9之后通过内置Topic的方式来记录对应Partition的Offset。
每个Group都会选择一个Coordinator来完成自己组内各Partition的Offset信息,选择的规则如下:
1.计算Group对应在__consumer_offsets上的Partition
2.根据对应的Partition寻找该Partition的leader所对应的Broker,该Broker上的Group Coordinator即就是该Group的Coordinator
Rebalance
Rebalance是一个分区-客户端重分配协议。旨在特定条件下,基于给定的分配策略来为Group下所有Consumer Client重新分配所要订阅的Partition。Rebalance是Consumer Group中一个重要的特性,也为Group提供了High Availability and Scalability。但同样Rebalance也存在相应的弊端:在Rebalance期间,整个Group对外不可用。
Rebalance 触发条件
Group中有新Consumer加入
Group中已有的Consumer挂掉
Coordinator挂了,集群选出新Coordinator
Topic新增Partition个数
Consumer Client调用unsubscrible(),取消订阅Topic
Rebalance 过程
Rebalance的本质即就是Partition的分配;首先客户端会向Coordinator发送JGR,等待leader发送Partition分配结果到Coordinator后,然后再向Coordinator发送SGR获取分配结果。
Kafka通过Heartbeats(心跳)的方式实现Consumer Client与Coordinator之间的通信,用来相互告知对方的存在。如果Coordinator挂掉导致的Rebalance,则Kafka会重新选择一个Coordinator,然后所有的Client会执行JGR、SGR;如果由于Client的变化导致Rebalance,则会通知有效Client进行JGR、SGR。
Leader
首先Kafka会将接收到的消息分区(partition),每个主题(topic)的消息有不同的分区。这样一方面消息的存储就不会受到单一服务器存储空间大小的限制,另一方面消息的处理也可以在多个服务器上并行。
其次为了保证高可用,每个分区都会有一定数量的副本(replica)。这样如果有部分服务器不可用,副本所在的服务器就会接替上来,保证应用的持续性。
但是,为了保证较高的处理效率,消息的读写都是在固定的一个副本上完成。这个副本就是所谓的Leader,而其他副本则是Follower。而Follower则会定期地到Leader上同步数据。
如果某个分区所在的服务器除了问题,不可用,kafka会从该分区的其他的副本中选择一个作为新的Leader。之后所有的读写就会转移到这个新的Leader上。现在的问题是应当选择哪个作为新的Leader。显然,只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
Follower
Kafka中partition replication之间同步数据,从partition的leader复制数据到follower只需要一个线程(ReplicaFetcherThread),实际上复制是follower(一个follower相当于consumer)主动从leader批量拉取消息的,这极大提高了吞吐量,从中可以看出无处不显示Kafka高吞吐量设计思想。
OffSet
kafka消费者在会保存其消费的进度,也就是offset
ISR
Kafka会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生产者。如果这个集合有增减,kafka会更新zookeeper上的记录。
如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个副本作为新的Leader。
显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。假设某个topic有f+1个副本,kafka可以容忍f个服务器不可用。
HW (High Watermark)
HW标记了一个特殊的offset,当消费者处理数据的时候,只能拉取到HW之前的数据,HW之后的数据对消费者来说是不可见对。与ISR集合类似,HW也是由Leader副本管理对。当ISR集合中全部的Follower副本都拉取HW指定消息进行同步后,Leader会递增HW的值。这个流程就是数据生产的commit
LEO (Log End Offset)
LEO是所有的副本都会有的一个offset标记,它指向追加到当前副本的最后一个消息的offset。当生产者向Leader副本追加消息的时候,Leader副本的LEO标记会增加;当Follower副本成功从Leader副本拉取消息并更新到本地的时候,Follower副本的LEO就会增加
Cluster&Controller
Kafka集群中多个broker,有一个会被选举为controller leader,负责管理整个集群中分区和副本的状态,比如partition的leader 副本故障,由controller 负责为该partition重新选举新的leader 副本;当检测到ISR列表发生变化,有controller通知集群中所有broker更新其MetadataCache信息;或者增加某个topic分区的时候也会由controller管理分区的重新分配工
保留策略
1.基于空间维度
2.基于时间维度
3.基于起始位移维度
- 基于空间维度
也称size-based retention,指的是Kafka定期为那些超过磁盘空间阈值的topic进行日志段的删除。这个阈值由broker端参数log.retention.bytes和topic级别参数retention.bytes控制,默认是-1,表示Kafka当前未开启这个留存机制,即不管topic日志量涨到多少,Kafka都不视其为“超过阈值”。如果用户要开启这种留存机制,必须显式设置log.retention.bytes(或retention.bytes)。
一旦用户设置了阈值,那么Kafka就会在定时任务中尝试比较当前日志量总大小是否超过阈值至少一个日志段的大小。这里所说的总大小是指所有日志段文件的大小,不包括索引文件的大小!如果是则会尝试从最老的日志段文件开始删起。注意这里的“超过阈值至少一个日志段的大小”,这就是说超过阈值的部分必须要大于一个日志段的大小,否则不会进行删除的,原因就是因为删除的标的是日志段文件——即文件只能被当做一个整体进行删除,无法删除部分内容。
举个例子来说明,假设日志段大小是700MB,当前分区共有4个日志段文件,大小分别是700MB,700MB,700MB和1234B——显然1234B那个文件就是active日志段。此时该分区总的日志大小是3*700MB+1234B=2100MB+1234B,如果阈值设置为2000MB,那么超出阈值的部分就是100MB+1234B,小于日志段大小700MB,故Kafka不会执行任何删除操作,即使总大小已经超过了阈值;反之如果阈值设置为1400MB,那么超过阈值的部分就是700MB+1234B > 700MB,此时Kafka会删除最老的那个日志段文件。
- 基于时间维度
也称time-based retention,指的是Kafka定期未那些超过时间阈值的topic进行日志段删除操作。这个阈值由broker端参数log.retention.ms、log.retention.mintues、log.retention.hours以及topic级别参数retention.ms控制。如果同时设置了log.retention.ms、log.retention.mintues、log.retention.hours,以log.retention.ms优先级为最高,log.retention.mintues次之,log.retention.hours最次。当前这三个参数的默认值依次是null, null和168,故Kafka为每个topic默认保存7天的日志。
这里需要讨论下这“7天”是如何界定的?在0.10.0.0之前,Kafka每次检查时都会将当前时间与每个日志段文件的最新修改时间做比较,如果两者的差值超过了上面设定的阈值(比如上面说的7天),那么Kafka就会尝试删除该文件。不过这种界定方法是有问题的,因为文件的最新修改时间是可变动的——比如用户在终端通过touch命令查看该日志段文件或Kafka对该文件切分时都可能导致最新修改时间的变化从而扰乱了该规则的判定,因此自0.10.0.0版本起,Kafka在消息体中引入了时间戳字段(当然不是单纯为了修复这个问题),并且为每个日志段文件都维护一个最大时间戳字段。通过将当前时间与该最大时间戳字段进行比较来判定是否过期。使用当前最大时间戳字段的好处在于它对用户是透明的,用户在外部无法直接修改它,故不会造成判定上的混乱。
最大时间戳字段的更新机制也很简单,每次日志段写入新的消息时,都会尝试更新该字段。因为消息时间戳通常是递增的,故每次写入操作时都会保证最大时间戳字段是会被更新的,而一旦一个日志段写满了被切分之后它就不再接收任何新的消息,其最大时间戳字段的值也将保持不变。倘若该值距离当前时间超过了设定的阈值,那么该日志段文件就会被删除。
- 基于起始位移维度
用户对前两种留存机制实际上是相当熟悉的,下面我们讨论下第三种留存机制:基于日志起始位移(log start offset)。这实际上是0.11.0.0版本新增加的功能。其实增加这个功能的初衷主要是为了Kafka流处理应用——在流处理应用中存在着大量的中间消息,这些消息可能已经被处理过了,但依然保存在topic日志中,占用了大量的磁盘空间。如果通过设置基于时间维度的机制来删除这些消息就需要用户设置很小的时间阈值,这可能导致这些消息尚未被下游操作算子(operator)处理就被删除;如果设置得过大,则极大地增加了空间占用。故社区在0.11.0.0引入了第三种留存机制:基于起始位移
所谓起始位移,就是指分区日志的当前起始位移——注意它是分区级别的值,而非日志段级别。故每个分区都只维护一个起始位移值。该值在初始化时被设置为最老日志段文件的基础位移(base offset),随着日志段的不断删除,该值会被更新当前最老日志段的基础位移。另外Kafka提供提供了一个脚本命令帮助用户设置指定分区的起始位移:kafka-delete-records.sh。
该留存机制是默认开启的,不需要用户任何配置。Kafka会为每个日志段做这样的检查:1. 获取日志段A的下一个日志段B的基础位移;2. 如果该值小于分区当前起始位移则删除此日志段A。
日志压缩
- None
- gzip
- snappy
- lz4
kafka从0.7版本就开始支持压缩功能:
1.kafka的发送端将消息按照批量(如果批量设置一条或者很小,可能有相反的效果)的方式进行压缩。
2.服务器端直接将压缩消息保存(特别注意,如果kafka的版本不同,那么就存在broker需要先解压缩再压缩的问题,导致消耗资源过多)。
3.消费端自动解压缩,测试了下,发送端无论采用什么压缩模式,消费端无论设置什么解压模式,都可以自动完成解压缩功能。
4.压缩消息可以和非压缩消息混存,也就是说如果你kafka里面先保存的是非压缩消息,后面改成压缩,不用担心,kafka消费端自动支持。