import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
* offset在zookeeper中记录,以group.id为key 分区和customer的对应关系由Kafka维护
*
* @author 崔磊
* @date 2015年11月4日 上午11:44:15
*/
publicclassMyHighLevelConsumer {
/**
* 该consumer所属的组ID
*/
privateString groupid;
/**
* 该consumer的ID
*/
privateString consumerid;
/**
* 每个topic开几个线程?
*/
privateintthreadPerTopic;
publicMyHighLevelConsumer(String groupid, String consumerid,intthreadPerTopic) {
super();
this.groupid = groupid;
this.consumerid = consumerid;
this.threadPerTopic = threadPerTopic;
}
publicvoidconsume() {
Properties props =newProperties();
props.put("group.id", groupid);
props.put("consumer.id", consumerid);
props.put("zookeeper.connect", KafkaProperties.ZK_CONNECT);
props.put("zookeeper.session.timeout.ms","60000");
props.put("zookeeper.sync.time.ms","2000");
// props.put("auto.commit.interval.ms", "1000");
ConsumerConfig config =newConsumerConfig(props);
ConsumerConnector connector = Consumer.createJavaConsumerConnector(config);
Map topicCountMap =newHashMap();
// 设置每个topic开几个线程
topicCountMap.put(KafkaProperties.TOPIC, threadPerTopic);
// 获取stream
Map>> streams = connector.createMessageStreams(topicCountMap);
// 为每个stream启动一个线程消费消息
for(KafkaStream stream : streams.get(KafkaProperties.TOPIC)) {
newMyStreamThread(stream).start();
}
}
/**
* 每个consumer的内部线程
*
* @author cuilei05
*
*/
privateclassMyStreamThreadextendsThread {
privateKafkaStream stream;
publicMyStreamThread(KafkaStream stream) {
super();
this.stream = stream;
}
@Override
publicvoidrun() {
ConsumerIterator streamIterator = stream.iterator();
// 逐条处理消息
while(streamIterator.hasNext()) {
MessageAndMetadata message = streamIterator.next();
String topic = message.topic();
intpartition = message.partition();
longoffset = message.offset();
String key =newString(message.key());
String msg =newString(message.message());
// 在这里处理消息,这里仅简单的输出
// 如果消息消费失败,可以将已上信息打印到日志中,活着发送到报警短信和邮件中,以便后续处理
System.out.println("consumerid:"+ consumerid +", thread : "+ Thread.currentThread().getName()
+", topic : "+ topic +", partition : "+ partition +", offset : "+ offset +" , key : "
+ key +" , mess : "+ msg);
}
}
}
publicstaticvoidmain(String[] args) {
String groupid ="myconsumergroup";
MyHighLevelConsumer consumer1 =newMyHighLevelConsumer(groupid,"myconsumer1",3);
MyHighLevelConsumer consumer2 =newMyHighLevelConsumer(groupid,"myconsumer2",3);
consumer1.consume();
consumer2.consume();
}
}