spark streaming源码解读之job动态生成和深度思考

      输入的ds有很多来源Kafka、Socket、Flume,输出的DStream其实是逻辑级别的Action,是Spark Streaming框架提出的,其底层翻译成为物理级别的额Action,是RDD的Action,中间是处理过程是transformations,状态转换也就是业务处理逻辑的过程。

       Spark Streaming二种数据来源:

             1、基于DStream数据源。

             2、基于其他DStream产生的数据源。

      关键性的观点:做大数据的时候不是流失处理,一般会有定时任务,定时任务一般十分钟触发一次、一天触发一次,做大数据的定时任务就是流失处理的感觉,虽然不规范,一切和流处理没有关系的数据都是没有价值的。即使做批处理或数据挖掘其实也是在做数据流处理,只不过是隐形的流处理,所有的数据都会变成流处理。

      所以就有统一的抽象,所有处理都是流处理的方式,所有的处理都将会被纳入流处理。企业大型开发项目都有j2ee后台支撑来提供各种人操作大数据中心。

      Spark streaming程序入口就有batchDuration时间窗口,每隔五秒钟JobGeneration都会产生一个job,这个job是逻辑级别的,所以逻辑级别要有这个job,并且这个job该琢磨做,但环没有做,由底层物理级别的action去做,底层物理级别是基于rdd的依赖关系。Ds的action操作也是逻辑级别的。Ss根据axtion操作产生逻辑级别的job,但是不会运行,就相当线程runnable接口。逻辑级别的暂时没有身材物理级别的,所以可以去调度和优化,假设讲ds的操作翻译成rdd的action,最后一个操作是rdd的action操作,是不是已翻译就立即触发job,纪要完成翻译又不要生成job的话需要

JavaStreamingContext jsc =newJavaStreamingContext(conf, Durations.seconds(5));

下面主要从三个类进行解析:

1、JobGenerator类:根据batchDuration及内部默认的时间间隔生成Jobs;

2、JobScheduler:根据batchDuration负责Spark Streaming Job的调度;

3、ReceiverTracker:负责Driver端元数据的接收和启动executor中的接收数据线程;

1、JobGenerator类:

**

* This class generates jobs from DStreams as well as drives checkpointing and cleaning

* up DStream metadata.

*/

private[streaming]

classJobGenerator(jobScheduler: JobScheduler)extendsLogging {

注释说基于dsg产生数据源,JobGenerator随着时间推移产生很多jobs,ss中除了定时身材的job,患有其他方式身材的job,例如基于各种聚合和状态的操作,状态操作不是基于batchd,他会对很多btchd处理。为了窗口之类的操作会触发JobGenerator,元素局的清理,作业生成的类。

// eventLoop is created when generator starts.

// This not being null means the scheduler has been started and not stopped

private var eventLoop: EventLoop[JobGeneratorEvent] = null //消息循环体定义

// last batch whose completion,checkpointing and metadata cleanup has been completed

private var lastProcessedBatch: Time = null

/** Start generation of jobs */

def start(): Unit = synchronized {

if (eventLoop != null) return// generator has already been started

// Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.

// See SPARK-10125

checkpointWriter //执行checkpoint检查点

eventLoop = newEventLoop[JobGeneratorEvent]("JobGenerator") {//内部匿名类创建

override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) //事件处理逻辑

override protected def onError(e: Throwable): Unit = {

jobScheduler.reportError("Error in job generator", e)

}

}

eventLoop.start()//启动事件处理线程对队列事件进行处理

if (ssc.isCheckpointPresent) {

restart()

} else {

startFirstTime()

}

}

/**

* An event loop to receive events from the caller and process all events in the event thread. It

* will start an exclusive event thread to process all events.

*

* Note: The event queue will grow indefinitely. So subclasses should make sure `onReceive` can

* handle events in time to avoid the potential OOM.

*/

