一、 背景
3节点kafka集群 ,broker id 分别为 0 1 2 。
二、 改造操作
$ /data/kafka/bin/kafka-topics.sh --zookeeper 192.168.1.110:2181 --topic testTopic --describe
Topic:testTopic PartitionCount:16 ReplicationFactor:1 Configs:
Topic: testTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: testTopic Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: testTopic Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: testTopic Partition: 3 Leader: 0 Replicas: 0 Isr: 0
Topic: testTopic Partition: 4 Leader: 2 Replicas: 2 Isr: 2
Topic: testTopic Partition: 5 Leader: 1 Replicas: 1 Isr: 1
Topic: testTopic Partition: 6 Leader: 0 Replicas: 0 Isr: 0
Topic: testTopic Partition: 7 Leader: 2 Replicas: 2 Isr: 2
Topic: testTopic Partition: 8 Leader: 1 Replicas: 1 Isr: 1
Topic: testTopic Partition: 9 Leader: 0 Replicas: 0 Isr: 0
Topic: testTopic Partition: 10 Leader: 2 Replicas: 2 Isr: 2
Topic: testTopic Partition: 11 Leader: 1 Replicas: 1 Isr: 1
Topic: testTopic Partition: 12 Leader: 0 Replicas: 0 Isr: 0
Topic: testTopic Partition: 13 Leader: 2 Replicas: 2 Isr: 2
Topic: testTopic Partition: 14 Leader: 1 Replicas: 1 Isr: 1
Topic: testTopic Partition: 15 Leader: 0 Replicas: 0 Isr: 0
$ cat topic-generate.json
{
"topics": [
{
"topic": "testTopic"
}
],
"version": 1
}
$ /data/kafka/bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.110:2181 --topics-to-move-json-file topic-generate.json --broker-list "0,1,2" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"testTopic","partition":15,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":7,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":12,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":4,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":11,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":10,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":8,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":5,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":13,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":14,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":9,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":6,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":3,"replicas":[0],"log_dirs":["any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"testTopic","partition":14,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":8,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":11,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":5,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":13,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":10,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":15,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":7,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":12,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":4,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":9,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":6,"replicas":[0],"log_dirs":["any"]}]}
将 Proposed partition reassignment configuration 下的 json 复制到临时文件,通 sed 去掉 多余的 部分 (,"log_dirs":["any"] )
$ sed -i 's/,"log_dirs":["any"]//g' tmp.json
如果需要将改topic的 每个分区 (partition)都改为 3 副本,我们需要将 改json 文件中的 "replicas": [x] 改为 "replicas": [0,1,2] ,可通过nodpad修改,修改后的文件内容我们导入到 partition-replica-reassignment.json
注: 如果将数据副本设为2也可以,只是这里 json文件就不好修改了,手动分配容易导致分区在各节点分布不均,设为3 (broker节点数)就不用担心这个问题
$ cat partition-replica-reassignment.json
{
"version": 1,
"partitions": [
{
"topic": "testTopic",
"partition": 14,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 3,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 8,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 0,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 11,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 5,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 13,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 2,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 10,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 15,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 7,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 12,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 1,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 4,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 9,
"replicas": [
0,
1,
2
]
},
{
"topic": "testTopic",
"partition": 6,
"replicas": [
0,
1,
2
]
}
]
}
$ /data/kafka/bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.110:2181 --reassignment-json-file partition-replica-reassignment.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"testTopic","partition":15,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":2,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":7,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":12,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":4,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":11,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":10,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":8,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":5,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":13,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":1,"replicas":[2],"log_dirs":["any"]},{"topic":"testTopic","partition":14,"replicas":[1],"log_dirs":["any"]},{"topic":"testTopic","partition":9,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":6,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":0,"replicas":[0],"log_dirs":["any"]},{"topic":"testTopic","partition":3,"replicas":[0],"log_dirs":["any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
$ /data/kafka/bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.110:2181 --reassignment-json-file partition-replica-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition testTopic-15 completed successfully
Reassignment of partition testTopic-2 completed successfully
Reassignment of partition testTopic-7 completed successfully
Reassignment of partition testTopic-12 completed successfully
Reassignment of partition testTopic-4 completed successfully
Reassignment of partition testTopic-11 completed successfully
Reassignment of partition testTopic-10 completed successfully
Reassignment of partition testTopic-8 completed successfully
Reassignment of partition testTopic-5 completed successfully
Reassignment of partition testTopic-13 completed successfully
Reassignment of partition testTopic-1 completed successfully
Reassignment of partition testTopic-14 completed successfully
Reassignment of partition testTopic-9 completed successfully
Reassignment of partition testTopic-6 completed successfully
Reassignment of partition testTopic-0 completed successfully
Reassignment of partition testTopic-3 completed successfully
$ /data/kafka/bin/kafka-topics.sh --zookeeper 192.168.1.110:2181 --topic testTopic --describe
Topic:testTopic PartitionCount:16 ReplicationFactor:3 Configs:
Topic: testTopic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: testTopic Partition: 1 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0
Topic: testTopic Partition: 2 Leader: 1 Replicas: 0,1,2 Isr: 1,0,2
Topic: testTopic Partition: 3 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: testTopic Partition: 4 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0
Topic: testTopic Partition: 5 Leader: 1 Replicas: 0,1,2 Isr: 1,0,2
Topic: testTopic Partition: 6 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: testTopic Partition: 7 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0
Topic: testTopic Partition: 8 Leader: 1 Replicas: 0,1,2 Isr: 1,0,2
Topic: testTopic Partition: 9 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: testTopic Partition: 10 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0
Topic: testTopic Partition: 11 Leader: 1 Replicas: 0,1,2 Isr: 1,0,2
Topic: testTopic Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: testTopic Partition: 13 Leader: 2 Replicas: 0,1,2 Isr: 2,1,0
Topic: testTopic Partition: 14 Leader: 1 Replicas: 0,1,2 Isr: 1,0,2
Topic: testTopic Partition: 15 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
要熟练使用 Kafka 自带的 kafka-reassign-partitions.sh 脚本工具来完成对 topic 的分区分配、分区副本增加等操作。
该脚本有三个参数:
--generate:配合着 --topics-to-move-json-file 可以生成分区分配策略,该参数适用于分区多的情况。
--execute: 配合着 --reassignment-json-file 可以执行分区分配策略。
--verify: 配合着 --reassignment-json-file 可以检查分区分配进度。
通过以上命令,既可以分配分区,也可以增加分区副本数,非常方便。
如果要kafak中新创的 topic ,默认就多副本,需要在配置文件中显式声明,然后重启 kafka进程。
$ cat /kingdee/kafka/config/server.properties
num.partitions=1
default.replication.factor=3
设置默认的 num.partitions=1 而不是 大于1 ,是为了 避免出现 有些topic 确实需要单分区,而代码中又没有声明,从而发出多单分区的情况。
设置默认的 default.replication.factor=3, 如果代码中没有指定副本数,那么 读此配置,副本为 3.
代码里显式指定,类似shell命令行传递环境变量,优先级高于配置文件 ,就无法配置文件能兜底了。
三、参考
在线json美化
https://toolgg.com/jsonedit
kafka 主题、分区check脚本
https://www.jianshu.com/p/d524a84d505a
https://www.jianshu.com/p/fba009885042
教你如何重新分布kafka分区、增加分区副本数
https://cloud.tencent.com/developer/article/1755177
Kafka动态增加Topic的副本
https://www.cnblogs.com/xiao987334176/p/10315176.html
kafka修改分区、副本数、副本迁移
https://sukbeta.github.io/kafka-Modify-Partitions-and-ReplicationFactor
为什么不尝试从json文件中删除所有"log_dirs": ["any"]?
https://www.656463.com/wenda/fqzxfpzKafka110zsb_258