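I recently ran into a requirement: use the Direct API to consume data from specific partitions of Kafka topics. Those partitions hold the valid data that the user has already filtered with some algorithm; data in the other partitions is not processed. The code snippet looks like this: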
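import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaCluster, KafkaUtils, OffsetRange}

// "ip:port" is a placeholder for the broker list; ssc below is an existing StreamingContext.
val kafkaParams = Map[String, String]("metadata.broker.list" -> "ip:port", "auto.offset.reset" -> "largest")
val kc = new KafkaCluster(kafkaParams)
// The only partitions we want to consume.
val arrTopicPartition = Array(
  TopicAndPartition("topic1", 1),
  TopicAndPartition("topic2", 0))
// Latest leader offset of each partition; .right.get throws if the lookup fails.
val arrLastOffsets = for (tp <- arrTopicPartition)
  yield kc.getLatestLeaderOffsets(Set(tp)).right.get(tp).offset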
// Start each chosen partition at its latest offset and keep topic and partition with every message.
val messages = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, Int, String)](
  ssc,
  kafkaParams,
  Map(arrTopicPartition(0) -> arrLastOffsets(0),
    arrTopicPartition(1) -> arrLastOffsets(1)),
  (m: MessageAndMetadata[String, String]) => (m.topic, m.partition, m.message()))
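// Capture the offset ranges in the first operation on the stream.
var offsetRanges = Array.empty[OffsetRange]
messages.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  // Print the offset range consumed from each partition in this batch.
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  // Collect once per batch (debugging only) and print every record.
  rdd.collect().foreach(r => print(r + "##"))
  println()
}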
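This consumes the data in partition 1 of topic1 and partition 0 of topic2. Because the starting offsets are the latest leader offsets, only messages that arrive after the job starts are read.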
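The snippet above hardcodes the two array indices and calls .right.get, which throws if an offset lookup fails. As a rough sketch of one alternative, the starting-offset map could be built for any number of partitions while keeping the failure as a value. Everything here is an assumption for illustration: the case class is a local stand-in for kafka.common.TopicAndPartition so the code runs without Kafka on the classpath, and `lookup` is a hypothetical function standing in for a call such as kc.getLatestLeaderOffsets(Set(tp)).

```scala
// Local stand-in for kafka.common.TopicAndPartition (assumption, so the sketch is self-contained).
final case class TopicAndPartition(topic: String, partition: Int)

object FromOffsets {
  // Builds the starting-offset map for any number of partitions.
  // `lookup` is hypothetical: Right(offset) on success, Left(reason) on failure.
  // Stops calling `lookup` after the first failure and returns that failure.
  def build(
      partitions: Seq[TopicAndPartition],
      lookup: TopicAndPartition => Either[String, Long]
  ): Either[String, Map[TopicAndPartition, Long]] = {
    val empty: Either[String, Map[TopicAndPartition, Long]] = Right(Map.empty)
    partitions.foldLeft(empty) { (acc, tp) =>
      acc match {
        case Left(_) => acc // an earlier lookup already failed
        case Right(offsets) =>
          lookup(tp) match {
            case Left(err)     => Left(s"$tp: $err")
            case Right(offset) => Right(offsets + (tp -> offset))
          }
      }
    }
  }
}
```

In the real job, the Right value would be passed as the fromOffsets argument of createDirectStream, and a Left would be logged or turned into a startup error instead of an exception from .right.get.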