kafka 命令行工具kcat&kafkactl简介

使用场景

  • 日常中使用工具替换繁杂的kafka命令
  • kafka安装配置依赖了JVM环境,对于需要在线上容器进行连接kafka 以排查问题场景,可以通过快速安装kafka命令行工具来实现。

下面介绍一下kcatkafkactl

1.kcat(mac 系统为例)

kcat(Kafkacat)是一个基于命令行的工具,用于与Apache Kafka进行交互。它提供了一种简单而灵活的方式来发送和接收消息,并执行其他与Kafka相关的操作。

(1)安装

brew install kcat

(2)发送消息

## 从标准输入读取
  ... | kcat -b <broker> -t <topic> -p <partition>
  
## 或者直接发送
  kcat -P -b ...

(3)消费消息

## Consumer mode (writes messages to stdout):
  kcat -b <broker> -t <topic> -p <partition>
## or:
  kcat -C -b ...

(4)示例
启动kafka

zookeeper-server-start /usr/local/etc/zookeeper/zoo.cfg
kafka-server-start /usr/local/etc/kafka/server.properties

创建一个test_topic

kafka-topics --bootstrap-server localhost:9092 --topic test_topic --create

启动一个消费者:

 ## Consume from all partitions from 'test_topic' topic
 kcat -C -b localhost:9092 -t test_topic

发送消息:

echo "hello,world" |kcat -b localhost:9092 -P   -t test_topic

[图片上传失败...(image-f68cbc-1719657461306)]

如果是ACL topic加上-X设置ACL参数

-- 消息发送(message.txt为kafka消息内容)

kcat -b localhost:9092 -P -X security.protocol=sasl_plaintext -X sasl.mechanism=SCRAM-SHA-256 -X sasl.username=username -X sasl.password=password  -t test_topic -l message.txt

-- 消费消息
kcat -b localhost:9092 -P -X security.protocol=sasl_plaintext -X sasl.mechanism=SCRAM-SHA-256 -X sasl.username=username -X sasl.password=password -G consumegroup   test_topic

2.kafkactl (ubuntu系统为例)

Kafkactl是一个开源的命令行工具,用于管理和操作Apache Kafka集群。它提供了一组命令和功能,使得与Kafka集群进行交互和管理变得更加方便

(1)安装

  • 下载kafkactl_5.0.6_linux_amd64.tar.gz
  • 解压
tar -xzf kafkactl_5.0.6_linux_amd64.tar.gz

(2)示例

新建config.yml

contexts:
  # default context without SASL
  default:
    brokers:
      - broker1:9092
    requestTimeout: 15s

  # sasl context as admin
  sasl-admin:
    brokers:
      - broker2:9092
    requestTimeout: 15s
    sasl:
      enabled: true
      username: username
      password: password
      mechanism: scram-sha512

current-context: sasl-admin

消息发送

cat message.json | ./kafkactl produce yourtopic --input-format=json --config-file=./config.yml

or 

./kafkactl produce shopee-luckyvideo-labeling-filtered-users-test-co --file=message.json --input-format=json --config-file=./config.yml

如果执行出现报错:can't unmarshal line: unexpected end of JSON input,留意message.json文件是不是放置了格式化后的json串,需要用compact形式存放。

消息消费:

./kafkactl consume yourtopic --from-beginning --config-file=./config.yml

切换context为default

./kafkactl config use-context default --config-file=config.yml

参考文档

https://github.com/edenhill/kcat
https://github.com/deviceinsight/kafkactl

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容