揭开Spark Streaming神秘面纱③ - 动态生成 job

JobScheduler有两个重要成员,一是上文介绍的 ReceiverTracker,负责分发 receivers 及源源不断地接收数据;二是本文将要介绍的 JobGenerator,负责定时的生成 jobs 并 checkpoint。

定时逻辑

在 JobScheduler 的主构造函数中,会创建 JobGenerator 对象。在 JobGenerator 的主构造函数中,会创建一个定时器:

  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

该定时器每隔 ssc.graph.batchDuration.milliseconds 会执行一次 eventLoop.post(GenerateJobs(new Time(longTime))) 向 eventLoop 发送 GenerateJobs(new Time(longTime))消息,eventLoop收到消息后会进行这个 batch 对应的 jobs 的生成及提交执行,eventLoop 是一个消息接收处理器。
需要注意的是,timer 在创建之后并不会马上启动,将在 StreamingContext#start() 启动 Streaming Application 时间接调用到 timer.start(restartTime.milliseconds)才启动。

为 batch 生成 jobs

eventLoop 在接收到 GenerateJobs(new Time(longTime))消息后的主要处理流程有以上图中三步:

  1. 将已接收到的 blocks 分配给 batch
  2. 生成该 batch 对应的 jobs
  3. 将 jobs 封装成 JobSet 并提交执行

接下来我们就将逐一展开这三步进行分析

将已接受到的 blocks 分配给 batch

上图是根据源码画出的为 batch 分配 blocks 的流程图,这里对 『获得 batchTime 各个 InputDStream 未分配的 blocks』作进一步说明:
在文章 『文章链接』 中我们知道了各个 ReceiverInputDStream 对应的 receivers 接收并保存的 blocks 信息会保存在 ReceivedBlockTracker#streamIdToUnallocatedBlockQueues,该成员 key 为 streamId,value 为该 streamId 对应的 InputDStream 已接收保存但尚未分配的 blocks 信息。
所以获取某 InputDStream 未分配的 blocks 只要以该 InputDStream 的 streamId 来从 streamIdToUnallocatedBlockQueues 来 get 就好。获取之后,会清楚该 streamId 对应的value,以保证 block 不会被重复分配。

在实际调用中,为 batchTime 分配 blocks 时,会从streamIdToUnallocatedBlockQueues取出未分配的 blocks 塞进 timeToAllocatedBlocks: mutable.HashMap[Time, AllocatedBlocks] 中,以在之后作为该 batchTime 对应的 RDD 的输入数据。

通过以上步骤,就可以为 batch 的所有 InputDStream 分配 blocks。也就是为 batch 分配了 blocks。

生成该 batch 对应的 jobs

为指定 batchTime 生成 jobs 的逻辑如上图所示。你可能会疑惑,为什么 DStreamGraph#generateJobs(time: Time)为什么返回 Seq[Job],而不是单个 job。这是因为,在一个 batch 内,可能会有多个 OutputStream 执行了多次 output 操作,每次 output 操作都将产生一个 Job,最终就会产生多个 Jobs。

我们结合上图对执行流程进一步分析。
DStreamGraph#generateJobs(time: Time)中,对于DStreamGraph成员ArrayBuffer[DStream[_]]的每一项,调用DStream#generateJob(time: Time)来生成这个 outputStream 在该 batchTime 的 job。该生成过程主要有三步:

Step1: 获取该 outputStream 在该 batchTime 对应的 RDD

每个 DStream 实例都有一个 generatedRDDs: HashMap[Time, RDD[T]] 成员,用来保存该 DStream 在每个 batchTime 生成的 RDD,当 DStream#getOrCompute(time: Time)调用时

  • 首先会查看generatedRDDs中是否已经有该 time 对应的 RDD,若有则直接返回

  • 若无,则调用compute(validTime: Time)来生成 RDD,这一步根据每个 InputDStream继承 compute 的实现不同而不同。例如,对于 FileInputDStream,其 compute 实现逻辑如下:

    1. 先通过一个 findNewFiles() 方法,找到多个新 file
    2. 对每个新 file,都将其作为参数调用 sc.newAPIHadoopFile(file),生成一个 RDD 实例
    3. 将 2 中的多个新 file 对应的多个 RDD 实例进行 union,返回一个 union 后的 UnionRDD

Step2: 根据 Step1中得到的 RDD 生成最终 job 要执行的函数 jobFunc

jobFunc定义如下:

val jobFunc = () => {
  val emptyFunc = { (iterator: Iterator[T]) => {} }
  context.sparkContext.runJob(rdd, emptyFunc)
}

可以看到,每个 outputStream 的 output 操作生成的 Job 其实与 RDD action 一样,最终调用 SparkContext#runJob 来提交 RDD DAG 定义的任务

Step3: 根据 Step2中得到的 jobFunc 生成最终要执行的 Job 并返回

Step2中得到了定义 Job 要干嘛的函数-jobFunc,这里便以 jobFunc及 batchTime 生成 Job 实例:

Some(new Job(time, jobFunc))

该Job实例将最终封装在 JobHandler 中被执行

至此,我们搞明白了 JobScheduler 是如何通过一步步调用来动态生成每个 batchTime 的 jobs。下文我们将分析这些动态生成的 jobs 如何被分发及如何执行。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,874评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,102评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,676评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,911评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,937评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,935评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,860评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,660评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,113评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,363评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,506评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,238评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,861评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,486评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,674评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,513评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,426评论 2 352

推荐阅读更多精彩内容