Kafka 删除一个topic的旧消息

1.高版本的kafka,提供了直接删除n条消息的操作方法。
脚本内容地址:

https://github.com/apache/kafka/blob/trunk/bin/kafka-delete-records.sh

使用这个脚本, 配套的还有一个json文件。 新建一个json文件,内容如下,里面指定了partition和offset. 然后把这个文件保存为 offset.json

{"partitions": [{"topic": "mytest", "partition": 0, "offset": 90}], "version":1 }

这时候调用脚本,可以做到删除

./kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file ./offset.json

2.如果上述方法,提示错误:

Error: Could not find or load main class kafka.admin.DeleteRecordsCommand

则说明kafka版本过低,这时候可以使用另一种方法。

./kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic testTopic --config retention.ms=时间(微秒)
动态地更新消息保留时间,假如只保留一小时之内的消息 ,60 x 60 x 1000 = 360000 就设置为retention.ms=3600000
然后kafka需要轮询,之后会执行删除
注意:kafka执行完删除后 你需要再次调用这个脚本 把时间还原回去

这种方法,误差根据你服务器的配置来决定的。由kafka配置文件的log.retention.check.interval.ms参数控制。 并且因为有很多个节点,经常是某个节点删除了数据之后,其他的节点还没有轮询到删除操作。 总的来说精确度不是很高。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容