private[spark] abstract classEventLoop[E](name: String) extends Logging {

private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()//消息队列数据结构

private val stopped = new AtomicBoolean(false)//原子变量

private val eventThread = new Thread(name) {//封装线程对象

setDaemon(true) //后台为线程

override def run(): Unit = { //线程执行逻辑

try {

while (!stopped.get) {

val event = eventQueue.take() //从消息队列中逐一获取消息对象

try {

onReceive(event) //对获取的消息对象进行业务处理

} catch {

case NonFatal(e) => { //处理失败后回调错误逻辑执代码

try {

onError(e)

} catch {

case NonFatal(e) => logError("Unexpected error in " + name, e)

}

}

}

}

} catch {

case ie: InterruptedException => // exit even if eventQueue is not empty

case NonFatal(e) => logError("Unexpected error in " + name, e)

}

}

}

def start(): Unit = { //启动当前线程类

if (stopped.get) {

throw new IllegalStateException(name + " has already been stopped")

}

// Call onStart before starting the event thread to make sure it happens before onReceive

onStart()

eventThread.start() //启动当前线程类业务run方法的执行

}

/** Processes all events */

private defprocessEvent(event: JobGeneratorEvent) {//根据消息对象执行相应的处理业务代码

logDebug("Got event " + event)

event match {

case GenerateJobs(time) => generateJobs(time) //根据时间片生成Jobs

case ClearMetadata(time) => clearMetadata(time) //时间片内的Jobs执行完毕,清理Driver上的元数据

case DoCheckpoint(time, clearCheckpointDataLater) =>//时间片内的Jobs执行完毕,清理checkpint数据

doCheckpoint(time, clearCheckpointDataLater)

case ClearCheckpointData(time) => clearCheckpointData(time)

}

}

2、JobSchedule类:

/**

* This class schedules jobs to be run on Spark. It uses the JobGenerator to generate

* the jobs and runs them using a thread pool.

*/

private[streaming]

class JobScheduler(val ssc: StreamingContext) extends Logging {

// Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff

// https://gist.github.com/AlainODea/1375759b8720a3f9f094

private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]//在指定的时间片内生成Jobs集合数据结构

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)

private val jobExecutor =

ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")//启动指定大小的线程池

private val jobGenerator = new JobGenerator(this)//启动JobGenerator对象

val clock = jobGenerator.clock //jobGenerator时钟

val listenerBus = new StreamingListenerBus() //linstenerBus消息总线

// These two are created only when scheduler starts.

// eventLoop not being null means the scheduler has been started and not stopped

var receiverTracker: ReceiverTracker = null //driver端的元数据接收跟踪器

// A tracker to track all the input stream information as well as processed record number

var inputInfoTracker: InputInfoTracker = null //输入流信息跟踪器

private var eventLoop: EventLoop[JobSchedulerEvent] = null //消息循环体对象

def start(): Unit = synchronized { JobScheudler类启动主方法

if (eventLoop != null) return // scheduler has already been started

logDebug("Starting JobScheduler")

eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {

override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)

}

eventLoop.start()

// attach rate controllers of input streams to receive batch completion updates

for {

inputDStream <- ssc.graph.getInputStreams  //数据流

rateController <- inputDStream.rateController //数据接收平率控制

} ssc.addStreamingListener(rateController)

listenerBus.start(ssc.sparkContext) //启动消息总线

receiverTracker = new ReceiverTracker(ssc) //创建接收器对象

inputInfoTracker = new InputInfoTracker(ssc) //创建数据输入对象

receiverTracker.start() //启动数据接收器线程

jobGenerator.start() //启动jobs产生器线程

logInfo("Started JobScheduler")

}

def submitJobSet(jobSet: JobSet) {

if (jobSet.jobs.isEmpty) {

logInfo("No jobs added for time " + jobSet.time)

} else {

listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))

jobSets.put(jobSet.time, jobSet)

jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

logInfo("Added jobs for time " + jobSet.time)

}

}

private def handleJobStart(job: Job, startTime: Long) {

val jobSet = jobSets.get(job.time)

val isFirstJobOfJobSet = !jobSet.hasStarted

jobSet.handleJobStart(job)

if (isFirstJobOfJobSet) {

// "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the

// correct "jobSet.processingStartTime".

listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))

}

job.setStartTime(startTime)

listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))

logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)

}

private class JobHandler(job: Job) extends Runnable with Logging {

import JobScheduler._

def run() {

try {

val formattedTime = UIUtils.formatBatchTime(

job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)

val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"

val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

ssc.sc.setJobDescription(

s"""Streaming job from $batchLinkText""")

ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)

ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

// We need to assign `eventLoop` to a temp variable. Otherwise, because

// `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then

// it's possible that when `post` is called, `eventLoop` happens to null.

var _eventLoop = eventLoop

if (_eventLoop != null) {

_eventLoop.post(JobStarted(job, clock.getTimeMillis()))

// Disable checks for existing output directories in jobs launched by the streaming

// scheduler, since we may need to write output to an existing directory during checkpoint

// recovery; see SPARK-4835 for more details.

PairRDDFunctions.disableOutputSpecValidation.withValue(true) {

job.run()

}

_eventLoop = eventLoop

if (_eventLoop != null) {

_eventLoop.post(JobCompleted(job, clock.getTimeMillis()))

}

} else {

// JobScheduler has been stopped.

}

} finally {

ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)

ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)

}

}

}

}

备注:

资料来源于:DT_大数据梦工厂(Spark发行版本定制)

更多私密内容,请关注微信公众号:DT_Spark

如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容