Spark系列(十)TaskSchedule工作原理

Spark系列(十)TaskSchedule工作原理 - 会飞的纸盒 - 博客园
http://www.cnblogs.com/jianyuan/p/Spark%E7%B3%BB%E5%88%97%E4%B9%8BTaskSchedule%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86.html

工作原理图


源码分析:

1、submitTasks

在submitTasks方法中最后调用backend.reviveOffers()进行下一步的task调度分配

1
override def submitTasks(taskSet: TaskSet) {

2
val tasks = taskSet.tasks

3
logInfo("****Adding**** ****task**** ****set**** ****" + taskSet.id + "**** ****with**** ****" + tasks.length + "**** ****tasks****")

4
this.synchronized {

5
// 为taskSet创建TaskSetManager

6
// TaskSetManager用于对TaskSet的执行状况进行管理和监控

7
val manager = createTaskSetManager(taskSet, maxTaskFailures)

8
// 将manager加入activeTaskSets缓存中

9
activeTaskSets(taskSet.id) = manager

10
// 将manager加入schedulableBuilder中

11
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

12

13
if (!isLocal && !hasReceivedTask) {

14
starvationTimer.scheduleAtFixedRate(new TimerTask() {

15
override def run() {

16
if (!hasLaunchedTask) {

17
logWarning("****Initial**** ****job**** ****has**** ****not**** ****accepted**** ****any**** ****resources;**** ****" +

18
"****check**** ****your**** ****cluster**** ****UI**** ****to**** ****ensure**** ****that**** ****workers**** ****are**** ****registered**** ****" +

19
"****and**** ****have**** ****sufficient**** ****resources****")

20
} else {

21
this.cancel()

22
}

23
}

24
}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)

25
}

26
hasReceivedTask = true

27
}

28
backend.reviveOffers()

29
}

2、makeOffers

调用过程:收到reviveOffers消息后调用makeOffers方法。

所属包:org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

1
def makeOffers() {

2
// resourceOffers方法用于实现任务分配算法,将各个task分配到executor上

3
// launchTasks方法用于将所分配的task发送到对应的executor中执行

4
// WorkerOffer封装了Application可用的资源

5
launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>

6
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)

7
}.toSeq))

8
}

3、resourceOffers

1
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {

2
// Mark each slave as alive and remember its hostname

3
// Also track if new executor is added

4
var newExecAvail = false

5
// 遍历Application可用的资源FreeCores获取节点主机信息

6
for (o <- offers) {

7
executorIdToHost(o.executorId) = o.host

8
activeExecutorIds += o.executorId

9
if (!executorsByHost.contains(o.host)) {

10
executorsByHost(o.host) = new HashSetString

11
executorAdded(o.executorId, o.host)

12
newExecAvail = true

13
}

14
for (rack <- getRackForHost(o.host)) {

15
hostsByRack.getOrElseUpdate(rack, new HashSetString) += o.host

16
}

17
}

18

19
// Randomly shuffle offers to avoid always placing tasks on the same set of workers.

20
val shuffledOffers = Random.shuffle(offers)

21
// Build a list of tasks to assign to each worker.

22
val tasks = shuffledOffers.map(o => new ArrayBufferTaskDescription)

23
// executor可用的cores序列(每个executor最多可用多少个cores)

24
val availableCpus = shuffledOffers.map(o => o.cores).toArray

25
// rootPool中取出排好序的TaskSet,TaskScheduler初始化时,创建完TaskSchedulerImpl、

26
// SparkDeploySchedulerBackend之后,会执行initialize()方法,在该方法中会创建一个调度池,

27
// 所有提交的TaskSet先会放入该调度池,后面执行task分配分配算法时就从该调度池中取出排好序的TaskSet

28
val sortedTaskSets = rootPool.getSortedTaskSetQueue

29
for (taskSet <- sortedTaskSets) {

30
logDebug("****parentName:**** ****%s,**** ****name:**** ****%s,**** ****runningTasks:**** ****%s****".format(

31
taskSet.parent.name, taskSet.name, taskSet.runningTasks))

32
if (newExecAvail) {

33
taskSet.executorAdded()

34
}

35
}

36

37
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order

38
// of locality levels so that it gets a chance to launch local tasks on all of them.

39
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY

40

41
// 本地化级别

42
// PROCESS_LOCAL:进程本地化,rdd的partition和task在同一个executor中(最快)

43
// NODE_LOCAL:rdd的partition和task,不在一个同一个executor中,但在同一个节点中

44
// NO_PREF:没有本地化级别

45
// RACK_LOCAL:机架本地化,至少rdd的partition和task在一个机架中

46
// ANY:任意本地化级别

47

48
// 按照从最小本地化级别到最大本地化级别的顺序,尝试把taskSet中的task在executor上启动,

49
// 直到task在某种本地化级别下task全部启动

50
var launchedTask = false

51
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {

52
do {

53
launchedTask = resourceOfferSingleTaskSet(

54
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)

55
} while (launchedTask)

56
}

57

58
if (tasks.size > 0) {

59
hasLaunchedTask = true

60
}

61
return tasks

62
}

