【Spark】Spark Streaming 指定消费Topic中某个Partition的数据

最近接触到一个需求:通过Driect API消费Kafka topic中指定partiton中的数据。这个partition中的数据是用户通过某种算法过滤出的有效数据,其他partition数据不作处理,代码片段实现如下:

val kafkaParams = Map[String, String]("metadata.broker.list" -> "ip:port", "auto.offset.reset" -> "largest")

val kc = new KafkaCluster(kafkaParams)
val arrTopicPartition = Array(
  TopicAndPartition("topic1", 1),
  TopicAndPartition("topic2", 0))

val arrLastOffsets  = for (tp <- arrTopicPartition)
  yield kc.getLatestLeaderOffsets(Set(tp)).right.get(tp).offset

val messages = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder,(String, Int, String)](
  ssc,
  kafkaParams,
  Map(arrTopicPartition(0) -> arrLastOffsets(0),
    arrTopicPartition(1) -> arrLastOffsets(1)),
  (m: MessageAndMetadata[String, String]) => (m.topic, m.partition, m.message()))

var offsetRanges = Array.empty[OffsetRange]
messages.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset} ${rdd.collect().foreach(r => print(r + "##"))}")
  }
}

实现消费topic1中partiton为1和topic2中partition为0 的数据。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。