[spark streaming] DStream 和 DStreamGraph 解析

看 spark streaming 源码解析之前最好先了解spark core的内容。

前言

Spark Streaming 是基于Spark Core将流式计算分解成一系列的小批处理任务来执行。

在Spark Streaming里,总体负责任务的动态调度是JobScheduler,而JobScheduler有两个很重要的成员:JobGeneratorReceiverTrackerJobGenerator 负责将每个 batch 生成具体的 RDD DAG ,而ReceiverTracker负责数据的来源。

Spark Streaming里的DStream可以看成是Spark Core里的RDD的模板,DStreamGraph是RDD DAG的模板。

跟着例子看流程

DStream 也和 RDD 一样有着转换(transformation)和 输出(output)操作,通过 transformation 操作会产生新的DStream,典型的transformation 操作有map(), filter(), reduce(), join()等。RDD的输出操作会触发action,而DStream的输出操作也会新建一个ForeachDStream,用一个函数func来记录所需要做的操作。

下面看一个例子:

val conf = new SparkConf().setMaster("local[2]")
                          .setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))      
val pairs = words.map(word => (word, 1))    
val wordCounts = pairs.reduceByKey(_ + _)   
wordCounts.print()
ssc.start()
ssc.awaitTermination()

在创建 StreamingContext 的时候实创建了 graph: DStreamGraph:

private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      _cp.graph.setContext(this)
      _cp.graph.restoreCheckpointData()
      _cp.graph
    } else {
      require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(_batchDur)
      newGraph
    }
  }

checkpoint 可用,会优先从 checkpoint 恢复 graph,否则新建一个。graph用来动态的创建RDD DAG,DStreamGraph有两个重要的成员:inputStreamsoutputStreams

private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()

Spark Streaming记录DStream DAG 的方式就是通过DStreamGraph实例记录所有的outputStreams ,因为outputStream会通过依赖
dependencies 来和parent DStream形成依赖链,通过outputStreams 向前追溯遍历就可以得到所有上游的DStream,另外,DStreamGraph 还会记录所有的inputStreams ,避免每次为查找 input stream 而对 output steam 进行 BFS 的消耗。

继续回到例子,这里通过ssc.socketTextStream 创建了一个ReceiverInputDStream,在其父类 InputDStream 中会将该ReceiverInputDStream添加到inputStream里。

接着调用了flatMap方法:

def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
    new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
  }

--------------------------------------------------------------------

private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
    parent: DStream[T],
    flatMapFunc: T => TraversableOnce[U]
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }
}

创建了一个 FlatMappedDStream ,而该类的compute方法是在父 DStream(ReceiverInputDStream) 在对应batch时间的RDD上调用了flatMap方法,也就是构造了 rdd.flatMap(func)这样的代码,后面的操作类似,随后形成的是rdd.flatMap(func1).map(func2).reduceByKey(func3).take(),这不就是我们spark core里的东西吗。另外其dependencies是直接指向了其构造参数parent,也就是刚才的ReceiverInputDStream,每个新建的DStream的dependencies都是指向了其父DStream,这样就构成了一个依赖链,也就是形成了DStream DAG。

这里我们再看看最后的 print() 操作:

----
def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }
----
private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }
----
#ForEachDStream
override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

在print() 方法里构建了一个foreachFunc方法:对一个rdd进行了take操作并打印(spark core中的action操作)。随后创建了ForEachDStream实例并调用了register()方法:

 private[streaming] def register(): DStream[T] = {
    ssc.graph.addOutputStream(this)
    this
  }

将 OutputStream 添加到DStreamGraphoutputStreams 里。可以看到刚才构建的 foreachFunc 方法最终用在了ForEachDStream实例的generateJob方法里,并创建了一个Streaming 中的Job,在job中的run方法中会调用这个方法,也就是会触发action操作。

注意这里Spark Streaming的Job和Spark Core里的Job是不一样的,Streaming的Job执行的是前面构造的方法,方法里面是Core里的Job,方法可以定义多个core里的Job,也可以一个core里的job都没有。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343

推荐阅读更多精彩内容