sparkstreaming读取kafka数据源流程解析(0.10版本)

针对0.10及以上版本的kafka, spark推出了更简洁的模式进行数据读取, jar包名称为spark-streaming-kafka-0-10_2.12. 这种方式可以使得读取的rdd的分区和kafka的分区保持一致, 从而实现高效地读取. 本文对这种读取方式进行解析, 不关注过多源码细节, 主要关注官方是如何利用KafkaConsumer这个类来实现这种读取模式的, 本文的源码版本为2.4.0
先说结论 :
(1) 在启动的时候, spark在driver端生成一个consumer, 并获得所订阅主题及分区的当前offset值
(2) 根据设置的参数去判断每个分区需要拉取的数据量, 即每个分区的untilOffset值, 此时driver端获得了每个分区需要被消费的数据量, spark将这些值保存在OffsetRange的类里面, 该类的定义如下

final class OffsetRange private(
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long) extends Serializable {

(3) 在executor端创建新的consumer (和driver端的consumer属于不同的消费组), 然后将上一步获得的 OffsetRange分配到各个executor, 这些executor上的consumer根据指定的分区及起始offset进行消费.
(4) 待executor消费成功后, driver端的consumer即seek每个分区的offset到最新的位置, 然后重复第二步的过程, 同时提交当前offset(可自动/手动提交)

总的来说, spark分别在driver端和executor端创建consumer连接, driver端的consumer负责确定当前需要消费数据offset范围, 并分配到各个executor, 然后executor端的consumer根据分配到的offset范围进行消费, 最后在driver端进行offset的提交
下面的详细流程:

  1. 创建DirectKafkaInputDStream
    官方的实例代码如下:
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

这里创建的stream对象为DirectKafkaInputDStream, 这个类还是继承类InputDStream, 所以我们直接看它重写的startcompute方法即可
start方法定义如下:

  override def start(): Unit = {
    // 这里的consume类型为KafkaConsumer, 通过上一步传入的Subscribe类型来设置不同的KafkaConsumer属性
    val c = consumer
    // 对offset进行矫正, 主要是为了考虑缓存中有数据, 但是还未poll的情况
    paranoidPoll(c)
    if (currentOffsets.isEmpty) {
      currentOffsets = c.assignment().asScala.map { tp =>
        tp -> c.position(tp)
      }.toMap
    }
  }

start方法主要就是在driver端创建了一个KafkaConsumer, 并对当前的offset进行更新/矫正
接下来是compute方法, 主要是生成KafkaRDD, 代码如下:

 override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
    val untilOffsets = clamp(latestOffsets())
    val offsetRanges = untilOffsets.map { case (tp, uo) =>
      val fo = currentOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo)
    }
    val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
      true)
    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
      getPreferredHosts, useConsumerCache)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets
    commitAll()
    Some(rdd)
  }

KafkaRDD`的构造方法为

val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,      getPreferredHosts, useConsumerCache)

除了包含每个分区需要消费的数据范围offsetRanges参数外, 还包含一个executorKafkaParams, 此为executor端创建consumer时的参数,相关代码在

  val executorKafkaParams = {
    val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams)
    KafkaUtils.fixKafkaParams(ekp)
    ekp
  }

点进去我们可以发现, executor端的kafkaParams主要针对driver端的做了4点修改, 如下:

  private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
    logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)

    logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

    // driver and executor should be in different consumer groups
    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
    if (null == originalGroupId) {
      logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
    }
    val groupId = "spark-executor-" + originalGroupId
    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)

    // possible workaround for KAFKA-3135
    val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
    if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
      logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
      kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
    }
  }

在成功poll到数据后, 会将currentOffsets更新为untilOffsets, 然后在下一次compute之前的latestOffsets()方法中会执行
c.seekToEnd(currentOffsets.keySet.asJava), 将driver端consumer的offset刷到最新.
最后还会有一个commitAll()方法, 这个方法只在手动调用stream.commitAsync(offsetRanges)后才会起作用, 因为这个方法是从一个队列里取出要提交的offset值, 然后调用kafkaConsumer.commitAsync()进行提交, 而stream.commitAsync(offsetRanges)方法会将要提交的offset保存至队列.

  1. KafkaRDD的实现
    KafkaRDD是继承自RDD的类, 用于在上一步的compute方法中返回的RDD类型, 构造如下
private[spark] class KafkaRDD[K, V](
    sc: SparkContext,
    val kafkaParams: ju.Map[String, Object],
    val offsetRanges: Array[OffsetRange],
    val preferredHosts: ju.Map[TopicPartition, String],
    useConsumerCache: Boolean
) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges

先看它重写的getPartition方法, 这是定义RDD分区的依据

  override def getPartitions: Array[Partition] = {
    offsetRanges.zipWithIndex.map { case (o, i) =>
        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
    }.toArray
  }

在这里我们可以看到, RDD的分区数是根据offsetRanges来的, 和订阅Kafka的分区数是保持一致.
接下来是compute方法的重写, 这一步返回的是一个KafkaRDDIterator, 而在KafkaRDDIteratornext()方法如下

  override def next(): ConsumerRecord[K, V] = {
    if (!hasNext) {
      throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached")
    }
    val r = consumer.get(requestOffset, pollTimeout)
    requestOffset += 1
    r
  }

我们可以看到, 在这一步就是调用consumer.get()方法来返回数据, 每次返回一个ConsumerRecord[K, V], 这里的consumer对象是KafkaDataConsumer, get方法就是从buffer缓冲区中返回数据, buffer数据来源在poll方法中, 定义如下:

  private def poll(timeout: Long): Unit = {
    val p = consumer.poll(timeout)
    val r = p.records(topicPartition)
    logDebug(s"Polled ${p.partitions()}  ${r.size}")
    buffer = r.listIterator
  }

这里的consumer即是真正的KafkaConsumer了, 其定义在

  /** Create a KafkaConsumer to fetch records for `topicPartition` */
  private def createConsumer: KafkaConsumer[K, V] = {
    val c = new KafkaConsumer[K, V](kafkaParams)
    val topics = ju.Arrays.asList(topicPartition)
    c.assign(topics)
    c
  }

在这里我们可以看到, 这里采用的其实就是assign指定分区的方式进行数据拉取.

大致的流程就是这些了, 主要设计到了DirectKafkaInputDStream, KafkaRDD, KafkaRDDIterator这么几个类, 其中还涉及到了是否允许数据compacted、consumer缓存等概念, 因为这里主要关注KafkaConsumer消费的过程, 在这里不做进一步的讨论了, 如果不对的地方欢迎一起交流.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352