Kafka Reading Notes: Topic Management

Reference book: 《深入理解Kafka:核心设计与实践原理》 (Understanding Kafka: Core Design and Practice)
JD purchase link: https://item.jd.com/12489649.html

Outline of this section:


kafka主题管理.png

Topic Management

Topics are managed with the kafka-topics.sh script, which is just a thin wrapper around TopicCommand:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

1. Creating a topic

  • Let Kafka's built-in logic assign the partition replicas. Example:

    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test --partitions 4 --replication-factor 2
    

    Note:
    the --zookeeper localhost:2181/kafka connection string must match the zookeeper.connect value configured in server.properties; otherwise topic creation fails with the following error:

    Error while executing topic command : Replication factor: 2 larger than available brokers: 0.
    
  • Create with a manually specified replica assignment

    The option is documented as follows:

    --replica-assignment <String:            A list of manual partition-to-broker   
      broker_id_for_part1_replica1 :           assignments for the topic being      
      broker_id_for_part1_replica2 ,           created or altered.                  
      broker_id_for_part2_replica1 :                                                
      broker_id_for_part2_replica2 , ...>
    

    Partitions are listed in ascending order of partition ID and separated by commas; the replicas within a partition are separated by colons.

    When --replica-assignment is used, the --partitions and --replication-factor options must not be specified.

    Example:

    [root@slave2 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test02 --replica-assignment 0:1,1:2,2:1,2:0 
    Created topic test02.
    

    Notes:
    1) Replicas within a single partition must not repeat; the following is invalid:

    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test03 --replica-assignment 0:0,1:1,2:1,2:0

    2) Partitions with differing replica counts raise an exception (Partition 1 has different replication factor):

    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test02 --replica-assignment 0:1,1,2:1,2:0

    3) Skipping a partition (an empty entry) raises an exception:

    bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test02 --replica-assignment 0:1,,2:1,2:0
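    The three failure modes above can be checked mechanically. The following Python sketch (an illustration, not Kafka's actual parser) validates a --replica-assignment string and rejects the same three cases that kafka-topics.sh rejects:

```python
def validate_replica_assignment(spec: str):
    """Validate a --replica-assignment string like '0:1,1:2,2:1,2:0'.

    Returns the parsed assignment as a list of replica lists, or raises
    ValueError for the three error cases shown above.
    """
    assignment = []
    rf = None  # replication factor, fixed by the first partition
    for pid, part in enumerate(spec.split(",")):
        if part == "":
            # case 3: a skipped partition shows up as an empty entry
            raise ValueError(f"Partition {pid} is missing (empty entry)")
        replicas = [int(b) for b in part.split(":")]
        if len(set(replicas)) != len(replicas):
            # case 1: duplicate broker within one partition
            raise ValueError(f"Partition {pid} has duplicate replicas")
        if rf is None:
            rf = len(replicas)
        elif len(replicas) != rf:
            # case 2: inconsistent replication factor
            raise ValueError(f"Partition {pid} has different replication factor")
        assignment.append(replicas)
    return assignment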

  • Create a topic with configuration overrides

    [root@slave2 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic test-config --partitions 1 --replication-factor 1 --config cleanup.policy=compact --config max.message.bytes=10000
    Created topic test-config.
    
  • Other notes on topic creation

    • Creating a topic that already exists: the --if-not-exists flag suppresses the error
    • Dots (.) and underscores (_) in topic names: Kafka treats them as interchangeable in metric names, so two topic names differing only in . vs _ can collide
    • Rack-aware topic creation: requires the broker.rack property to be configured on the brokers

2. Viewing topics

  • List all topics

    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --list
    test
    test-config
    test02
    
  • Inspect the log directories

    Example: inspect the test topic created earlier.

    On the master node:

    [root@master kafka-logs]# pwd
    /tmp/kafka-logs
    [root@master kafka-logs]# ls -al |grep test
    drwxr-xr-x   2 root root  141 Apr 29 09:15 test-0
    drwxr-xr-x   2 root root  141 Apr 29 09:15 test-1
    

    On slave1:

    [root@slave1 kafka-logs]# ls -al |grep test
    drwxr-xr-x   2 root root  141 Apr 29 09:15 test-1
    drwxr-xr-x   2 root root  141 Apr 29 09:15 test-2
    drwxr-xr-x   2 root root  141 Apr 29 09:15 test-3
    

    On slave2:

    [root@slave2 kafka-logs]# ls -al |grep test
    drwxr-xr-x   2 root root  141 Apr 29 09:15 test-0
    drwxr-xr-x   2 root root  141 Apr 29 09:15 test-2
    drwxr-xr-x   2 root root  141 Apr 29 09:15 test-3
    

    Across the three nodes there are 8 directories in total: 8 = 4 (partitions) × 2 (replicas per partition).

  • Inspect via ZooKeeper

    [zk: localhost:2181(CONNECTED) 6] get /kafka/brokers/topics/test
    {"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}
    cZxid = 0x300000137
    ctime = Mon Apr 29 09:15:46 EDT 2019
    mZxid = 0x300000137
    mtime = Mon Apr 29 09:15:46 EDT 2019
    pZxid = 0x300000139
    cversion = 1
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 68
    numChildren = 1
    

    Here "partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]} describes the partition assignment: each key is a partition ID and each value is the list of broker IDs hosting its replicas.
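    As a cross-check, this assignment JSON can be parsed to recover the per-broker directory layout seen in the log-directory listings above (a Python sketch; the topic name test and the data are taken verbatim from the output above):

```python
import json

# Parse the assignment stored at /kafka/brokers/topics/test and derive
# which partition directories each broker should host on disk.
znode_data = '{"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}'
partitions = json.loads(znode_data)["partitions"]

dirs_per_broker = {}
for pid, replicas in partitions.items():
    for broker in replicas:
        dirs_per_broker.setdefault(broker, []).append(f"test-{pid}")

# Total directories across all brokers = partitions x replication factor.
total_dirs = sum(len(d) for d in dirs_per_broker.values())
```

    total_dirs comes out as 8, and broker 0's directories (test-0, test-1) match the master node listing shown earlier.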

  • Inspect with the describe command

    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test
    Topic:test    PartitionCount:4    ReplicationFactor:2 Configs:
      Topic: test Partition: 0    Leader: 2   Replicas: 2,0   Isr: 2,0
      Topic: test Partition: 1    Leader: 0   Replicas: 0,1   Isr: 0,1
      Topic: test Partition: 2    Leader: 1   Replicas: 1,2   Isr: 1,2
      Topic: test Partition: 3    Leader: 2   Replicas: 2,1   Isr: 2,1
    
  • The --topics-with-overrides option: lists topics whose configuration differs from the cluster defaults

    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topics-with-overrides
    Topic:test-config PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=10000
    
  • The --under-replicated-partitions option: finds all partitions that contain out-of-sync replicas

    Take node2 offline:

    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test --under-replicated-partitions
      Topic: test Partition: 0    Leader: 0   Replicas: 2,0   Isr: 0
      Topic: test Partition: 2    Leader: 1   Replicas: 1,2   Isr: 1
      Topic: test Partition: 3    Leader: 1   Replicas: 2,1   Isr: 1
    

    Bring node2 back online:

    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test --under-replicated-partitions
    

    No output is produced.

  • The --unavailable-partitions option: lists partitions that have no leader replica; such partitions are offline and unavailable to producers and consumers

    Take node1 and node2 offline:

    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test --unavailable-partitions
      Topic: test Partition: 2    Leader: -1  Replicas: 1,2   Isr: 1
      Topic: test Partition: 3    Leader: -1  Replicas: 2,1   Isr: 1
    

3. Topics, partitions, replicas, and logs

One topic has multiple partitions; one partition has multiple replicas; each replica corresponds to exactly one log.

123.png
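The containment relationship can be written down as a toy data model (a sketch for illustration only; these class names are not Kafka's internal types):

```python
from dataclasses import dataclass, field
from typing import Dict, List

# One topic -> many partitions; one partition -> many replicas;
# one replica -> exactly one log.

@dataclass
class Log:
    dir_name: str          # e.g. the "test-0" directory under log.dirs

@dataclass
class Replica:
    broker_id: int
    log: Log               # exactly one log per replica

@dataclass
class Partition:
    partition_id: int
    replicas: List[Replica] = field(default_factory=list)

@dataclass
class Topic:
    name: str
    partitions: List[Partition] = field(default_factory=list)

def build_topic(name: str, assignment: Dict[int, List[int]]) -> Topic:
    """assignment: {partition_id: [broker ids]}, as stored in ZooKeeper."""
    return Topic(name, [
        Partition(pid, [Replica(b, Log(f"{name}-{pid}")) for b in brokers])
        for pid, brokers in sorted(assignment.items())
    ])
```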

4. Partition replica assignment

  • Without rack awareness
  • With rack awareness
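For the rack-unaware case, the logic can be sketched in Python as follows (modeled on Kafka's AdminUtils.assignReplicasToBrokersRackUnaware; Kafka randomizes the two starting offsets, which are exposed here as fixed parameters for reproducibility, so treat this as an approximation rather than the exact implementation):

```python
import random

def assign_replicas_rack_unaware(n_partitions, rf, brokers,
                                 start_index=None, start_shift=None):
    """Sketch of rack-unaware replica assignment.

    brokers: list of broker ids. Returns {partition_id: [broker ids]}.
    The first replica of partition p lands on broker (p + start_index) % n;
    follower replicas are placed with an increasing shift so that leaders
    and followers spread evenly across brokers.
    """
    n = len(brokers)
    if rf > n:
        raise ValueError("replication factor larger than available brokers")
    if start_index is None:
        start_index = random.randrange(n)
    shift = random.randrange(n) if start_shift is None else start_shift
    assignment = {}
    for p in range(n_partitions):
        if p > 0 and p % n == 0:
            shift += 1  # bump the follower shift each time we wrap around
        first = (p + start_index) % n
        replicas = [brokers[first]]
        for j in range(rf - 1):
            replicas.append(brokers[(first + 1 + (shift + j) % (n - 1)) % n])
        assignment[p] = replicas
    return assignment
```

With 3 brokers, 4 partitions, and replication factor 2 (and both offsets fixed to 0), this yields {0: [0, 1], 1: [1, 2], 2: [2, 0], 3: [0, 2]}: every partition gets distinct replicas and leaders rotate across brokers.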

5. Modifying topics (the alter command)

  • Increasing the partition count

    ## First, describe the topic test-config
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=10000
      Topic: test-config  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    
    ## Increase the partition count of test-config
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic test-config --partitions 3
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Adding partitions succeeded!
    
    ## Describe test-config again
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:3    ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=10000
      Topic: test-config  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
      Topic: test-config  Partition: 1    Leader: 1   Replicas: 1 Isr: 1
      Topic: test-config  Partition: 2    Leader: 2   Replicas: 2 Isr: 2
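    The WARNING printed above is worth dwelling on: with key-based partitioning, a record's partition is derived from a hash of its key modulo the partition count, so increasing the count remaps keys to different partitions and breaks per-key ordering. A small sketch of the effect (Kafka's default partitioner actually uses murmur2; crc32 is used here purely for illustration):

```python
import zlib

def pick_partition(key: bytes, num_partitions: int) -> int:
    """Illustrative hash-mod partitioner (not Kafka's murmur2-based one)."""
    return zlib.crc32(key) % num_partitions

keys = [f"user-{i}".encode() for i in range(8)]
before = {k: pick_partition(k, 1) for k in keys}  # topic with 1 partition
after = {k: pick_partition(k, 3) for k in keys}   # grown to 3 partitions
moved = [k for k in keys if before[k] != after[k]]
```

    With a single partition every key maps to partition 0; after the increase, any key whose hash is not a multiple of 3 lands elsewhere, so its new records no longer follow its old ones.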
    
  • Changing topic configuration: the alter command with --config

  • Modify a parameter that already overrides the default

    ## Change max.message.bytes to 20000
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic test-config --config max.message.bytes=20000
    WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
             Going forward, please use kafka-configs.sh for this functionality
    Updated config for topic test-config.
    
    ## Describe again
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config   PartitionCount:3    ReplicationFactor:1 Configs:max.message.bytes=20000,cleanup.policy=compact
        Topic: test-config  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
        Topic: test-config  Partition: 1    Leader: 1   Replicas: 1 Isr: 1
        Topic: test-config  Partition: 2    Leader: 2   Replicas: 2 Isr: 2
    
  • Modify a parameter that has not yet overridden the default

    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic test-config --config segment.bytes=1048577
    WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
             Going forward, please use kafka-configs.sh for this functionality
    Updated config for topic test-config.
    
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config   PartitionCount:3    ReplicationFactor:1 Configs:segment.bytes=1048577,cleanup.policy=compact,max.message.bytes=20000
        Topic: test-config  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
        Topic: test-config  Partition: 1    Leader: 1   Replicas: 1 Isr: 1
        Topic: test-config  Partition: 2    Leader: 2   Replicas: 2 Isr: 2
    
  • Delete a configuration override: --delete-config

    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic test-config --delete-config segment.bytes
    WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
             Going forward, please use kafka-configs.sh for this functionality
    Updated config for topic test-config.
    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config   PartitionCount:3    ReplicationFactor:1 Configs:max.message.bytes=20000,cleanup.policy=compact
        Topic: test-config  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
        Topic: test-config  Partition: 1    Leader: 1   Replicas: 1 Isr: 1
        Topic: test-config  Partition: 2    Leader: 2   Replicas: 2 Isr: 2
    

6. Configuration management with the kafka-configs.sh script

  • View the usage help:

    [root@master kafka_2.11-2.2.0]# bin/kafka-configs.sh 
    This tool helps to manipulate and describe entity config for a topic, client, user or broker
    Option                                 Description                            
    ------                                 -----------                            
    --add-config <String>                  Key Value pairs of configs to add.     
                                             Square brackets can be used to group 
                                             values which contain commas: 'k1=v1, 
                                             k2=[v1,v2,v2],k3=v3'. The following  
                                             is a list of valid configurations:   
                                             For entity-type 'topics':            
                                              cleanup.policy                        
                                              compression.type                      
                                              delete.retention.ms                   
                                              file.delete.delay.ms                  
                                              flush.messages                        
                                              flush.ms                              
                                              follower.replication.throttled.       
                                             replicas                             
                                              index.interval.bytes                  
                                              leader.replication.throttled.replicas 
                                              max.message.bytes                     
                                              message.downconversion.enable         
                                              message.format.version                
                                              message.timestamp.difference.max.ms   
                                              message.timestamp.type                
                                              min.cleanable.dirty.ratio             
                                              min.compaction.lag.ms                 
                                              min.insync.replicas                   
                                              preallocate                           
                                              retention.bytes                       
                                              retention.ms                          
                                              segment.bytes                         
                                              segment.index.bytes                   
                                              segment.jitter.ms                     
                                              segment.ms                            
                                              unclean.leader.election.enable        
                                           For entity-type 'brokers':             
                                              log.message.timestamp.type            
                                              ssl.client.auth                       
                                              log.retention.ms                      
                                              sasl.login.refresh.window.jitter      
                                              sasl.kerberos.ticket.renew.window.    
                                             factor                               
                                              log.preallocate                       
                                              log.index.size.max.bytes              
                                              sasl.login.refresh.window.factor      
                                              ssl.truststore.type                   
                                              ssl.keymanager.algorithm              
                                              log.cleaner.io.buffer.load.factor     
                                              sasl.login.refresh.min.period.seconds 
                                              ssl.key.password                      
                                              background.threads                    
                                              log.retention.bytes                   
                                              ssl.trustmanager.algorithm            
                                              log.segment.bytes                     
                                              max.connections.per.ip.overrides      
                                              log.cleaner.delete.retention.ms       
                                              log.segment.delete.delay.ms           
                                              min.insync.replicas                   
                                              ssl.keystore.location                 
                                              ssl.cipher.suites                     
                                              log.roll.jitter.ms                    
                                              log.cleaner.backoff.ms                
                                              sasl.jaas.config                      
                                              principal.builder.class               
                                              log.flush.interval.ms                 
                                              log.cleaner.dedupe.buffer.size        
                                              log.flush.interval.messages           
                                              advertised.listeners                  
                                              num.io.threads                        
                                              listener.security.protocol.map        
                                              log.message.downconversion.enable     
                                              sasl.enabled.mechanisms               
                                              sasl.login.refresh.buffer.seconds     
                                              ssl.truststore.password               
                                              listeners                             
                                              metric.reporters                      
                                              ssl.protocol                          
                                              sasl.kerberos.ticket.renew.jitter     
                                              ssl.keystore.password                 
                                              sasl.mechanism.inter.broker.protocol  
                                              log.cleanup.policy                    
                                              sasl.kerberos.principal.to.local.rules
                                              sasl.kerberos.min.time.before.relogin 
                                              num.recovery.threads.per.data.dir     
                                              log.cleaner.io.max.bytes.per.second   
                                              log.roll.ms                           
                                              ssl.endpoint.identification.algorithm 
                                              unclean.leader.election.enable        
                                              message.max.bytes                     
                                              log.cleaner.threads                   
                                              log.cleaner.io.buffer.size            
                                              max.connections.per.ip                
                                              sasl.kerberos.service.name            
                                              ssl.provider                          
                                              follower.replication.throttled.rate   
                                              log.index.interval.bytes              
                                              log.cleaner.min.compaction.lag.ms     
                                              log.message.timestamp.difference.max. 
                                             ms                                   
                                              ssl.enabled.protocols                 
                                              log.cleaner.min.cleanable.ratio       
                                              replica.alter.log.dirs.io.max.bytes.  
                                             per.second                           
                                              ssl.keystore.type                     
                                              ssl.secure.random.implementation      
                                              ssl.truststore.location               
                                              sasl.kerberos.kinit.cmd               
                                              leader.replication.throttled.rate     
                                              num.network.threads                   
                                              compression.type                      
                                              num.replica.fetchers                  
                                           For entity-type 'users':               
                                              request_percentage                    
                                              producer_byte_rate                    
                                              SCRAM-SHA-256                         
                                              SCRAM-SHA-512                         
                                              consumer_byte_rate                    
                                           For entity-type 'clients':             
                                              request_percentage                    
                                              producer_byte_rate                    
                                              consumer_byte_rate                    
                                           Entity types 'users' and 'clients' may 
                                             be specified together to update      
                                             config for clients of a specific     
                                             user.                                
    --alter                                Alter the configuration for the entity.
    --bootstrap-server <String: server to  The Kafka server to connect to. This   
      connect to>                            is required for describing and       
                                             altering broker configs.             
    --command-config <String: command      Property file containing configs to be 
      config property file>                  passed to Admin Client. This is used 
                                             only with --bootstrap-server option  
                                             for describing and altering broker   
                                             configs.                             
    --delete-config <String>               config keys to remove 'k1,k2'          
    --describe                             List configs for the given entity.     
    --entity-default                       Default entity name for                
                                             clients/users/brokers (applies to    
                                             corresponding entity type in command 
                                             line)                                
    --entity-name <String>                 Name of entity (topic name/client      
                                             id/user principal name/broker id)    
    --entity-type <String>                 Type of entity                         
                                             (topics/clients/users/brokers)       
    --force                                Suppress console prompts               
    --help                                 Print usage information.               
    --zookeeper <String: urls>             REQUIRED: The connection string for    
                                             the zookeeper connection in the form 
                                             host:port. Multiple URLS can be      
                                             given to allow fail-over.            
    
    
  • Describe a topic and its config

    [root@master kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:3    ReplicationFactor:1 Configs:max.message.bytes=20000,cleanup.policy=compact
      Topic: test-config  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
      Topic: test-config  Partition: 1    Leader: 1   Replicas: 1 Isr: 1
      Topic: test-config  Partition: 2    Leader: 2   Replicas: 2 Isr: 2
    [root@master kafka_2.11-2.2.0]# bin/kafka-configs.sh --zookeeper localhost:2181/kafka --describe --entity-type topics --entity-name test-config
    Configs for topic 'test-config' are max.message.bytes=20000,cleanup.policy=compact
    
  • Adding configuration: --add-config usage

    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name test-config --add-config max.message.bytes=20000,cleanup.policy=compact
    Completed Updating config for entity: topic 'test-config'.
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:3    ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=20000
      Topic: test-config  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
      Topic: test-config  Partition: 1    Leader: 1   Replicas: 1 Isr: 1
      Topic: test-config  Partition: 2    Leader: 2   Replicas: 2 Isr: 2
    
  • View the config in ZooKeeper

    [zk: localhost:2181(CONNECTED) 6] get /kafka/config/topics/test-config
    {"version":1,"config":{"max.message.bytes":"20000","cleanup.policy":"compact"}}
    cZxid = 0x300000161
    ctime = Mon Apr 29 09:58:22 EDT 2019
    mZxid = 0x700000065
    mtime = Wed May 08 09:53:41 EDT 2019
    pZxid = 0x300000161
    cversion = 0
    dataVersion = 5
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 79
    numChildren = 0
    

    Each config change also creates a persistent sequential node under /config/changes, which can be inspected:

    [zk: localhost:2181(CONNECTED) 7] ls /kafka/config/changes            
    [config_change_0000000003, config_change_0000000004]
    [zk: localhost:2181(CONNECTED) 8] get /kafka/config/changes/config_change_0000000003
    {"version":2,"entity_path":"topics/test-config"}
    cZxid = 0x700000060
    ctime = Wed May 08 09:45:14 EDT 2019
    mZxid = 0x700000060
    mtime = Wed May 08 09:45:14 EDT 2019
    pZxid = 0x700000060
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 48
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 9] get /kafka/config/changes/config_change_0000000004
    {"version":2,"entity_path":"topics/test-config"}
    cZxid = 0x700000067
    ctime = Wed May 08 09:53:41 EDT 2019
    mZxid = 0x700000067
    mtime = Wed May 08 09:53:41 EDT 2019
    pZxid = 0x700000067
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 48
    numChildren = 0
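    The znode names follow ZooKeeper's persistent sequential naming convention: a fixed prefix plus a zero-padded 10-digit sequence number, which can be sketched as:

```python
# Sketch of ZooKeeper's persistent sequential naming as it appears in the
# config-change notifications above: prefix + zero-padded 10-digit counter.
def change_node_name(seq: int) -> str:
    return f"config_change_{seq:010d}"
```

    change_node_name(3) yields "config_change_0000000003", matching the listing above; brokers watch this path to learn about config changes.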
    
  • Deleting configuration: --delete-config usage

    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-configs.sh --zookeeper localhost:2181/kafka --alter --entity-type topics --entity-name test-config --delete-config max.message.bytes,cleanup.policy
    Completed Updating config for entity: topic 'test-config'.
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic test-config
    Topic:test-config PartitionCount:3    ReplicationFactor:1 Configs:
      Topic: test-config  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
      Topic: test-config  Partition: 1    Leader: 1   Replicas: 1 Isr: 1
      Topic: test-config  Partition: 2    Leader: 2   Replicas: 2 Isr: 2
    
    

7. Topic-level parameters

8. Deleting topics

  • 1. Basic usage

    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic test-config
    Topic test-config is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
    

    Note: the broker-side parameter delete.topic.enable must be set to true for the deletion to take effect, and true is its default value.

  • 2. Deleting internal topics: raises an error

    
    
  • 3. Deleting a nonexistent topic: --if-exists

    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic test-config
    Error while executing topic command : Topics in [] does not exist
    [2019-05-08 10:19:24,526] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
      at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
      at kafka.admin.TopicCommand$ZookeeperTopicService.deleteTopic(TopicCommand.scala:377)
      at kafka.admin.TopicCommand$.main(TopicCommand.scala:68)
      at kafka.admin.TopicCommand.main(TopicCommand.scala)
     (kafka.admin.TopicCommand$)
    
    [root@slave1 kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic test-config --if-exists
    ## No output is produced
    
  • 4. Deleting a topic manually

    A topic's metadata lives in ZooKeeper under the /brokers/topics and /config/topics paths, and its message data lives under the directories configured by log.dir or log.dirs. Removing these is enough to delete the topic by hand.
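    The cleanup can be written down as a small plan. The paths below are illustrative defaults (the /kafka chroot and /tmp/kafka-logs match the setup used in this note); adjust them to your own configuration:

```python
# Hypothetical helper: enumerate the ZooKeeper nodes and log directories
# that manual deletion of a topic would have to remove. Illustration only.
def manual_delete_plan(topic, partitions,
                       log_dirs=("/tmp/kafka-logs",), chroot="/kafka"):
    zk_nodes = [
        f"{chroot}/brokers/topics/{topic}",  # topic metadata
        f"{chroot}/config/topics/{topic}",   # topic config overrides
    ]
    log_paths = [f"{d}/{topic}-{p}" for d in log_dirs for p in range(partitions)]
    return zk_nodes, log_paths
```

    For the 4-partition test topic this plan lists two ZooKeeper nodes and the four test-0 … test-3 partition directories.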

  • 5. Deleting via a ZooKeeper client

    Deleting a topic with the kafka-topics.sh script essentially creates a node named after the topic under the /admin/delete_topics path in ZooKeeper, marking the topic for deletion.
