准备测试环境
创建测试 topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --partitions 5 --replication-factor 1 --topic test02
生产测试数据:
bin/kafka-producer-perf-test.sh --topic test02 --num-records 5000000 --throughput -1 --record-size 100 --producer-props bootstrap.servers=localhost:9092 acks=-1
然后在该终端下启动一个console consumer程序,组名设置为 test-group:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test02 --from-beginning --consumer-property group.id=test-group
待运行一段时间后按下 Ctrl+C
关闭 consumer,之后运行 kafka-consumer-groups.sh 脚本,查看当前group的消费进度:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --describe
重设位移示例
(1)--to-earliest 把位移调整到分区当前最早位移处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-earliest --execute
输出信息:
TOPIC PARTITION NEW-OFFSET
test02 2 0
test02 1 0
test02 0 0
test02 3 0
test02 4 0
所有分区的位移都已经被重设到0,即当前最早位移处。
(2)--to-latest 把位移调整到分区当前最新位移处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-latest --execute
输出信息:
TOPIC PARTITION NEW-OFFSET
test02 2 1000000
test02 1 1000000
test02 0 1000000
test02 3 1000000
test02 4 1000000
所有分区的位移都已经被重设为最新位移,即1000000。
(3)--to-offset <offset> 把位移调整到指定位移处
下面将所有分区的位移都重设到当前的一半,即1000000/2=500000:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-offset 500000 --execute
输出信息:
TOPIC PARTITION NEW-OFFSET
test02 2 500000
test02 1 500000
test02 0 500000
test02 3 500000
test02 4 500000
(4)--to-current 把位移调整到分区当前位移处
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-current --execute
输出信息:
TOPIC PARTITION NEW-OFFSET
test02 2 500000
test02 1 500000
test02 0 500000
test02 3 500000
test02 4 500000
实际上位移未移动,距上一步没有变动。
(5)--shift-by N 把位移调整到当前位移+N处(N可以是负值)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --shift-by -100000 --execute
输出信息:
TOPIC PARTITION NEW-OFFSET
test02 2 400000
test02 1 400000
test02 0 400000
test02 3 400000
test02 4 400000
所有分区的位移被移动到(500000-100000)=400000。
(6)--to-datetime 把位移调整到大于给定时间的最早位移处
datatime 格式是 yyyy-MM-ddTHH:mm:ss.xxx
,比如 2017-08-04T00:00:00.000
。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --to-datetime 2018-11-06T14:30:00.000
(7)--by-duration<duration> 把位移调整到距离当前时间指定间隔的位移处
duration 格式是 PnDTnHnMnS
,比如 PT0H5M0S
。
将所有分区位移调整为30分钟之前的最早位移:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --all-topics --by-duration PT0H30M0S