Sometimes you need to find the offset range of every partition of a Kafka topic. For example, when specifying fromOffset for Spark Streaming, supplying an offset without validating it against the partition boundaries can cause errors. Kafka ships with a command to check this; here is an approach based on the Java API instead.
The code is as follows:
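For reference, the command-line check mentioned above can be done with the GetOffsetShell tool that ships with Kafka; the broker address and topic name below are placeholders, so adjust them to your cluster:

```shell
# earliest offset per partition (--time -2)
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic my-topic --time -2
# latest offset per partition (--time -1)
bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic my-topic --time -1
```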
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(config);
consumer.subscribe(topics);
// poll once so the consumer gets partitions assigned
ConsumerRecords<String, byte[]> records = consumer.poll(1000);
// note: KafkaConsumer is not thread-safe, so iterate sequentially
// (parallelStream() would throw ConcurrentModificationException)
return records.partitions().stream().map(topicPartition -> {
    // position() right after seekToBeginning() is the partition's earliest offset
    consumer.seekToBeginning(Collections.singletonList(topicPartition));
    long offset = consumer.position(topicPartition);
    return new TopicPartitionInfo(topicPartition.topic(), topicPartition.partition(), offset);
}).collect(Collectors.toList());
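Since kafka-clients 0.10.1 the consumer also exposes partitionsFor(), beginningOffsets(), and endOffsets(), which return the full offset range of every partition directly, without a subscribe()/poll() round-trip. A minimal sketch under those APIs ("my-topic" and the consumer Properties are placeholders, and this needs a live broker, so it is untested here):

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Sketch: print the [earliest, latest) offset range of each partition.
static void printOffsetRanges(Properties config) {
    try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(config)) {
        // partitionsFor() lists every partition of the topic regardless of
        // assignment, so nothing depends on which records a poll() returned
        List<TopicPartition> partitions = consumer.partitionsFor("my-topic").stream()
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());
        Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
        Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);
        partitions.forEach(tp ->
                System.out.printf("%s: [%d, %d)%n", tp, earliest.get(tp), latest.get(tp)));
    }
}
```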
Full code: See Here
Dependency:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
Update 2018.01.09
In the code above, if the records returned by poll(1000) do not happen to include records from every partition, then records.partitions() will not give you all partitions of the topic: it only returns the partitions that actually appear in that batch of records. You may therefore need something like this instead:
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
// fill fromOffsets from your own local offset store here
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaParams);
consumer.subscribe(topics);
consumer.poll(100); // poll once so the consumer joins the group
for (TopicPartition topicPartition : fromOffsets.keySet()) {
    // position() after seekToBeginning() yields the earliest offset still retained
    consumer.seekToBeginning(Collections.singletonList(topicPartition));
    long offset = consumer.position(topicPartition);
    long consumedOffset = fromOffsets.getOrDefault(topicPartition, 0L);
    if (offset > consumedOffset) {
        log.warn("At partition {}, our system has consumed to {} but we can start only from {} because of retention expiration.", topicPartition.partition(), consumedOffset, offset);
        log.warn("At partition {}, start offset has been adjusted to {}", topicPartition.partition(), offset);
        fromOffsets.put(topicPartition, offset);
    }
}
consumer.unsubscribe();
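The adjustment rule in the loop above boils down to a per-partition max of the stored offset and the broker's earliest retained offset. As a self-contained sketch (the class and method names are hypothetical, and plain partition numbers stand in for TopicPartition keys so it runs without a broker):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the retention-adjustment loop above.
public class OffsetClamp {
    // For each partition, the usable start offset is the larger of the
    // locally stored offset and the broker's earliest retained offset.
    static Map<Integer, Long> adjust(Map<Integer, Long> stored, Map<Integer, Long> earliest) {
        Map<Integer, Long> result = new HashMap<>();
        for (Map.Entry<Integer, Long> e : stored.entrySet()) {
            long earliestOffset = earliest.getOrDefault(e.getKey(), 0L);
            result.put(e.getKey(), Math.max(e.getValue(), earliestOffset));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> stored = new HashMap<>();
        stored.put(0, 100L);  // consumed up to 100
        stored.put(1, 5L);    // consumed up to 5, but retention expired past it
        Map<Integer, Long> earliest = new HashMap<>();
        earliest.put(0, 50L); // broker still retains offset 50 onward
        earliest.put(1, 20L); // broker's earliest is now 20
        Map<Integer, Long> adjusted = adjust(stored, earliest);
        System.out.println(adjusted.get(0)); // 100 (stored offset still valid)
        System.out.println(adjusted.get(1)); // 20 (bumped forward to earliest)
    }
}
```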