kafka常用命令
- 列出(list)所有topic
$ /opt/kafka/bin/kafka-topics.sh --list \
--zookeeper zookeeper1:2181,zookeeper2:2181
test-topic
test-topic2
- 描述(describe)topic
$ /opt/kafka/bin/kafka-topics.sh --describe \
--zookeeper zookeeper1:2181,zookeeper2:2181 \
--topic test-topic
Topic:test2 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: test-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test-topic Partition: 1 Leader: 2 Replicas: 1,2,4 Isr: 1,2,4
- 查看偏移(offset)
$ /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list kafka1:9092,kafka2:9092,kafka3:9092 \
--topic test-topic
test-topic:0:5
test-topic:1:2
// topicName:partitionID:offset
有点遗憾的是,如果配置了SASL,GetOffsetShell好像不能工作了:GetOffsetShell command doesn't work with SASL enabled Kafka.
还可以显式指定参数--time,表示获取现存最早(-2)的或者最新(-1)的。缺省值是最新的(-1):
-2: 还存在topic中的最早(the first available message)的偏移:
$ /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list kafka1:9092,kafka2:9092 \
--topic test-topic \
--time -2
test-topic:0:0
-1: 还存在topic中的最新(latest available message)的偏移:
$ /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list kafka1:9092,kafka2:9092 \
--topic test-topic \
--time -1
test-topic:0:5
- 创建(create)topic
$ /opt/kafka/bin/kafka-topics.sh --create \
--zookeeper zookeeper1:2181,zookeeper2:2181 \
--replication-factor 2 \
--partitions 2 \
--topic new-topic
在创建topic的时候还可以指定topic级别的属性(--config x=y),例如:
/opt/kafka/bin/kafka-topics.sh --create \
--topic new-topic \
... \
--config max.message.bytes=64000 \
--config flush.messages=1
在这个例子中,我们指定了new-topic的两个topic级别的属性:max message size和flush rate。
另外如果配置了SASL ACL那么需要指定JVM参数java.security.auth.login.config,例如:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=sasl/kafka_client_jaas.conf"
- 删除topic
$ /opt/kafka/bin/kafka-topics.sh --delete \
--zookeeper zookeeper1:2181,zookeeper2:2181,zookeeper3:2181 \
--topic old-topic
- 启动生产者(producer)
$ /opt/kafka/bin/kafka-console-producer.sh \
--broker-list kafka1:9092,kafka2:9092 \
--topic mytopic
如果启动有SASL ACL那么需要配置:
export KAFKA_OPTS="-Djava.security.auth.login.config=sasl/kafka_client_jaas.conf"
/opt/kafka/bin/kafka-console-producer.sh \
--broker-list kafka:9092 \
--topic mytopic \
--producer.config sasl/client-sasl.properties
- 启动消费者(consumer)
$ /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka1:9092,kafka2:9092 \
--topic mytopic --from-beginning --partition 0
注意此时消费者会一直挂着等待新消息,尽管已经处理完了消息队列中已有的消息,而不会退出;可以使用选项--timeout-ms 5000
来表示等待5秒没有新消息则退出。
从指定位置开始消费:
$ /opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka0:9092,kafka1:9092,kafka2:9092,kafka3:9092 \
--topic my-topic --partition 0 --offset 2
同样如果启用了SASL ACL,则需要配置:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=sasl/kafka_client_jaas.conf"
/opt/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--topic my-topic \
--consumer.config sasl/client-sasl.properties \
--from-beginning
- 修改配置项
查看现在的配置项值:
$ /opt/kafka/bin/kafka-configs.sh \
--zookeeper zookeeper0:2181 \
--describe \
--entity-type topics \
--entity-name my-topic
修改配置项值:
$ /opt/kafka/bin/kafka-configs.sh \
--zookeeper zookeeper0:2181 \
--alter \
--entity-type topics \
--entity-name my-topic \
--add-config retention.ms=1000
删除配置项,即恢复到缺省值:
$ /opt/kafka/bin/kafka-configs.sh \
--zookeeper zookeeper0:2181 \
--alter \
--entity-type topics \
--entity-name my-topic \
--delete-config retention.ms
- consumer group相关的命令
9.1. 列出所有的consumer group
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--list
如果配置了SASL,则需要指定SASL认证信息:
$ export KAFKA_OPTS="-Djava.security.auth.login.config=sasl/kafka_client_jaas.conf"
$ /opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--list \
--command-config sasl/client-sasl.properties
# cat sasl/kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafkaclient"
password="kafkaclientpwd";
};
# cat sasl/client-sasl.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
9.2. 列出所有的consumer group的offset
$ /opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--describe \
--group group-name
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-topic 0 - 6 - consumer-1-ce6960ac-0e30-4eff-ab02-13c202f9182f /172.19.0.9 consumer-1
选项--describe,还有一些辅助参数:
--members list of all active members in the consumer group
--members --verbose
--offsets
--state
9.3. 删除 consumer group
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--delete --group my-group \
--group my-other-group
9.4. 重置consumer group的offset
/opt/kafka/bin/kafka-consumer-groups.sh \
--bootstrap-server kafka:9092 \
--reset-offsets \
--group my-group \
--topic my-topic
--to-latest
最后一个--to-latest表示如何重置:
--to-datetime <String: datetime> : Reset offsets to offsets from datetime. \
Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest : Reset offsets to earliest offset.
--to-latest : Reset offsets to latest offset.
--shift-by <Long: number-of-offsets> : Reset offsets shifting current offset by 'n',
where 'n' can be positive or negative.
--from-file : Reset offsets to values defined in CSV file.
--to-current : Resets offsets to current offset.
--by-duration <String: duration> : Reset offsets to offset by duration from current timestamp.
Format: 'PnDTnHnMnS'
--to-offset : Reset offsets to a specific offset.