前面我们已经讲述了生产者 https://www.jianshu.com/p/40afc57471da 消费者 https://www.jianshu.com/p/44e9e058ec5b 那么今天我们看下主题与分区。
主题,分区的作用
主题是对消息进行了归类,分区是对消息进行了二次归类,分区的划分不仅为 Kafka 提供了可伸缩性、水平扩展的功能,还通过多副本机制来为 Kafka 提供数据冗余以提高数据可靠性。
主题作用
对于主题消息队列都是一样的kafka也不例外,管理主要包括主题创建,修改,查询,删除等操作。
思考🤔
如果我们像一个没有创建的主题中发送一条消息会怎么样呢?
首先如果我们像没有创建的主题中发送一条消息,broker首先会去查看自动创建主题选项是否开启 auto .cr eate.topics .enable
是否位true。当生产者向一个尚未创建的主题发送消息时,会自动创建一个分区数为 num partitions(默认值为1)、副本因子为 default.repl cation.factor (默认值为1)的主题。或者当任意一个客户端向未知主题发送元数据请求时,都会按照配置参数 num.partitions default.replicatio .factor值来创建一个相应的主题。
看起来自动创建主题的方式很方便,很好用。但是这种方式大大增加维护成
主题、分区、副本和Log 之间的关系
我们可以使用命令 创建一个分区数为4,副本因子为2的主题。命令如下:
bin/kafka-top cs .sh - zookeeper localhost: 2181/kafka --create --topic top create --partitions 4 --replication-factor 2
创建完脚本后的日志文件展示为:
[root@node1 kafka 2.11] # ls -al /tmp/kafka-logs/ I grep topic- create
drwxr xr x 2 root root 4096 Sep 8 15: 54 topic-create-0
drwxr-xr-x 2 root root 4096 Sep 8 15: 54 topic-create-1
[root@node2 kafka 2.12]# ls -al /tmp/kafka-logs/ lgrep topic- create
drwxr-xr-x 2 root root 4096 Sep 8 15: 49 topic-create-1
drwxr-xr- x 2 root root 4096 Sep 8 15 : 49 topic create-2
drwxr-xr- x 2 root root 4096 Sep 8 15: 49 topic- create-3
[root@node3 kafka 2.13] # ls - al /tmp/kafka-logs/ lgrep topic-create
drwxr-xr- x 2 root root 4096 Sep 8 07 : 54 topic-create-0
drwxr-xr- x 2 root root 4096 Sep 8 07 : 54 topic-create-2
drwxr- xr- x 2 root root 4096 Sep 8 07 : 54 topic-create-3
可以看到 node 1节点中创建了2个文件夹 topic-create-0和topic-create-1,对应主题topic-create的2个分区编号为0和1的分区,命名方式可以概括为 topic-partition的结构。
三个broker节点一共创建了8个文件夹 ,这个数字 实质上是分区数 与副本因子 的乘积。每个副本才真正对应了一个命名形式如topic-partition的文件夹。
主题分区副本分配策略
主题、分区、副本和 Log (日志)的关系如图1所示,主题和分区都是提供给上层用户的抽象,而在副本层面或更加确切地说是 Log 面才有实际物理上的存在。同一个分区中的多个副本必须分布在不同的 broker 中,这样才能提供有效的数据冗余。对于例中的分区数为4,副本因子为2,broker 数为3情况下会按照2,3,3 的分区副本个数分配给各个 broker 是最优的选择。再比如在分区数为3,副本因子3 ,并且 broker 数同样为3的情况下,分配3,3,3的分区副本个数给各个broker是最优的选择,也就是每个 broker 中都拥有所有分区的一个个副本。
主题其他操作
主题创建
bin/kafka-top cs .sh - zookeeper localhost: 2181/kafka --create --topic top create --partitions 4 --replication-factor 2
主题查询
通过list指令可以查看当前所有可用的主题,示例如下:
[root@nodel kafka_2 .11-2. 0. 0] # bin/kafka-topics.sh --zookeeper localhost: 2181/
kafka -lis七
consumer offsets
topic-create
topic-demo
topic-config
主题删除
第一步, 删除ZooKeeper中的节点/con巨g/七opics几opic-delete。
[zk: localhost:2181/kafka (CONNECTED) 7] rm -rf /config/topics/topic-delete
第二步, 删除ZooKeeper中的节点/brokers/topics/topic-delete及其子节点。
[zk: localhost:2181/kafka (CONNECTED) 8] delete /brokers/topics/topic-delete
第三步, 删除集群中所有与主题topic-delete有关的文件。
#集群中的各个broker节点中执行rm -rf /tmp/kafka吐ogs/topic-delete*命令来删除与主题
topic-delete有关的文件
[root@nodel kafka_2.ll-2.0.0]# rm -rf /tmp/kafka-logs/topic-delete*
[root@node2 kafka_2.ll-2.0.0]# rm -rf /tmp/kafk /topic-delete*
[root@node3 kafka_2.11-2.0.0]# rm -rf /tmp/kafka-logs/topic-delete*
分区作用
分区主要包含了优先副本的选举、 分区重分配、 复制限流、修改副本因子等功能。
优先副本选举
分区使用多副本机制来提升可靠性, 但只有leader副本对外提供读写服务, 而follower副本只负责在内部进行消息的同步。如果一个分区的leader副本不可用, 那么就意味着整个分区变得不可用, 此时就需要Kafka从剩余的follower副本中挑选 一个新的leader副本来继续对外提供服务。
在创建主题的时候, 该主题的分区及副本会尽可能均匀地分布到Kafka集群的各个broker节点上,对应的leader副本的分配也比较均匀。比如我们使用kafka-topics.sh脚本创建一个分区数为3、 副本因子为3的主题topic-partitions, 创建之后的分布信息如下:
[root@nodel kafka_2.ll-2.0.0)# bin/kafka-topics.sh --zookeeper localhost:2181/
kafka --describe --topic topic-partitions
Topic:topic-partitions PartitionCount: 3 ReplicationFactor: 3 Con figs:
Topic: topic-par七itions Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: topic-partitions Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: topic-partitions Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
可以看到 leader副本均匀分布在 brokerld为0、1、2的broker节点之中。针对同一个分区而言,同一个 broker节点中不可能出现它的多个副本,即Kafka集群的一个 broker中最多只能有它的一个副本,我们 可以将leader副本所在的broker节点叫作分区的leader节点,而 follower副本所在的broker节点叫作分区的follower节点。
随着时间的更替,Kafka集群的broker节点不可避免地会遇到宅机或崩溃的问题, 当分区的leader节点发生故障时, 其中 一个follower节点就会成为新的leader节点,这样就会导致集群的负载不均衡,从而影响整体的健壮性和稳定性。 当原来的leader节点恢复之后重新加入集群时,它只能成为 一个新的follower节点而不再对外提供服务。 后面章节后讲到副本选举原理
我们将brokerld为2的节点重启, 那么主题 topic-partitions新的分布信息如下:
[root@nodel kafka_2.ll-2.0.0]# bin/kafka-topics.sh --zookeeper localhos:2181/
kafka --describe --topic topic-partitions
Topic: topic-partitions PartitionCouht: 3 ReplicationFactor: 3 Con figs:
Topic: topic-partitions Partition: 0 Leader: 1 Replicas: 1, 2, 0 Isr: 1, 0, 2
Topic: 七opic-partitions Partition: 1 Leader: 0 Replicas: 2, 0, 1 Isr: 0, 1, 2
Topic: 七opic-partitions Partition: 2 Leader: 0 Replicas: 0, 1, 2 Isr: 0, 1, 2
可以看到原本 分区 l的leader节点为2, 现在变成了0, 如此一 来原本均衡的负载变成了失衡:节点0 的负载最高,而节点1的负载最低。
kafka是怎样防止重新选举后的的负载不均衡 🤔
为了能够有效地治理负载失衡的情况,Kafka引入了优先副本(preferred replica)的概念。所谓的优先副本 是指在 AR 集合列表中的第一个副本 。 比如上面 主题 topic-partitions中 分区 0的AR集合列表(Replicas)为[1,2, 0], 那么分区0 的优先副本即为1。理想情况下, 优先副本就是该分区的leader副本, 所以也可以称之为 preferred leader。Kafka要确保所有主题的优先副本在Kafka集群中均匀分布, 这样就保证了所有分区的leader均衡 分布。 如果leader 分布过于集中,就会造成集群负载不均衡。
所谓的优先副本的选举 是指通过一定的方式促使优先副本 选举为 leader副本, 以此来促进集群的负载均衡, 这 一行为也可以称为 “分区平衡 ” 。
注意
分区平衡,并不意味着Kafka集群的负载均衡, 因为还要考虑集群中的分区分配是否均衡。 更进一步, 每个分区的leader副本的负载也是各不相同的,有些leader副本的负载很高,比如需要承载TPS为30000 的负荷, 而有些leader副本只需承载个位数的负荷 。 也就是说,就算集群中的分区 分配均衡、leader 分配均衡,也并不能确保整个集群的负载就是均衡的, 还需要其他一 些硬性的指标来做进一步的衡量, 这个后面章节会讲到。
在Kafka中可以提供分区自动平衡的功能, 与此对应的broker端参数是auto.leader.rebalance.enable, 此参数的默认值为true, 即默认情况下此功能是开启的。 如果开启分区自动平衡的功能, 则Kafka的控制器会启动一个定时任务, 这个定时任务会轮询所有的broker节点, 计算每个broker节点的分区不平衡率(broker中的不平衡率=非优先副本的leader个数/分区总数)是否超过leader.imbalance.per.broker.percentage参数配置的比值,默认值为10%, 如果超过设定的比值则会自动执行优先副本的选举动作以求分区平衡。 执行周期由参数leader.imbalance.check.interval.seconds控制, 默认值为300秒, 即
5分钟。
不过在生产环境中不建议将autua.leader.rebalance.enable 设置为默认的true, 因为这 可能引起负面的性能问题, 也有可能引起客户端 一 定时间的阻塞。 因为执行的时间无法自主掌控, 如果在关键时期(比如电商大促波峰期)执行关键任务的关卡上执行优先副本的自动选举操作, 势必会有业务阻塞、 频繁超时之类的风险。
思考🤔 分区数越多是否越好
leader 副本的转移也是一项高成本的工作, 如果要执行的分区数很多, 那么必然会对客户端造成一定的影响。如果集群中包含大量的分区, 那么上面的这种使用方式有可能会失效。在优先副本的选举过程中, 具体的元数据信息会被存入ZooKeeper 的 /admin/preferred_replica_election节点, 如果这些数据超过了ZooKeeper节点所允许的大小, 那么选举就会失败。 默认情况下ZooKeeper所允许的节点数据大小为1MB。
分区重分配
当集群中的一个节点突然宅机下线时,如果节点上的分区是单副本的, 那么这些分区就变得不可用了,在节点恢复前,相应的数据也就处于丢失状态;如果节点上的分区是多副本的,那么位于这个节点上的leader副本的角色会转交到集群的其他 follower副本中。 总而言之, 这个节点上的分区副本都已经处于功能失效的状态,Kafka并不会将这些失效的分区副本自动地迁移到集群中剩余的可用broker节点上,如果放任不管, 则不仅会影响整个集群的均衡负载,还会影响整体服务的可用性和可靠性。
当要对集群中的一个节点进行有计划的下线操作时, 为了保证分区及副本的合理分配, 我们也希望通过某种方式能够将该节点上的分区副本迁移到其他的可用节点上。当集群中新增broker节点时, 只有新创建的主题分区才有可能被分配到这个节点上, 而之前的主题分区并不会自动分配到新加入的节点中, 因为在它们被创建时还没有这个新节点, 这样新节点的负载和原先节点的负载之间严重不均衡。
kafka提供的重分区策略主要是在集群扩容和broker节点失效的情况下对分区进行迁移。
kakfa主要是通过脚本实现该方案,方案主要分为3步骤:
第一步:首先创建需要一个包含主题清单的JSON文件, 其次根据主题清单和broker节点清单生成一份重分配方案, 最后根据这份方案执行具体的重分配动作。
[root@nodel kafka_2.ll-2.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181/
kafka --describe --topic topic-demo
Topic:topic-reassign Parti七ionCount:4 ReplicationFactor: 2 Con figs:
Topic: topic-demo Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0, 2
Topic: topic-demo Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: topic-demo Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic-demo Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1
我们可以观察到主题topic-reassign在3个节点中都有相应的分区副本分布。由于某种原因,我们想要下线brokerld为1的broker节点, 在此之前, 我们要做的就是将其上的分区副本迁移出去。使用 kafka-reassign-partitions.sh脚本的第一步就是要创建一 个JSON文件如下:
{
"topics":[
{
"topic":"topic-demo"
}
],
"version":1
}
第二步就是根据这个JSON文件和指定所要分配的broker节点列表来生成 一 份候选的重分配方案, 具体内容参考如下:
[root@nodel kafka 2. 11-2.0.0]# bin/kafka-reassign-partitions.sh --zookeeper
localhost:2181/kafka --generate --topics一 to-move-json-file reassign. json
--broker-list 0,2
Current partition replica assignment
{"version": 1, "partitions": [ {"topic": "topic-demo", "partition":2,"replicas
": [2, 1], "log_dirs": ["any", "any"] } , {"topic": "topic-demo", "partition": 1, "repli
cas": [ 1, 0], "log_dirs": ["any", "any"] } , {"topic": "topic-demo", "partition": 3, "re
plicas": [ 0, 1], "log_dirs": ["any", "any"] } , {"topic": "topic-demo", "parti七ion":0,
"replicas": [ 0, 2], "log_dirs": ["any", "any"]}]}
Proposed partition reassignment configuration
{"version":l,"parti七ions": [ {"七opic":"topic-demo","parti七ion":2,"replicas
": [2, 0], "log_dirs": ["any", "any"] } , {"topic": "topic-demo", "partition": 1, "repli
cas": [0, 2], "log_dirs": ["any", "any"]}, {"七opic":"topic-demo","partition":3,"re
plicas": [ 0, 2], "log_dirs": ["any", "any"] } , {"topic": "topic-demo", "partition": 0,
"replicas": [2, 0, "log_di工 s": ["any", "any"]}]}
第三步执行具体的重分配动作, 详细参考如下:
[root@nodel kafka_2 .11-2. 0. 0] # bin/kafka-reassign-parti tions. sh --zookeeper
localhost:2181/kafk a --execute --reassigrunent-json-file project.json
修改副本因子
修改副本因子的使用场景也很多, 比如在创建主题时填写了错误的副本因子数而需要修改, 再比如运行一段时间之后想要通过增加副本因子数来提高容错性和可靠性。修改副本因子也可以按照分区重分配的方式实现。
思考🤔 分区数越多吞吐量就越高吗
抛开硬件资源的影响, 消息写入的吞吐量还会受到消息大小 、 消息压缩方式、 消息发送方式(同步/异步)、 消息确认类型(acks)、 副本因子等参数的影响, 消息消费的吞吐量还会受到应用逻辑处理速度的影响的情况下。
实验证明分区数为1时吞吐量最低, 随着分区数的增长, 相应的吞吐量也跟着上涨。 一旦分区数超过了某个阙值之后, 整体的吞吐量是不升反降的。
分区可以保证有时序吗
可以,一般我们在做业务的时候可以根据消息的key进行分区,这样相同的key就会被分到同一个分区,然后消费者从同一个分区消费就可以保证消息的有序性。
该文章是我读了深入理解kafka核心设计与实践原理的感受和总结希望可以帮到大家