3.4 RDD的计算

3.4 RDD的计算

3.4.1 Ta s k简介

原始的RDD经过一系列转换后,会在最后一个RDD上触发一个动作,这个动作会生成一个Job。在Job被划分为一批计算任务(Task)后,这批Task会被提交到集群上的计算节点去计算。计算节点执行计算逻辑的部分称为Executor。Executor在准备好Task的运行时环境后,会通过调用org.apache.spark.scheduler.Task#run来执行计算。Spark的Task分为两种:

1)org.apache.spark.scheduler.ShuffleMapTask

2)org.apache.spark.scheduler.ResultTask

简单来说,DAG的最后一个阶段会为每个结果的Partition生成一个ResultTask,其余所有的阶段都会生成ShuffleMapTask。生成的Task会被发送到已经启动的Executor上,由Executor来完成计算任务的执行,执行过程的实现在org.apache. spark.executor.Executor.TaskRunner#run。第6章会介绍这一部分的实现原理和设计思想。

3.4.2 Task的执行起点

org.apache.spark.scheduler.Task#run会 调 用ShuffleMapTask或 者ResultTask的runTask;runTask会调用RDD的org.apache.spark.rdd.RDD#iterator。计算由此开始。

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {

if(storageLevel != StorageLevel.NONE) {

//如果存储级别不是NONE,那么先检查是否有缓存;没有缓存则要进行计算

SparkEnv.get.cacheManager.getOrCompute(this, split, context,storageLevel)

} else {

//如果有checkpoint,那么直接读取结果;否则直接进行计算

computeOrReadCheckpoint(split, context)

}

}

其中,SparkEnv中包含了一个运行时节点所需要的所有的环境信息。cache-Manager是org.apache.spark.CacheManager,它负责调用BlockManager来管理RDD的缓存,如果当前RDD原来计算过并且把结果缓存起来,那么接下来的运算都可以通过BlockManager来直接读取缓存后返回。SparkEnv除了cacheManager,还包括以下重要的成员变量:

1)akka.actor.ActorSystem:运行在该节点的Actor System,其中运行在Driver上的名字是sparkDriver;运行在Executor上的是sparkExecutor。

2)org.apache.spark.serializer.Serializer:序列化和发序列化的工具。

3)org.apache.spark.MapOutputTracker;保存Shuffle Map Task输出的位置信息。其中在Driver上的Tracer是org.apache.spark.MapOutputTrackerMaster;而在Executor上的Tracker是org.apache.spark.MapOutputTrackerWorker,它会从org.apache.spark. MapOutputTrackerMaster获取信息。

4)org.apache.spark.shuffle.ShuffleManager:Shuffle的管理者,其中Driver端会注册Shuffle的信息,而Executor端会上报和获取Shuffle的信息。现阶段内置支持Hash Based Shuffle和Sort Based Shuffle,具体实现细节请参阅第7章。

5)org.apache.spark.broadcast.BroadcastManager:广播变量的管理者。

6)org.apache.spark.network.BlockTransferService:Executor读取Shuffle数据的Client。当前支持netty和nio,可以通过spark.shuffle.blockTransferService来设置。具体详情可以参阅第7章。

7)org.apache.spark.storage.BlockManager:提供了Storage模块与其他模块的交互接口,管理Storage模块。

8)org.apache.spark.SecurityManager:Spark对于认证授权的实现。

9)org.apache.spark.HttpFileServer:可以提供HTTP服务的Server。当前主要用于Executor端下载依赖。

10)org.apache.spark.metrics.MetricsSystem:用于搜集统计信息。

11)org.apache.spark.shuffle.ShuffleMemoryManager:管理Shuffle过程中使用的内存。ExternalAppendOnlyMap 和ExternalSorter都会从ShuffleMemoryManager中申请内存,在数据spill到Disk后会释放内存。当然了,当Task退出时这个内存也会被回收。为了使得每个thread都会比较公平地获取内存资源,避免一个thread申请了大量内存后造成其他的thread需要频繁地进行spill操作,它采取的内存分配策略是:对于N个thread,每个thread可以至少申请1/(2*N)的内存,但是至多申请1/N。这个N是动态变化的,感兴趣的读者可以查阅这个类的具体实现。

在用户创建org.apache.spark.SparkContext时会创建org.apache.spark.SparkEnv。

3.4.3 缓存的处理

如果存储级别不是NONE,那么先检查是否有缓存;没有缓存则要进行计算。什么是存储级别?从用户的角度来看就是缓存保存到不同的存储位置,比如内存、硬盘、Tachyon;还有缓存的数据是否需要序列化等。详细的存储级别的介绍可以参阅第8章。

cacheManager对Storage模块进行了封装,使得RDD可以更加简单地从Storage模块读取或者写入数据。RDD的每个Partition对应Storage模块的一个Block,只不过Block是Partition经过处理后的数据。在系统实现的层面上,可以认为Partition和Block是一一对应的。cacheManager会通过getOrCompute来判断当前的RDD是否需要进行计算。

首先,cacheManager会通过RDD的ID和当前计算的Partition的ID向Storage模块的BlockManager发起查询请求,如果能够获得Block的信息,会直接返回Block的信息。否则,代表该RDD是需要计算的。这个RDD以前可能计算过并且被存储到了内存中,但是后来由于内存紧张,这部分内存被清理了。在计算结束后,计算结果会根据用户定义的存储级别,写入BlockManager中。这样,下次就可以不经过计算而直接读取该RDD的计算结果了。核心实现逻辑如下:

