首先贴代码:
config := cluster.NewConfig()
config.Group.Mode = cluster.ConsumerModePartitions
config.Consumer.Offsets.Initial = sarama.OffsetNewest
// init consumer
brokers := []string{"***.***.**.***:9092"}
topics := []string{"***"}
consumer, err := cluster.NewConsumer(brokers, "test-64", topics, config)
if err != nil {
panic(err)
}
defer consumer.Close()
这里已经设置了OffsetNewest,但是并没有从最新的一条开始消费。
然后在github上的issue上找到了答案:
https://github.com/bsm/sarama-cluster/issues/133
因为在同一个consumer-group下kafka会记录同一个offsets,不能因为其中的一个consumer重启而重新initial offsets。所以如果想从最新的一条开始消费的话,只能换一个consumer-group。