kafka学习笔记(九)常见操作

(九)kafka常见操作

A. 基础操作

  • 创建主题

    • 命令:./kafka-topics.sh --create --zookeeper xxx.xxx.xxx.xxx:2181 --replication-factor 2 --partitions 3 --topic test
    • 注意事项:新版的kafka的kafka-topics脚本也用--bootstrap-server替代了--zookeeper,直接代入kafka的ip和端口(比如localhost:9092)。
  • 查看主题

    • 命令:./kafka-topics.sh --describe --topic test --zookeeper xxx.xxx.xxx.xxx:2181
  • 模拟生产者

    • 命令:./kafka-console-producer.sh --broker-list xxx.xxx.xxx.xxxx:9092 --topic test
  • 模拟消费者

    • 命令:./kafka-console-consumer.sh --bootstrap-server xxx.xxx.xxx.xxxx:9092 --topic test --from-beginning
    • 注意事项:该脚本下,只支持新的--bootstrap-server字段。

B. 集群扩容节点

  • 背景概述

    • kafka cluster若需要增加节点,必须将原有topic进行重新分区分配(主要是原有分区的数据迁移),不然会导致broker端负载不均衡(具体内容可参考第六章)。
    • 原集群共有3台brokers(0,1,2),现增加2台brokers(3 & 4)。原有topics两个,分别为push_event & third_event,replicas设置为2。
  • 操作步骤

    1. 预准备工作

      • 需要重新分区分配的topic,将其写入对应的json文本,move.json的具体内容为
      {
       "topics":[
        {
         "topic": "push_event"
        },
        {
         "topic": "third_event"
        }
       ]
      }
      
    2. 执行算法脚本获取分配方案

      • 使用kafka的kafka-reassign-partitions.sh脚本,将第一步的move.json作为参数获取新的分区分配方案。
      • 具体命令为./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --topics-to-move-json-file move.json --broker-list "0,1,2,3,4" --generate,其中broker list中的内容就是具体的broker id。
    3. 执行分配方案

      • 执行完第二步后会返回分配结果,由两部分组成current方案 & proposed方案

      • current方案作为备份为回滚做准备,存储为backup.json(存储时,第一行省略)。

        Current partition replica assignment
        {
          "version":1,
          "partitions":[
             {"topic":"open_push_event","partition":2,"replicas":[0,1]},              
             {"topic":"open_push_event","partition":4,"replicas":[2,1]},    
             {"topic":"open_push_event","partition":3,"replicas":[1,0]}, 
             {"topic":"open_push_event","partition":0,"replicas":[1,2]}, 
             {"topic":"open_push_event","partition":1,"replicas":[2,0]}
          ]
        }
        
      • proposed方案则是本次更新分区的方案,存储为update.json(存储时,第一行省略)。

        Proposed partition reassignment configuration
        {
          "version":1,
          "partitions":[
            {"topic":"think_tank","partition":2,"replicas":[2,0]},    
            {"topic":"think_tank","partition":4,"replicas":[4,2]},     
            {"topic":"think_tank","partition":3,"replicas":[3,1]},     
            {"topic":"think_tank","partition":0,"replicas":[0,3]}, 
            {"topic":"think_tank","partition":1,"replicas":[1,4]}
          ]
        }
        
      • 使用kafka-reassign-partitions.sh执行分区分配,传入update.json作为具体参数,命令为./bin/kafka-reassign-partitions.sh --zookeeper your_zk_address:2181 --reassignment-json-file update.json --execute,若数据较大则该动作会持续较久。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容