最近又要用上 Kafka,发现原来踩过的坑又踩了一次
在这里做一下笔记,提醒以后注意:
- 如果连不上brokers,consumer.poll()会阻塞
- 同一个topic、同一个group的consumer会共享已提交的offset,因此会彼此影响:哪怕前面的Test已经跑完,后面新建的consumer也会从之前提交的offset继续消费(这一点至关重要)
- poll的正确使用方式是在死循环里面一直调用它
- 不同partition取到的records是乱序的
最后贴一段代码:
// Test fixture: four payload strings, each wrapped into a single-column Row.
val LINES2: Array[String] = Array("hello", "world", "bye", "world")
val ROWS2 = for (line <- LINES2) yield Row(line)
/**
 * Background consumer that appends every consumed record value to `buffer`.
 *
 * @param topic   topic to subscribe to
 * @param groupId consumer group id (consumers sharing a group id share committed offsets)
 * @param buffer  sink for consumed values — also read by the test thread;
 *                NOTE(review): ArrayBuffer is not thread-safe, consider synchronizing
 */
class ConsumerThread(topic: String, groupId: String, buffer: ArrayBuffer[String]) extends Thread {
  // Poll loop keeps running while this flag is set; cleared by shutdown().
  @volatile private var running = true

  // Consumer configuration. With "auto.offset.reset=earliest" a consumer whose
  // group has no committed offset starts from the beginning of the topic.
  val props = new Properties();
  props.put("group.id", groupId);
  props.put("bootstrap.servers", "vm105:9092,vm106:9092,vm107:9092,vm181:9092,vm182:9092");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("auto.offset.reset", "earliest");
  val consumer = new KafkaConsumer[String, String](props);
  consumer.subscribe(Arrays.asList(topic));

  /**
   * Requests a clean stop: breaks the poll loop and aborts a poll() that may be
   * blocked (e.g. when the brokers are unreachable). Safe to call from any thread —
   * wakeup() is the only KafkaConsumer method that is.
   */
  def shutdown(): Unit = {
    running = false
    consumer.wakeup()
  }

  override def run(): Unit = {
    try {
      // poll() must be called continuously in a loop; each call blocks up to 100 ms.
      while (running) {
        val records = consumer.poll(100);
        records.foreach(record =>
          println("key:" + record.key() + " value=" + record.value() + " partition:" + record.partition() + " offset=" + record.offset()));
        buffer ++= records.map(_.value()).toSeq;
        Thread.sleep(100);
      }
    } catch {
      // wakeup() aborts a blocked poll() with WakeupException — expected on shutdown.
      case _: org.apache.kafka.common.errors.WakeupException => // normal termination
    } finally {
      // Always release the consumer's sockets and background resources,
      // even if the loop dies with an unexpected error.
      consumer.close()
    }
  }
}
@Test
def testKafka() {
  // Producer configuration: String serializers for both key and value.
  val propsProducer = new Properties();
  propsProducer.put("bootstrap.servers", "vm105:9092,vm106:9092,vm107:9092,vm181:9092,vm182:9092");
  propsProducer.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  propsProducer.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  val producer = new KafkaProducer[String, String](propsProducer);
  val topic = "kafka-topic2";

  // Send each row with its index as the key; the callback logs the assigned
  // partition/offset once the broker acknowledges the record.
  for ((row, index) <- ROWS2.zipWithIndex) {
    val key = "" + index;
    //TODO: send an array instead of a string value?
    val value = row(0).toString();
    val record = new ProducerRecord[String, String](topic, key, value);
    producer.send(record, new Callback() {
      def onCompletion(metadata: RecordMetadata, e: Exception) = {
        if (metadata != null) {
          val offset = metadata.offset();
          val partition = metadata.partition();
          val topic = metadata.topic();
          println(s"record is sent to kafka: topic=$topic, key=$key, value=$value, partition=$partition, offset=$offset");
        }
      }
    });
  }
  // flush() blocks until every in-flight send has completed — deterministic,
  // unlike the fixed Thread.sleep(1000) it replaces. close() releases the
  // producer's sockets/threads (previously leaked).
  producer.flush();
  producer.close();

  val buffer = ArrayBuffer[String]();
  val thread = new ConsumerThread(topic, "g1", buffer);
  thread.start();
  // Give the consumer time to join the group and drain the topic.
  // NOTE(review): a fixed sleep is fragile; polling buffer.size with a timeout
  // would make this test both faster and more reliable.
  Thread.sleep(10000);
  val records = buffer.toArray;
  // interrupt() + join() replaces the deprecated, unsafe Thread.stop():
  // the interrupt raised inside the consumer's sleep/poll terminates its loop
  // without risking the monitor-corruption stop() can cause.
  thread.interrupt();
  thread.join(5000);
  println(records.toSeq);
  // Records from different partitions arrive in no guaranteed order,
  // so compare sorted contents rather than arrival order.
  Assert.assertArrayEquals(LINES2.sorted.asInstanceOf[Array[Object]], records.sorted.asInstanceOf[Array[Object]]);
}