序
本文简单演示如何使用 Kafka 0.8 的 client(high-level consumer)去消费一个 topic。
maven
<!-- Kafka 0.8.2.2 artifact (kafka_2.10; the suffix presumably denotes the Scala 2.10 build - confirm). -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
</dependency>
初始化客户端
// Consumer settings; the client locates the cluster through ZooKeeper (zookeeper.connect).
Properties props = new Properties();
props.put("group.id", group);
props.put("zookeeper.connect", zk);
props.put("zookeeper.session.timeout.ms", "10000");
props.put("zookeeper.sync.time.ms", "2000");
// Optional, left disabled as in the original: props.put("auto.offset.reset", "smallest");
props.put("auto.commit.interval.ms", "10000");
// Timeout for ConsumerIterator.hasNext(); when unset, hasNext() blocks forever until a new message arrives.
props.put("consumer.timeout.ms", "10000");
props.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");

// Build the connector from the settings above.
ConsumerConfig consumerConfig = new kafka.consumer.ConsumerConfig(props);
ConsumerConnector consumerConnector =
        kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

// Ask for consumerCount streams on the topic; the result is keyed by topic name.
Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topic, consumerCount);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumerConnector.createMessageStreams(topicCountMap);
并发消费
// Submit one task per stream to the pool so that all streams of the topic are drained concurrently.
consumerMap.get(topic).forEach(stream -> pool.submit(() -> {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    try {
        // Whether hasNext() blocks or times out depends on consumer.timeout.ms (default -1).
        while (it.hasNext()) {
            System.out.println(Thread.currentThread().getName() + " hello");
            // Decode with an explicit charset instead of the platform default so the output
            // is the same on every JVM. Assumes the payload is UTF-8 text - confirm with the producer.
            String payload = new String(it.next().message(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println(Thread.currentThread().getName() + ":" + payload);
        }
    } catch (ConsumerTimeoutException e) {
        // The timeout is raised by hasNext(), not by next().
        e.printStackTrace();
    }
    System.out.println(Thread.currentThread().getName() + " end");
}));
注意事项
消费者实例数 × 每个实例的消费线程数 应 <= topic 的 partition 数量:同一个消费组内,每个 partition 只会分配给一个消费线程,多出来的线程分配不到 partition,只能空闲,白白浪费。