Partition
Parition是物理上的概念,每个Topic包含一个或多个Partition.
Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
分片:为了提升性能,为每个topic建多个partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1)
消息持久化文件结构:
$KAFKA_HOME/config/server.properties
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
#新建topic默认的partion数量
num.partitions=10
副本:
消费者组:
如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。
与rocketmq机制有点不同,严格来说只是对应了rocketmq的集群消费模式
push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。
kafka的高可用
复制replication
一个topic有多个partition,为了尽可能的发挥负载均衡的作用,那partion就要大于broker的数据,然后尽可能的均衡分布到每台broker上。
同一个partion有多个副本,分布在不同的机器上。当然不能再同一个broker上,一挂全挂玩毛线。
Kafka分配Replica的算法如下:
将所有Broker(假设共n个Broker)和待分配的Partition排序
将第i个Partition分配到第(i mod n)个Broker上
将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上
leader选举机制
为了保证数据一致性,必须在这些replica中选出leader
rocketmq是写死的master 、slave ,简单粗暴好用。
zookeeper保存了集群状态信息,有多少topic 路由 分区信息 副本信息 分区leader信息
和大部分分布式系统一样,Kafka处理失败需要明确定义一个Broker是否“活着”。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与Zookeeper的session(这个通过Zookeeper的Heartbeat机制来实现)。二是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”
ISR(即in-sync Replica)
Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前需要等待的Replica数量是一样的,但是ISR需要的总的Replica的个数几乎是Majority Vote的一半。
leader选举不是通过zookeeper,而是在所有broker中选择一个controller。所有Partition的Leader选举都由controller决定
ontroller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
controller怎么failover
controller挂了后,大家竞选,zookeeper选出其中一个,更新集群状态信息
Partition重新分配
什么时候触发?
consumer怎么集群消费
以partition为单位,统一个consumergroup,每个consumer负责其中一部分。
当有新的consumer节点加入、或删除时,需要重新分配
rebalance
目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制策略是由每一个Consumer通过在Zookeeper上注册Watch完成的。每个Consumer被创建时会触发Consumer Group的Rebalance,具体启动流程如下:
High Level Consumer启动时将其ID注册到其Consumer Group下,在Zookeeper上的路径为/consumers/[consumer group]/ids/[consumer id]
在/consumers/[consumer group]/ids上注册Watch
在/brokers/ids上注册Watch
如果Consumer通过Topic Filter创建消息流,则它会同时在/brokers/topics上也创建Watch
强制自己在其Consumer Group内启动Rebalance流程