def getOrCompute[T](

rdd: RDD[T],

partition: Partition,

context: TaskContext,

storageLevel: StorageLevel): Iterator[T] = {

//获取RDD的BlockId

val key = RDDBlockId(rdd.id, partition.index)

logDebug(s"Looking for partition $key")

blockManager.get(key) match { //向BlockManager查询是否有缓存

case Some(blockResult) => //缓存命中

//更新统计信息,将缓存作为结果返回

context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])


case None => //没有缓存命中,需要计算

// 判断当前是否有线程在处理当前的Partition,如果有那么等待它结束后,直接从Block

// Manager中读取处理结果如果没有线程在计算,那么storedValues就是None,否则

// 就是计算的结果

val storedValues = acquireLockForPartition[T](key)

if (storedValues.isDefined) { // 已经被其他线程处理了,直接返回结果

return new InterruptibleIterator[T](context, storedValues.get)

}


// 需要计算

try {

// 如果被checkpoint过,那么读取checkpoint的数据;否则调用rdd的compute()开始


  // 计算

        val computedValues = rdd.computeOrReadCheckpoint(partition,context)

// Task是在Driver端执行的话就不需要缓存结果,这个主要是为了first() 或者take()

// 这种仅仅有一个执行阶段的任务的快速执行。这类任务由于没有Shuffle阶段,直接运行

// 在Driver端可能会更省时间

if (context.isRunningLocally) {

return computedValues


}

// 将计算结果写入到BlockManager

val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]

val cachedValues =

putInBlockManager(key, computedValues, storageLevel, updatedBlocks)

// 更新任务的统计信息

val metrics = context.taskMetrics

val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(

Seq[(BlockId, BlockStatus)]())

metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)

new InterruptibleIterator(context, cachedValues)

} finally {

loading.synchronized {

loading.remove(key)

// 如果有其他的线程在等待该Partition的处理结果,那么通知它们计算已经完成,结果已

// 经存到BlockManager中(注意前面那类不会写入BlockManager的本地任务)

// loading.notifyAll()

}

}

}

}

3.4.4 checkpoint的处理

在缓存没有命中的情况下,首先会判断是否保存了RDD的checkpoint,如果有,则读取checkpoint。为了理解checkpoint的RDD是如何读取计算结果的,需要先看一下checkpoint的数据是如何写入的。

首先在Job结束后,会判断是否需要checkpoint。如果需要,就调用org.apache. spark.rdd.RDDCheckpointData#doCheckpoint。doCheckpoint首先为数据创建一个目录;然后启动一个新的Job来计算,并且将计算结果写入新创建的目录;接着创建一个org.apache.spark.rdd.CheckpointRDD;最后,原始RDD的所有依赖被清除,这就意味着RDD的转换的计算链(compute chain)等信息都被清除。这个处理逻辑中,数据写入的实现在org.apache.spark.rdd.CheckpointRDD$#writeToFile。简要的核心逻辑如下:

// 创建一个保存checkpoint数据的目录

val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)

val fs = path.getFileSystem(rdd.context.hadoopConfiguration)

if (!fs.mkdirs(path)) {

throw new SparkException("Failed to create checkpoint path " + path)

}


// 创建广播变量

val broadcastedConf = rdd.context.broadcast(

new SerializableWritable(rdd.context.hadoopConfiguration))

//开始一个新的Job进行计算,计算结果存入路径path中

rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _)

//根据结果的路径path来创建CheckpointRDD

val newRDD = new CheckpointRDD[T](rdd.context, path.toString)

//保存结果,清除原始RDD的依赖、Partition信息等

RDDCheckpointData.synchronized {

cpFile = Some(path.toString)

cpRDD = Some(newRDD) // RDDCheckpointData对应的CheckpointRDD

rdd.markCheckpointed(newRDD)      // 清除原始RDD的依赖,Partition

cpState = Checkpointed            //标记checkpoint的状态为完成

}

至此,RDD的checkpoint完成,其中checkpoint的数据可以通过checkpointRDD的readFromFile读取。但是,上述逻辑在清除了RDD的依赖后,并没有和check-pointRDD建立联系,那么Spark是如何确定一个RDD是否被checkpoint了,而且正确读取checkpoint的数据呢?

答案就在org.apache.spark.rdd.RDD#dependencies的实现,它会首先判断当前的RDD是否已经Checkpoint过,如果有,那么RDD的依赖就变成了对应的CheckpointRDD:

privatedefcheckpointRDD: Option[RDD[T]]=checkpointData.flatMap(_.checkpointRDD)

final def dependencies: Seq[Dependency[_]] = {

checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {

if (dependencies_ == null) { //没有checkpoint

dependencies_ = getDependencies

}

dependencies_

}

}

理解了Checkpoint的实现过程,接下来看一下computeOrReadCheckpoint的实现。前面提到了,它一共在两个地方被调用,org.apache.spark.rdd.RDD#iterator和org.apache. spark.CacheManager#getOrCompute。它实现的逻辑比较简单,首先检查当前RDD是否被Checkpoint过,如果有,读取Checkpoint的数据;否则开始计算。实现如下:

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext)

: Iterator[T] =

{

if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)

}

firstParent[T].iterator(split,context)会调用对应CheckpointRDD的iterator,最终调用到它的compute:

override def compute(split: Partition, context: TaskContext): Iterator[T] = {

val file=new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))

CheckpointRDD.readFromFile(file, broadcastedConf, context) //读取Checkpoint的数据

}

3.4.5 RDD的计算逻辑

RDD的计算逻辑在org.apache.spark.rdd.RDD#compute中实现。每个特定的RDD都会实现compute。比如前面提到的CheckpointRDD的compute就是直接读取checkpoint数据。HadoopRDD就是读取指定Partition的数据。MapPartitionsRDD就是将用户的转换逻辑作用到指定的Partition上。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容