-
Task Locality
Data locality can have a major impact on the performance of Spark jobs. If data and the code that operates on it are together, computation tends to be fast. But if code and data are separated, one must move to the other. Typically it is faster to ship serialized code from place to place than a chunk of data, because code is much smaller than data. Spark builds its scheduling around this general principle of data locality.
Data locality is how close data is to the code processing it. There are several levels of locality based on the data’s current location. In order from closest to farthest:
PROCESS_LOCAL - data is in the same JVM as the running code. This is the best locality possible.
NODE_LOCAL - data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes.
NO_PREF - data is accessed equally quickly from anywhere and has no locality preference.
RACK_LOCAL - data is on the same rack of servers. Data is on a different server on the same rack, so it needs to be sent over the network, typically through a single switch.
ANY - data is elsewhere on the network and not in the same rack.
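Spark represents these levels as an ordered enumeration, TaskLocality in org.apache.spark.scheduler, where the declaration order is the ordering. Below is a minimal standalone re-creation of it, with no Spark dependency, so it is a sketch rather than the Spark source. It shows how that ordering becomes a simple "is this level allowed" test.

```scala
// Standalone re-creation of the locality ordering (no Spark dependency).
object TaskLocality extends Enumeration {
  // Declaration order defines the ordering: closest level first.
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
  type TaskLocality = Value

  // A task with locality `condition` may launch when the scheduler
  // currently allows levels up to and including `constraint`.
  def isAllowed(constraint: TaskLocality, condition: TaskLocality): Boolean =
    condition <= constraint
}
```

For example, when only NODE_LOCAL is allowed, a PROCESS_LOCAL task may launch, because it is closer. A RACK_LOCAL task has to wait.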
Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In situations where there is no unprocessed data on any idle executor, Spark switches to lower locality levels. There are two options:
a) wait until a busy CPU frees up to start a task on data on the same server, or
b) immediately start a new task in a farther away place that requires moving data there.
What Spark typically does is wait a bit in the hope that a busy CPU frees up. Once that timeout expires, it starts moving the data from far away to the free CPU. The wait timeout for falling back between levels can be configured all together in one parameter, spark.locality.wait (3s by default), or individually with spark.locality.wait.process, spark.locality.wait.node and spark.locality.wait.rack; see the configuration page for details. You should increase these settings if your tasks are long and see poor locality, but the default usually works well.
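This fallback is called delay scheduling. The sketch below is a simplified, hypothetical model, loosely based on how TaskSetManager tracks its current locality level:

- Each time the wait for the current level expires without a launch, the allowed level widens by one step.
- A successful launch narrows the level back and restarts the timer.

DelayScheduler, allowedLevel and onLaunch are made-up names for illustration. The real implementation also skips levels that have no pending tasks.

```scala
// Hypothetical, simplified model of delay scheduling; loosely based on how
// TaskSetManager tracks its current locality level, not the Spark source.
final class DelayScheduler(levels: Vector[String], waitsMs: Vector[Long], startMs: Long) {
  require(levels.nonEmpty && levels.length == waitsMs.length, "one wait per level")
  private var currentIndex = 0
  private var lastLaunchMs = startMs

  // Farthest locality level a task may use at time `nowMs`.
  def allowedLevel(nowMs: Long): String = {
    // Fall back one level each time the current level's wait has expired.
    while (currentIndex < levels.length - 1 && nowMs - lastLaunchMs >= waitsMs(currentIndex)) {
      // Advance the timer by the expired wait so the next level gets its full wait.
      lastLaunchMs += waitsMs(currentIndex)
      currentIndex += 1
    }
    levels(currentIndex)
  }

  // A task launched at `level`: narrow back to that level and restart the timer.
  def onLaunch(nowMs: Long, level: String): Unit = {
    val i = levels.indexOf(level)
    require(i >= 0, s"unknown level $level")
    currentIndex = i
    lastLaunchMs = nowMs
  }
}
```

Take levels PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY, with a 3s wait per level (matching the spark.locality.wait default). A task set that launches nothing is allowed NODE_LOCAL after 3s and falls all the way back to ANY after 9s.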
-
1. Getting partition locations
[DAGScheduler]->private def submitMissingTasks(stage: Stage, jobId: Int)
...
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
  stage match {
    case s: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        /**
          getPreferredLocs obtains the locations of the partition's data;
          how this information is obtained in different cases is
          analyzed below.
        **/
        (id, getPreferredLocs(stage.rdd, id))
      }.toMap
    case s: ResultStage =>
      val job = s.activeJob.get
      partitionsToCompute.map { id =>
        val p = s.partitions(id)
        (id, getPreferredLocs(stage.rdd, p))
      }.toMap
  }
}
...
[DAGScheduler]->submitMissingTasks->getPreferredLocsInternal
private def getPreferredLocsInternal(
    rdd: RDD[_],
    partition: Int,
    visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
  ...
  /**
    cacheLocs stores the locations of an RDD's partitions as TaskLocation
    instances. If cacheLocs has no entry for the current partition yet,
    the following happens: if the RDD's storageLevel is NONE, Nil is
    returned and stored in cacheLocs; otherwise blockManagerMaster is
    asked which blockManagers hold the partition, and an
    ExecutorCacheTaskLocation is created for each and stored in
    cacheLocs. See getCacheLocs for details; blocks and caching are
    analyzed in the Storage chapter.
  **/
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) {
    return cached
  }
  /**
    RDD.preferredLocations first tries to get the partition's checkpoint
    locations; if the RDD has not been checkpointed, it calls
    getPreferredLocations(split), which each RDD type implements
    differently. For example, HadoopRDD uses the Hadoop InputSplit to
    find where the current partition lives.
    If the RDD is neither cached nor an input RDD, continue with the
    next step.
  **/
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) {
    return rddPrefs.map(TaskLocation(_))
  }
  /**
    When the RDD is neither cached nor an input RDD, so that
    preferredLocations yields nothing, recursively look up the locations
    of the corresponding partitions of the parent RDDs. This only works
    through narrow dependencies.
  **/
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
        if (locs != Nil) {
          return locs
        }
      }
    case _ =>
  }
  Nil
}
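The lookup order in getPreferredLocsInternal can be modeled outside Spark. In this sketch FakeRdd is a hypothetical stand-in for an RDD, holding three things:

- its cached locations,
- its own input locations,
- its narrow parents, each with a getParents-style mapping.

Only the order of the checks follows the code above: cache first, then the RDD's own preferences, then a depth-first search through narrow dependencies. A visited set avoids re-exploring the same (RDD, partition) pair.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an RDD, for illustration only (not Spark's RDD).
// `name` must be unique because it is used as the identity in `visited`.
final case class FakeRdd(
    name: String,
    cachedLocs: Map[Int, Seq[String]],              // partition -> executors caching it
    inputLocs: Map[Int, Seq[String]],               // partition -> hosts, e.g. from an InputSplit
    narrowParents: Seq[(FakeRdd, Int => Seq[Int])]) // parent RDD + getParents-style mapping

object PreferredLocs {
  // Same order of checks as getPreferredLocsInternal: cache, own preferences,
  // then the first narrow-parent partition that yields any location.
  def find(rdd: FakeRdd, partition: Int,
           visited: mutable.Set[(String, Int)] = mutable.Set.empty[(String, Int)]): Seq[String] =
    if (!visited.add((rdd.name, partition))) Nil // already explored via another path
    else {
      val cached = rdd.cachedLocs.getOrElse(partition, Nil)
      val own = rdd.inputLocs.getOrElse(partition, Nil)
      if (cached.nonEmpty) cached
      else if (own.nonEmpty) own
      else
        rdd.narrowParents.iterator
          .flatMap { case (parent, getParents) => getParents(partition).iterator.map(p => (parent, p)) }
          .map { case (parent, p) => find(parent, p, visited) } // lazy: stops at the first hit
          .find(_.nonEmpty)
          .getOrElse(Nil)
    }
}
```

An RDD whose only parent is reached through a shuffle has no narrow parents in this model, so it ends up with no preference (Nil), just as in the code above.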
-
2. Task construction
[DAGScheduler]->private def submitMissingTasks(stage: Stage, jobId: Int)
...
/**
  Build a different type of task for each stage type.
  Each partition corresponds to one task, and each task carries the
  location information of its target partition; all tasks are finally
  submitted together as a TaskSet.
  Note: the actual execution logic of a task has already been serialized
  into taskBinary and broadcast to every executor. The tasks built here
  only add location information so that the driver-side TaskScheduler
  can schedule them; the TaskSet itself is not serialized or broadcast
  at this point.
**/
val tasks: Seq[Task[_]] = try {
  stage match {
    case stage: ShuffleMapStage =>
      partitionsToCompute.map { id =>
        ...
        new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
          taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
      }
    case stage: ResultStage =>
      val job = stage.activeJob.get
      partitionsToCompute.map { id =>
        ...
        new ResultTask(stage.id, stage.latestInfo.attemptId, taskBinary,
          part, locs, id, properties, stage.latestInfo.taskMetrics)
      }
  }
}
...
if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingPartitions ++= tasks.map(_.partitionId)
  logDebug("New pending partitions: " + stage.pendingPartitions)
  /**
    Build a TaskSet and submit it. In most cases the scheduler is a
    TaskSchedulerImpl: the DAGScheduler submits tasks through it, and
    TaskSchedulerImpl manages each TaskSet through a TaskSetManager.
    When a TaskSetManager is created, it sorts its tasks into pending
    queues (see [TaskSetManager]->addPendingTask). A task is pinned to
    specific executors only for two preferred-location types:
    ExecutorCacheTaskLocation, an RDD partition cached on an executor;
    HDFSCacheTaskLocation, a block held in HDFS's in-memory cache on a
    host, which is assigned to the executors alive on that host.
    Every location is also added to the per-host and per-rack queues.
    Details are covered in the 'Spark scheduling' chapter.
  **/
  taskScheduler.submitTasks(new TaskSet(
    tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
}
...
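The split into pending queues that addPendingTask performs can be sketched as follows. Loc, ExecutorLoc, HdfsCacheLoc, HostLoc and PendingTasks are hypothetical simplified stand-ins for TaskLocation and the relevant TaskSetManager fields. executorsOnHost and rackOf are assumed lookup functions supplied by the caller.

```scala
import scala.collection.mutable

// Hypothetical simplified stand-ins for Spark's TaskLocation subclasses.
sealed trait Loc { def host: String }
final case class ExecutorLoc(host: String, executorId: String) extends Loc // RDD block cached on an executor
final case class HdfsCacheLoc(host: String) extends Loc                   // block in HDFS's in-memory cache
final case class HostLoc(host: String) extends Loc                        // plain host preference

// Pending queues loosely modeled on TaskSetManager.addPendingTask.
final class PendingTasks(executorsOnHost: String => Set[String],
                         rackOf: String => Option[String]) {
  val forExecutor = mutable.Map.empty[String, mutable.ArrayBuffer[Int]]
  val forHost = mutable.Map.empty[String, mutable.ArrayBuffer[Int]]
  val forRack = mutable.Map.empty[String, mutable.ArrayBuffer[Int]]
  val noPrefs = mutable.ArrayBuffer.empty[Int]
  val all = mutable.ArrayBuffer.empty[Int]

  private def push(queues: mutable.Map[String, mutable.ArrayBuffer[Int]], key: String, index: Int): Unit = {
    queues.getOrElseUpdate(key, mutable.ArrayBuffer.empty[Int]) += index
    ()
  }

  def add(index: Int, locs: Seq[Loc]): Unit = {
    for (loc <- locs) {
      loc match {
        // Only these two location types pin the task to particular executors.
        case ExecutorLoc(_, execId) => push(forExecutor, execId, index)
        case HdfsCacheLoc(host)     => executorsOnHost(host).foreach(e => push(forExecutor, e, index))
        case HostLoc(_)             => ()
      }
      // Every location also feeds the host queue and, if known, the rack queue.
      push(forHost, loc.host, index)
      rackOf(loc.host).foreach(r => push(forRack, r, index))
    }
    if (locs.isEmpty) noPrefs += index
    all += index
    ()
  }
}
```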
-
3. Task submission
[TaskSchedulerImpl]->def submitTasks(taskSet: TaskSet)
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " +
    tasks.length + " tasks")
  this.synchronized {
    /** (TaskSetManager)
      Schedules the tasks within a single TaskSet in the
      TaskSchedulerImpl. This class keeps track of each task, retries
      tasks if they fail (up to a limited number of times), and handles
      locality-aware scheduling for this TaskSet via delay scheduling.
      The main interfaces to it are resourceOffer, which asks the
      TaskSet whether it wants to run a task on one node, and
      statusUpdate, which tells it that one of its tasks changed state
      (e.g. finished).
    **/
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    ...
    /**
      schedulableBuilder has two implementations, one per task scheduling
      algorithm (much like scheduling in an OS):
      1. FIFOSchedulableBuilder
      2. FairSchedulableBuilder
      The schedulableBuilder holds a Pool that manages the
      TaskSetManagers and returns them in an order determined by the
      scheduling algorithm.
      The pool's checkSpeculatableTasks method also re-runs copies of
      tasks for jobs with speculation enabled.
      What actually happens here is that the TaskSetManager is put into
      the pool, from which it is scheduled asynchronously.
    **/
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  /**
    On YARN, backend is an instance of (a subclass of)
    CoarseGrainedSchedulerBackend. It runs on the driver, holds the
    information about all executors of the running application, and
    manages them. This call eventually reaches backend.makeOffers, which
    offers the available resources and triggers task scheduling.
  **/
  backend.reviveOffers()
}
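The two orderings the Pool can apply are easy to state as comparators. This sketch is a simplified model, not the Spark source. Sched is a hypothetical flattened view of a Schedulable, and its priority field stands for the jobId passed to TaskSet above.

- FIFO orders by job, then by stage.
- FAIR orders by whether an entity is below its minShare, then by running tasks relative to minShare or weight, and finally by name.

```scala
// Hypothetical flattened view of a Schedulable (TaskSetManager or Pool).
final case class Sched(name: String, priority: Int, stageId: Int,
                       runningTasks: Int, minShare: Int, weight: Int)

object SchedulingOrder {
  // FIFO: lower priority value (the jobId) first, then lower stageId.
  def fifoLess(a: Sched, b: Sched): Boolean =
    if (a.priority != b.priority) a.priority < b.priority
    else a.stageId < b.stageId

  // FAIR: entities running fewer tasks than their minShare ("needy") go first.
  // Two needy entities compare by runningTasks / minShare, otherwise by
  // runningTasks / weight; ties fall back to the name.
  def fairLess(a: Sched, b: Sched): Boolean = {
    val aNeedy = a.runningTasks < a.minShare
    val bNeedy = b.runningTasks < b.minShare
    if (aNeedy != bNeedy) aNeedy
    else {
      def ratio(s: Sched): Double =
        if (aNeedy) s.runningTasks.toDouble / math.max(s.minShare, 1)
        else s.runningTasks.toDouble / s.weight
      val cmp = java.lang.Double.compare(ratio(a), ratio(b))
      if (cmp != 0) cmp < 0 else a.name.compareTo(b.name) < 0
    }
  }

  def sorted(xs: Seq[Sched], fair: Boolean): Seq[Sched] =
    if (fair) xs.sortWith(fairLess) else xs.sortWith(fifoLess)
}
```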
-
4. Task resource offers and scheduling
[CoarseGrainedSchedulerBackend]->private def makeOffers()
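/**
  This method is called whenever the cluster's resources change, for
  example when an executor registers with the backend or when a task
  finishes: the executor sends a message to the backend, and as the last
  step of handling it the backend triggers a round of task scheduling.
  All asynchronous scheduling in Spark works in a similar way, for
  example executor scheduling in standalone mode and the scheduling of
  waiting stages.
**/
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  /**
    Collect the free CPU cores of every live executor as WorkerOffer
    case class instances. These offers are the basis for the locality
    logic when tasks are assigned later.
  **/
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  /**
    scheduler.resourceOffers(workOffers) performs the actual task
    assignment; launchTasks then serializes each TaskDescription and
    sends it to the corresponding executor for execution.
  **/
  launchTasks(scheduler.resourceOffers(workOffers))
}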
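makeOffers essentially turns the driver's executor table into a list of offers. Here is a minimal sketch with hypothetical ExecutorInfo and Offer types, which stand in for Spark's ExecutorData and WorkerOffer but are not them.

```scala
// Hypothetical stand-ins for the driver's executor bookkeeping
// (not Spark's ExecutorData / WorkerOffer).
final case class ExecutorInfo(host: String, freeCores: Int)
final case class Offer(executorId: String, host: String, cores: Int)

object Offers {
  // Same shape as makeOffers: drop executors that are being killed,
  // then advertise each remaining executor's free cores as one offer.
  def make(executors: Map[String, ExecutorInfo], isAlive: String => Boolean): Seq[Offer] =
    executors.toSeq
      .filter { case (id, _) => isAlive(id) }
      .map { case (id, info) => Offer(id, info.host, info.freeCores) }
}
```

An executor with zero free cores still produces an offer here. It is the assignment step in resourceOffers that skips offers without enough free CPUs.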
[TaskSchedulerImpl]->def resourceOffers(offers: Seq[WorkerOffer])
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  var newExecAvail = false
  /**
    If an offer comes from a host with no known executors yet,
    executorAdded(o.executorId, o.host) eventually calls DAGScheduler's
    submitWaitingStages() to submit the waiting stages. As with task
    scheduling above, any change in the cluster's resources triggers a
    round of scheduling.
  **/
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }
  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  /**
    Build a list of tasks to assign to each worker.
    Tasks are assigned according to each executor's number of free
    cores; each core runs one task (with the default spark.task.cpus = 1).
  **/
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  /**
    As described above, the pool returns the task sets sorted by the
    scheduling algorithm (FIFO or FAIR); this is the order in which
    they are finally served.
  **/
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    /**
      When a new executor has been added, the task set's locality levels
      are recomputed here (TaskSetManager.recomputeLocality).
    **/
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }
  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }
  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
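The core of resourceOffers is a nested loop. It walks the task sets in scheduling order, then each task set's locality levels from closest to farthest. At each level it offers every executor with a free core, repeating until nothing more launches. The toy model below reproduces only that loop, with these simplifications:

- ToyTaskSet, its two levels (NODE_LOCAL and ANY) and ToyScheduler are hypothetical.
- Each task uses one CPU.
- Delay-scheduling waits are ignored.

WorkerOffer here only mirrors the shape of Spark's class.

```scala
import scala.collection.mutable
import scala.util.Random

// Stand-in mirroring the shape of Spark's WorkerOffer.
final case class WorkerOffer(executorId: String, host: String, cores: Int)

// Hypothetical toy task set: (task index, preferred host or None).
final class ToyTaskSet(val name: String, pending: Seq[(Int, Option[String])]) {
  private val queue = mutable.ArrayBuffer.empty[(Int, Option[String])] ++= pending
  // Locality levels this toy uses, closest first.
  val localityLevels: Seq[String] = Seq("NODE_LOCAL", "ANY")

  // Hand out one pending task runnable on `host` at locality `maxLevel`, if any.
  def offer(host: String, maxLevel: String): Option[Int] = {
    val i = queue.indexWhere { case (_, pref) => maxLevel == "ANY" || pref.contains(host) }
    if (i < 0) None else Some(queue.remove(i)._1)
  }
}

object ToyScheduler {
  // Returns (executorId, taskSetName, taskIndex) for every launched task.
  def resourceOffers(taskSets: Seq[ToyTaskSet], offers: Seq[WorkerOffer],
                     rng: Random = new Random(0)): List[(String, String, Int)] = {
    // Shuffle so tasks do not always land on the same executors.
    val shuffled = rng.shuffle(offers)
    val freeCpus = shuffled.map(_.cores).toArray
    val launched = mutable.ArrayBuffer.empty[(String, String, Int)]
    for (ts <- taskSets; level <- ts.localityLevels) {
      var launchedTask = true
      // Keep offering at this level until a full pass launches nothing.
      while (launchedTask) {
        launchedTask = false
        for (i <- shuffled.indices if freeCpus(i) >= 1) { // one CPU per task
          ts.offer(shuffled(i).host, level).foreach { task =>
            launched += ((shuffled(i).executorId, ts.name, task))
            freeCpus(i) -= 1
            launchedTask = true
          }
        }
      }
    }
    launched.toList
  }
}
```

Suppose executor e1 runs on h1 with 2 cores and e2 runs on h2 with 1 core, and three tasks prefer h1. Both e1 cores are filled at NODE_LOCAL. Only at ANY does the third task go to e2, while a fourth task with no preference waits for the next offer round.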