4、resourceOfferSingleTaskSet

1
private def resourceOfferSingleTaskSet(

2
taskSet: TaskSetManager,

3
maxLocality: TaskLocality,

4
shuffledOffers: Seq[WorkerOffer],

5
availableCpus: Array[Int],

6
tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {

7
var launchedTask = false

8
for (i <- 0 until shuffledOffers.size) {

9
val execId = shuffledOffers(i).executorId

10
val host = shuffledOffers(i).host

11
// 当前executor的可用cpu数量至少大于每个task要使用的cpu数量(默认是1)

12
if (availableCpus(i) >= CPUS_PER_TASK) {

13
try {

14
// 查找在executor上用那种本地化级别启动taskSet中的task

15
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {

16
// 给指定的executor加上要启动task

17
tasks(i) += task

18
// 更新分配信息

19
val tid = task.taskId

20
taskIdToTaskSetId(tid) = taskSet.taskSet.id

21
taskIdToExecutorId(tid) = execId

22
executorsByHost(host) += execId

23
availableCpus(i) -= CPUS_PER_TASK

24
assert(availableCpus(i) >= 0)

25
launchedTask = true

26
}

27
} catch {

28
case e: TaskNotSerializableException =>

29
logError(s"****Resource**** ****offer**** ****failed,**** ****task**** ****set**** ****${taskSet.name}**** ****was**** ****not**** ****serializable****")

30
// Do not offer resources for this task, but don't throw an error to allow other

31
// task sets to be submitted.

32
return launchedTask

33
}

34
}

35
}

36
return launchedTask

37
}

5、launchTasks

1
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {

2
for (task <- tasks.flatten) {

3
// 将每个executor要执行的task信息进行序列化

4
val ser = SparkEnv.get.closureSerializer.newInstance()

5
val serializedTask = ser.serialize(task)

6
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {

7
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)

8
scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>

9
try {

10
var msg = "****Serialized**** ****task**** ****%s:%d**** ****was**** ****%d**** ****bytes,**** ****which**** ****exceeds**** ****max**** ****allowed:**** ****" +

11
"****spark.akka.frameSize**** ****(%d**** ****bytes)**** ****-**** ****reserved**** ****(%d**** ****bytes).**** ****Consider**** ****increasing**** ****" +

12
"****spark.akka.frameSize**** ****or**** ****using**** ****broadcast**** ****variables**** ****for**** ****large**** ****values.****"

13
msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,

14
AkkaUtils.reservedSizeBytes)

15
taskSet.abort(msg)

16
} catch {

17
case e: Exception => logError("****Exception**** ****in**** ****error**** ****callback****", e)

18
}

19
}

20
}

21
else {

22
val executorData = executorDataMap(task.executorId)

23
// 在对应的executor的资源中减去要使用的cpu资源

24
executorData.freeCores -= scheduler.CPUS_PER_TASK

25
// 向executor发送launchTask消息来启动task

26
executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))

27
}

28
}

29
}

说明:

1、resourceOffer方法功能:判断executor本地化级别的等待时间是否在一定范围内,如果在就认为task使用本地化级别可以在executor上启动。

2、TaskSetManager功能:对一个单独的TaskSet的任务进行调度,该类负责追踪每个task,如果task失败会负责重试,知道超过重试次数的限制,且会通过延迟调度为该TaskSet处理本地化调度机制,它主要接口是resourceOffer,在这个接口中,TaskSet会希望在一个节点上运行一个任务,并接受任务的状态改变消息,来知道它负责的task的状态改变了。

3、本地化级别种类: PROCESS_LOCAL:进程本地化,rdd的partition和task在同一个executor中(最快) NODE_LOCAL:rdd的partition和task,不在一个同一个executor中,但在同一个节点中 NO_PREF:没有本地化级别 RACK_LOCAL:机架本地化,至少rdd的partition和task在一个机架中 ANY:任意本地化级别

分类: Spark

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容