Reference book: 深入理解Kafka:核心设计与实践原理
Available on JD: https://item.jd.com/12489649.html
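Topic management
Topics are managed with kafka-topics.sh. The script is essentially a one-line wrapper that hands all of its arguments to the kafka.admin.TopicCommand class:
    exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"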
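1. Creating a topic
-
Create a topic and let Kafka assign the partition replicas using its built-in logic. Example:
    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test --partitions 4 --replication-factor 2
Note: the value passed to --zookeeper (here localhost:2181/kafka, including the /kafka chroot) must match the zookeeper.connect property in server.properties. If it does not, the tool finds no registered brokers under the path it was given and topic creation fails with:
    Error while executing topic command : Replication factor: 2 larger than available brokers: 0.
-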
Create a topic with a manual replica assignment
The tool's help describes the option as follows:
    --replica-assignment <String: broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...>
        A list of manual partition-to-broker assignments for the topic being created or altered.
Entries are listed in ascending order of partition number.
Partitions are separated by commas.
Replicas within one partition are separated by colons.
When a topic is created with --replica-assignment, the --partitions and --replication-factor options do not need to be specified.
Example:
    [root@slave2 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test02 --replica-assignment 0:1,1:2,2:1,2:0
    Created topic test02.
Notes:
1). Within one partition, the same broker must not appear twice. The following is invalid:
    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test03 --replica-assignment 0:0,1:1,2:1,2:0
2). Every partition must have the same number of replicas. Otherwise the command fails with "Partition 1 has different replication factor":
    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test02 --replica-assignment 0:1,1,2:1,2:0
3). Skipping a partition also raises an exception:
    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test02 --replica-assignment 0:1,,2:1,2:0
-
Create a topic with topic-level configuration parameters
    [root@slave2 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test-config --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config max.message.bytes=10000
    Created topic test-config.
-
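Notes on creating topics
- Creating a topic that already exists fails. With --if-not-exists the command does nothing instead of reporting an error.
- Topic names containing a period (.) or an underscore (_): Kafka warns that such names can collide in metric names, so it is best to use one of the two characters but not both.
- Rack-aware topic creation: depends on the broker.rack broker setting.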
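The rules in this section for --replica-assignment and for topic names can be checked before the command is run. The following is a minimal Python sketch and not part of Kafka. The function names parse_replica_assignment and names_collide are hypothetical, and the collision rule (two names collide when they differ only in '.' versus '_') is an assumption based on the warning Kafka prints, not something this section specifies.

```python
from typing import List


def parse_replica_assignment(spec: str) -> List[List[int]]:
    """Parse a --replica-assignment value such as "0:1,1:2,2:1,2:0".

    Hypothetical helper: raises ValueError for the three mistakes listed
    in the notes (duplicate broker, uneven replica count, skipped partition).
    """
    assignment: List[List[int]] = []
    for partition, entry in enumerate(spec.split(",")):
        if not entry:
            raise ValueError(f"partition {partition} is empty (skipped)")
        replicas = [int(broker) for broker in entry.split(":")]
        if len(set(replicas)) != len(replicas):
            raise ValueError(f"partition {partition} lists a broker twice")
        if assignment and len(replicas) != len(assignment[0]):
            raise ValueError(
                f"Partition {partition} has different replication factor")
        assignment.append(replicas)
    return assignment


def names_collide(a: str, b: str) -> bool:
    """Assumed rule: names collide if they differ only in '.' versus '_'."""
    return a.replace(".", "_") == b.replace(".", "_")


if __name__ == "__main__":
    # The valid example from this section: 4 partitions, 2 replicas each.
    print(parse_replica_assignment("0:1,1:2,2:1,2:0"))
    print(names_collide("topic.1_2", "topic_1.2"))
```

The index of each entry is its partition number, which is why an empty entry (two commas in a row) counts as a skipped partition.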
2. Viewing topics
-
List all topics
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --list
    test
    test-config
    test02
-
Inspect the log directories
Example: look at the test topic created above.
On the master node:
    [root@master kafka-logs]# pwd
    /tmp/kafka-logs
    [root@master kafka-logs]# ls -al |grep test
    drwxr-xr-x 2 root root 141 Apr 29 09:15 test-0
    drwxr-xr-x 2 root root 141 Apr 29 09:15 test-1
On the slave1 node:
    [root@slave1 kafka-logs]# ls -al |grep test
    drwxr-xr-x 2 root root 141 Apr 29 09:15 test-1
    drwxr-xr-x 2 root root 141 Apr 29 09:15 test-2
    drwxr-xr-x 2 root root 141 Apr 29 09:15 test-3
On the slave2 node:
    [root@slave2 kafka-logs]# ls -al |grep test
    drwxr-xr-x 2 root root 141 Apr 29 09:15 test-0
    drwxr-xr-x 2 root root 141 Apr 29 09:15 test-2
    drwxr-xr-x 2 root root 141 Apr 29 09:15 test-3
The three nodes hold 8 directories in total: 8 = 4 (partitions) * 2 (replicas per partition).
-
Inspect through ZooKeeper
    [zk: localhost:2181(CONNECTED) 6] get /kafka/brokers/topics/test
    {"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}
    cZxid = 0x300000137
    ctime = Mon Apr 29 09:15:46 EDT 2019
    mZxid = 0x300000137
    mtime = Mon Apr 29 09:15:46 EDT 2019
    pZxid = 0x300000139
    cversion = 1
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 68
    numChildren = 1
Here "partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]} is the replica assignment: each key is a partition number and each value lists the ids of the brokers that hold that partition's replicas.
-
Inspect with --describe
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test
    Topic:test PartitionCount:4 ReplicationFactor:2 Configs:
        Topic: test Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
        Topic: test Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
        Topic: test Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
        Topic: test Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
-
--topics-with-overrides: lists only the topics whose configuration differs from the cluster defaults
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topics-with-overrides
    Topic:test-config PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=10000
-
--under-replicated-partitions: lists every partition that has out-of-sync (failed) replicas
With node2 taken offline:
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test --under-replicated-partitions
        Topic: test Partition: 0 Leader: 0 Replicas: 2,0 Isr: 0
        Topic: test Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1
        Topic: test Partition: 3 Leader: 1 Replicas: 2,1 Isr: 1
With node2 back online:
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test --under-replicated-partitions
The command prints nothing.
-
--unavailable-partitions: lists the partitions of a topic that have no leader replica. These partitions are offline and unavailable to producers and consumers.
With node1 and node2 taken offline:
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test --unavailable-partitions
        Topic: test Partition: 2 Leader: -1 Replicas: 1,2 Isr: 1
        Topic: test Partition: 3 Leader: -1 Replicas: 2,1 Isr: 1
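The two filters can be reproduced from plain --describe output. The sketch below is a Python illustration under stated assumptions: it parses the per-partition line format shown in the transcripts above (Kafka 2.2.0; other versions may print a different layout), treats a partition as under-replicated when its ISR is smaller than its replica list, and as unavailable when Leader is -1. The names PartitionState and parse_describe are hypothetical, and this is not how kafka-topics.sh itself is implemented.

```python
import re
from dataclasses import dataclass
from typing import List

# Matches lines such as:
#   Topic: test Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
PARTITION_LINE = re.compile(
    r"Topic:\s*(\S+)\s+Partition:\s*(\d+)\s+Leader:\s*(-?\d+)"
    r"\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]*)"
)


@dataclass(frozen=True)
class PartitionState:
    topic: str
    partition: int
    leader: int
    replicas: List[int]
    isr: List[int]

    @property
    def under_replicated(self) -> bool:
        # Some assigned replica is missing from the in-sync set.
        return len(self.isr) < len(self.replicas)

    @property
    def unavailable(self) -> bool:
        # The transcripts show Leader: -1 for partitions without a leader.
        return self.leader == -1


def parse_describe(output: str) -> List[PartitionState]:
    """Extract per-partition state; the topic summary line is skipped."""
    states = []
    for line in output.splitlines():
        match = PARTITION_LINE.search(line)
        if match is None:
            continue
        topic, partition, leader, replicas, isr = match.groups()
        states.append(PartitionState(
            topic, int(partition), int(leader),
            [int(b) for b in replicas.split(",")],
            [int(b) for b in isr.split(",") if b],
        ))
    return states
```

Feeding it the output of a plain --describe call and filtering on under_replicated or unavailable yields the same partitions as the two options do in the transcripts above.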
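3. How topics, partitions, replicas, and logs relate
One topic has multiple partitions, one partition has multiple replicas, and one replica corresponds to one log.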
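This relationship can be checked against the test topic. The sketch below (Python, with the hypothetical helper name log_dirs_per_broker) takes the assignment JSON stored at /kafka/brokers/topics/test and derives which <topic>-<partition> log directories each broker should hold, one directory per replica. The JSON literal is copied from the ZooKeeper output shown earlier. The mapping of broker ids 0, 1, 2 to the hosts master, slave1, slave2 is inferred from the directory listings.

```python
import json
from collections import defaultdict
from typing import Dict, List


def log_dirs_per_broker(topic: str, znode_json: str) -> Dict[int, List[str]]:
    """Map each broker id to the log directories it holds for the topic."""
    assignment = json.loads(znode_json)["partitions"]
    dirs = defaultdict(list)
    for partition, replicas in assignment.items():
        for broker in replicas:
            # One replica of one partition is one log directory.
            dirs[broker].append(f"{topic}-{partition}")
    return {broker: sorted(names) for broker, names in sorted(dirs.items())}


if __name__ == "__main__":
    znode = ('{"version":1,"partitions":'
             '{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}')
    layout = log_dirs_per_broker("test", znode)
    for broker, names in layout.items():
        print(broker, names)
    print("total directories:", sum(len(n) for n in layout.values()))
```

The total comes to 8 directories, matching 4 partitions * 2 replicas, and the per-broker lists match the ls output on the three nodes.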

4. Assigning partition replicas
- Without rack awareness
- With rack awareness
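For the case without rack awareness, the following Python sketch shows a round-robin scheme modeled on my understanding of Kafka's rack-unaware assignment. It is an illustration, not Kafka's code. In Kafka the starting broker index and the initial replica shift are chosen at random. Here they are explicit parameters (start_index and next_replica_shift, both names chosen for this sketch) so the result is reproducible. With start_index=2 and next_replica_shift=0 the sketch reproduces the assignment of the test topic shown in the --describe output.

```python
from typing import List


def assign_replicas(brokers: List[int], n_partitions: int,
                    replication_factor: int, start_index: int = 0,
                    next_replica_shift: int = 0) -> List[List[int]]:
    """Round-robin replica assignment without rack awareness (sketch)."""
    n = len(brokers)
    if replication_factor > n:
        raise ValueError(
            f"Replication factor: {replication_factor} "
            f"larger than available brokers: {n}.")
    assignment = []
    for partition in range(n_partitions):
        # After each full pass over the brokers, shift the followers by one
        # so that replica sets do not simply repeat.
        if partition > 0 and partition % n == 0:
            next_replica_shift += 1
        first = (partition + start_index) % n
        replicas = [brokers[first]]
        for j in range(replication_factor - 1):
            shift = 1 + (next_replica_shift + j) % (n - 1)
            replicas.append(brokers[(first + shift) % n])
        assignment.append(replicas)
    return assignment


if __name__ == "__main__":
    # 3 brokers, 4 partitions, 2 replicas, as for the test topic.
    for p, replicas in enumerate(assign_replicas([0, 1, 2], 4, 2,
                                                 start_index=2)):
        print(p, replicas)
```

The first replica of each partition walks over the brokers in order, which spreads the preferred leaders evenly. The remaining replicas are placed at a shifting offset from the first.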
5. Altering a topic (the --alter option)
-
Change the number of partitions (Kafka only supports increasing the count)
    ## First, describe topic test-config
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=10000
        Topic: test-config Partition: 0 Leader: 0 Replicas: 0 Isr: 0
    ## Change the partition count of test-config
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic test-config --partitions 3
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Adding partitions succeeded!
    ## Describe topic test-config again
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:3 ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=10000
        Topic: test-config Partition: 0 Leader: 0 Replicas: 0 Isr: 0
        Topic: test-config Partition: 1 Leader: 1 Replicas: 1 Isr: 1
        Topic: test-config Partition: 2 Leader: 2 Replicas: 2 Isr: 2
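The WARNING printed when partitions are added can be illustrated with a small sketch. A keyed message is typically routed to hash(key) modulo the partition count, so changing the count changes where a key lands. The Python code below is an illustration under assumptions: Kafka's default partitioner uses a murmur2 hash, whereas this sketch substitutes zlib.crc32 as a stand-in, and the function names and keys are made up for the example.

```python
import zlib
from typing import Iterable, List


def partition_for(key: bytes, num_partitions: int) -> int:
    """Stand-in for a key-hash partitioner (crc32 instead of murmur2)."""
    return zlib.crc32(key) % num_partitions


def moved_keys(keys: Iterable[bytes], old_count: int,
               new_count: int) -> List[bytes]:
    """Keys whose target partition changes when the count changes."""
    return [k for k in keys
            if partition_for(k, old_count) != partition_for(k, new_count)]


if __name__ == "__main__":
    keys = [b"user-1", b"user-2", b"user-3", b"order-42"]
    # test-config went from 1 partition to 3 in the transcript above.
    for key in keys:
        print(key, partition_for(key, 1), "->", partition_for(key, 3))
    print("moved:", moved_keys(keys, 1, 3))
```

With one partition every key maps to partition 0. After the change to three partitions, any key that now maps elsewhere has its newer messages in a different partition than its older ones, which is why per-key ordering is affected.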
Changing topic configuration: --alter with --config
-
Change a parameter that already overrides the default
    ## Change the parameter: --config max.message.bytes=20000
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic test-config --config max.message.bytes=20000
    WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
             Going forward, please use kafka-configs.sh for this functionality
    Updated config for topic test-config.
    ## Describe again
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:3 ReplicationFactor:1 Configs:max.message.bytes=20000,cleanup.policy=compact
        Topic: test-config Partition: 0 Leader: 0 Replicas: 0 Isr: 0
        Topic: test-config Partition: 1 Leader: 1 Replicas: 1 Isr: 1
        Topic: test-config Partition: 2 Leader: 2 Replicas: 2 Isr: 2
-
Set a parameter that does not yet override the default
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic test-config --config segment.bytes=1048577
    WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
             Going forward, please use kafka-configs.sh for this functionality
    Updated config for topic test-config.
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:3 ReplicationFactor:1 Configs:segment.bytes=1048577,cleanup.policy=compact,max.message.bytes=20000
        Topic: test-config Partition: 0 Leader: 0 Replicas: 0 Isr: 0
        Topic: test-config Partition: 1 Leader: 1 Replicas: 1 Isr: 1
        Topic: test-config Partition: 2 Leader: 2 Replicas: 2 Isr: 2
-
Remove a configuration override: --delete-config
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic test-config --delete-config segment.bytes
    WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
             Going forward, please use kafka-configs.sh for this functionality
    Updated config for topic test-config.
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:3 ReplicationFactor:1 Configs:max.message.bytes=20000,cleanup.policy=compact
        Topic: test-config Partition: 0 Leader: 0 Replicas: 0 Isr: 0
        Topic: test-config Partition: 1 Leader: 1 Replicas: 1 Isr: 1
        Topic: test-config Partition: 2 Leader: 2 Replicas: 2 Isr: 2
6. Configuration management with the kafka-configs.sh script
-
Built-in help for the available options:
    [root@master kafka_2.11-2.2.0]# bin/kafka-configs.sh
    This tool helps to manipulate and describe entity config for a topic, client, user or broker
    Option / Description
    --add-config <String>
        Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1, k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations:
        For entity-type 'topics':
            cleanup.policy  compression.type  delete.retention.ms  file.delete.delay.ms
            flush.messages  flush.ms  follower.replication.throttled.replicas
            index.interval.bytes  leader.replication.throttled.replicas  max.message.bytes
            message.downconversion.enable  message.format.version
            message.timestamp.difference.max.ms  message.timestamp.type
            min.cleanable.dirty.ratio  min.compaction.lag.ms  min.insync.replicas
            preallocate  retention.bytes  retention.ms  segment.bytes  segment.index.bytes
            segment.jitter.ms  segment.ms  unclean.leader.election.enable
        For entity-type 'brokers':
            log.message.timestamp.type  ssl.client.auth  log.retention.ms
            sasl.login.refresh.window.jitter  sasl.kerberos.ticket.renew.window.factor
            log.preallocate  log.index.size.max.bytes  sasl.login.refresh.window.factor
            ssl.truststore.type  ssl.keymanager.algorithm  log.cleaner.io.buffer.load.factor
            sasl.login.refresh.min.period.seconds  ssl.key.password  background.threads
            log.retention.bytes  ssl.trustmanager.algorithm  log.segment.bytes
            max.connections.per.ip.overrides  log.cleaner.delete.retention.ms
            log.segment.delete.delay.ms  min.insync.replicas  ssl.keystore.location
            ssl.cipher.suites  log.roll.jitter.ms  log.cleaner.backoff.ms  sasl.jaas.config
            principal.builder.class  log.flush.interval.ms  log.cleaner.dedupe.buffer.size
            log.flush.interval.messages  advertised.listeners  num.io.threads
            listener.security.protocol.map  log.message.downconversion.enable
            sasl.enabled.mechanisms  sasl.login.refresh.buffer.seconds
            ssl.truststore.password  listeners  metric.reporters  ssl.protocol
            sasl.kerberos.ticket.renew.jitter  ssl.keystore.password
            sasl.mechanism.inter.broker.protocol  log.cleanup.policy
            sasl.kerberos.principal.to.local.rules  sasl.kerberos.min.time.before.relogin
            num.recovery.threads.per.data.dir  log.cleaner.io.max.bytes.per.second
            log.roll.ms  ssl.endpoint.identification.algorithm
            unclean.leader.election.enable  message.max.bytes  log.cleaner.threads
            log.cleaner.io.buffer.size  max.connections.per.ip  sasl.kerberos.service.name
            ssl.provider  follower.replication.throttled.rate  log.index.interval.bytes
            log.cleaner.min.compaction.lag.ms  log.message.timestamp.difference.max.ms
            ssl.enabled.protocols  log.cleaner.min.cleanable.ratio
            replica.alter.log.dirs.io.max.bytes.per.second  ssl.keystore.type
            ssl.secure.random.implementation  ssl.truststore.location
            sasl.kerberos.kinit.cmd  leader.replication.throttled.rate
            num.network.threads  compression.type  num.replica.fetchers
        For entity-type 'users':
            request_percentage  producer_byte_rate  SCRAM-SHA-256  SCRAM-SHA-512  consumer_byte_rate
        For entity-type 'clients':
            request_percentage  producer_byte_rate  consumer_byte_rate
        Entity types 'users' and 'clients' may be specified together to update config for clients of a specific user.
    --alter
        Alter the configuration for the entity.
    --bootstrap-server <String: server to connect to>
        The Kafka server to connect to. This is required for describing and altering broker configs.
    --command-config <String: command config property file>
        Property file containing configs to be passed to Admin Client. This is used only with --bootstrap-server option for describing and altering broker configs.
    --delete-config <String>
        config keys to remove 'k1,k2'
    --describe
        List configs for the given entity.
    --entity-default
        Default entity name for clients/users/brokers (applies to corresponding entity type in command line)
    --entity-name <String>
        Name of entity (topic name/client id/user principal name/broker id)
    --entity-type <String>
        Type of entity (topics/clients/users/brokers)
    --force
        Suppress console prompts
    --help
        Print usage information.
    --zookeeper <String: urls>
        REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.
-
View a topic's configuration
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:3 ReplicationFactor:1 Configs:max.message.bytes=20000,cleanup.policy=compact
        Topic: test-config Partition: 0 Leader: 0 Replicas: 0 Isr: 0
        Topic: test-config Partition: 1 Leader: 1 Replicas: 1 Isr: 1
        Topic: test-config Partition: 2 Leader: 2 Replicas: 2 Isr: 2
    [root@master kafka_2.11-2.2.0]# bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type topics --entity-name test-config
    Configs for topic 'test-config' are max.message.bytes=20000,cleanup.policy=compact
-
Add configuration: using --add-config
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name test-config --add-config max.message.bytes=20000,cleanup.policy=compact
    Completed Updating config for entity: topic 'test-config'.
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:3 ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=20000
        Topic: test-config Partition: 0 Leader: 0 Replicas: 0 Isr: 0
        Topic: test-config Partition: 1 Leader: 1 Replicas: 1 Isr: 1
        Topic: test-config Partition: 2 Leader: 2 Replicas: 2 Isr: 2
-
View the configuration in ZooKeeper
    [zk: localhost:2181(CONNECTED) 6] get /kafka/config/topics/test-config
    {"version":1,"config":{"max.message.bytes":"20000","cleanup.policy":"compact"}}
    cZxid = 0x300000161
    ctime = Mon Apr 29 09:58:22 EDT 2019
    mZxid = 0x700000065
    mtime = Wed May 08 09:53:41 EDT 2019
    pZxid = 0x300000161
    cversion = 0
    dataVersion = 5
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 79
    numChildren = 0
Each configuration change also creates a persistent sequential node under /config/changes/, which can be inspected:
    [zk: localhost:2181(CONNECTED) 7] ls /kafka/config/changes
    [config_change_0000000003, config_change_0000000004]
    [zk: localhost:2181(CONNECTED) 8] get /kafka/config/changes/config_change_0000000003
    {"version":2,"entity_path":"topics/test-config"}
    cZxid = 0x700000060
    ctime = Wed May 08 09:45:14 EDT 2019
    mZxid = 0x700000060
    mtime = Wed May 08 09:45:14 EDT 2019
    pZxid = 0x700000060
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 48
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 9] get /kafka/config/changes/config_change_0000000004
    {"version":2,"entity_path":"topics/test-config"}
    cZxid = 0x700000067
    ctime = Wed May 08 09:53:41 EDT 2019
    mZxid = 0x700000067
    mtime = Wed May 08 09:53:41 EDT 2019
    pZxid = 0x700000067
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 48
    numChildren = 0
-
Remove configuration: using --delete-config
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name test-config --delete-config max.message.bytes,cleanup.policy
    Completed Updating config for entity: topic 'test-config'.
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:3 ReplicationFactor:1 Configs:
        Topic: test-config Partition: 0 Leader: 0 Replicas: 0 Isr: 0
        Topic: test-config Partition: 1 Leader: 1 Replicas: 1 Isr: 1
        Topic: test-config Partition: 2 Leader: 2 Replicas: 2 Isr: 2
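When these kafka-configs.sh calls are scripted, the argument list can be assembled programmatically. The Python sketch below builds the argv for the topic-level --add-config and --delete-config invocations shown in this section. It uses only options that appear in the transcripts and help text above. The function name alter_topic_config_argv and the relative path bin/kafka-configs.sh are assumptions of this sketch. Values containing commas are wrapped in square brackets, as the help text for --add-config describes.

```python
from typing import Dict, List, Optional, Sequence


def alter_topic_config_argv(zookeeper: str, topic: str,
                            add: Optional[Dict[str, object]] = None,
                            delete: Optional[Sequence[str]] = None
                            ) -> List[str]:
    """Build the argv for a topic-level kafka-configs.sh --alter call."""
    if not add and not delete:
        raise ValueError("nothing to add or delete")
    argv = ["bin/kafka-configs.sh", "--zookeeper", zookeeper, "--alter",
            "--entity-type", "topics", "--entity-name", topic]
    if add:
        pairs = []
        for key, value in add.items():
            text = str(value)
            # Group comma-containing values so they are not split.
            if "," in text:
                text = f"[{text}]"
            pairs.append(f"{key}={text}")
        argv += ["--add-config", ",".join(pairs)]
    if delete:
        argv += ["--delete-config", ",".join(delete)]
    return argv


if __name__ == "__main__":
    print(" ".join(alter_topic_config_argv(
        "localhost:2181/kafka", "test-config",
        add={"max.message.bytes": 20000, "cleanup.policy": "compact"})))
```

The resulting list can be passed to subprocess.run(argv, check=True) from the Kafka installation directory. Passing a list rather than a shell string avoids quoting problems with the square brackets.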
7. Topic-level parameters
8. Deleting a topic
-
1) Basic usage
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic test-config
    Topic test-config is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
Note that the broker setting delete.topic.enable must be true for the topic to actually be deleted. Its default value is already true.
-
2) Deleting an internal topic: the command reports an error.
-
3) Deleting a topic that does not exist: --if-exists
Without --if-exists the command fails. With it, the command exits without any output.
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic test-config
    Error while executing topic command : Topics in [] does not exist
    [2019-05-08 10:19:24,526] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
        at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
        at kafka.admin.TopicCommand$ZookeeperTopicService.deleteTopic(TopicCommand.scala:377)
        at kafka.admin.TopicCommand$.main(TopicCommand.scala:68)
        at kafka.admin.TopicCommand.main(TopicCommand.scala)
     (kafka.admin.TopicCommand$)
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic test-config --if-exists
    ## prints nothing
-
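4) Deleting a topic manually
A topic's metadata is stored in ZooKeeper under the /brokers/topics and /config/topics paths. The topic's message data is stored under the directories configured by log.dir or log.dirs. Removing these three pieces of content deletes the topic.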
-
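5) Deleting through the ZooKeeper client
Deleting a topic with the kafka-topics.sh script essentially creates a node named after the topic under the /admin/delete_topics path in ZooKeeper, which marks the topic as pending deletion. Creating that node directly with a ZooKeeper client therefore has the same effect.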
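The ZooKeeper paths and log directories involved in deleting a topic can be listed with a short helper. This Python sketch only builds path strings and does not talk to ZooKeeper or touch the file system. The function names are hypothetical, and the default chroot /kafka is taken from the --zookeeper localhost:2181/kafka connection string used throughout this document.

```python
from typing import Dict, List


def zk_paths_for_topic(topic: str, chroot: str = "/kafka") -> Dict[str, str]:
    """ZooKeeper nodes that hold a topic's metadata and deletion marker."""
    return {
        "metadata": f"{chroot}/brokers/topics/{topic}",
        "config": f"{chroot}/config/topics/{topic}",
        # Creating this node marks the topic as pending deletion.
        "delete_marker": f"{chroot}/admin/delete_topics/{topic}",
    }


def log_dir_names(topic: str, n_partitions: int) -> List[str]:
    """Directory names (<topic>-<partition>) found under log.dir/log.dirs."""
    return [f"{topic}-{partition}" for partition in range(n_partitions)]


if __name__ == "__main__":
    for name, path in zk_paths_for_topic("test-config").items():
        print(name, path)
    print(log_dir_names("test-config", 3))
```

For a manual deletion, the metadata and config nodes plus the listed directories on every broker are what needs removing. For a deletion through the ZooKeeper client, only the delete_marker node is created and Kafka does the rest.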