我们从 RDD 的 iterator 方法这个入口开始分析:task 正是通过它开始对 rdd 的分区数据进行处理。
/**
 * Entry point a task uses to read one partition of this RDD.
 *
 * @param split the partition to read
 * @param context context of the running task
 * @return an iterator over the partition's elements
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel == StorageLevel.NONE) {
    // No storage level was set, so there is nothing cached to look for: compute the
    // partition or read it back from a checkpoint.
    computeOrReadCheckpoint(split, context)
  } else {
    // A storage level was set: hand over to the CacheManager, which is given this RDD,
    // the partition and the storage level.
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  }
}
SparkEnv.get.cacheManager.getOrCompute 方法追踪
/**
 * Returns an iterator over one partition of an RDD, serving it from the BlockManager when a
 * block for it can be found and otherwise computing it and storing the result.
 *
 * @param rdd the RDD the partition belongs to
 * @param partition the partition to read or compute
 * @param context task context; its task metrics are updated here
 * @param storageLevel level handed to putInBlockManager when the partition has to be stored
 * @return an iterator over the partition's values
 */
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
// The block id is built from the RDD's id and the partition's index.
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
// If the BlockManager can supply the block, return its data right away.
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(blockResult.readMethod)
existingMetrics.incBytesRead(blockResult.bytes)
val iter = blockResult.data.asInstanceOf[Iterator[T]]
// Wrap the iterator so that every element handed out bumps the records-read metric.
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsRead(1)
delegate.next()
}
}
// Even though a storage level is set on the RDD, the BlockManager lookup can still come back
// empty, in which case the partition has to be obtained another way.
case None =>
// acquireLockForPartition presumably waits for any other thread that is already loading this
// partition and hands back its values if there are any -- TODO confirm against its
// definition. A defined result is returned directly.
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) {
return new InterruptibleIterator[T](context, storedValues.get)
}
try {
logInfo(s"Partition $key not found, computing it")
// Read the partition from a checkpoint if there is one, otherwise compute it (as the name
// computeOrReadCheckpoint suggests; its body is not shown here).
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
// When the task runs locally the values are returned as they are, without being stored.
if (context.isRunningLocally) {
return computedValues
}
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
// This method is only reached for RDDs that have a storage level, so after reading the
// checkpoint or recomputing, the values are stored through putInBlockManager under that
// level.
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
// Append the blocks touched by this put to the ones already recorded in the task metrics.
val metrics = context.taskMetrics
val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
new InterruptibleIterator(context, cachedValues)
} finally {
// Always remove this key from `loading` and wake every thread waiting on it, including on
// the early return and when an exception is thrown.
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
}
}
BlockManager get方法
/**
 * Entry point for fetching a block through the BlockManager.
 *
 * The local lookup is tried first; the remote lookup only runs when the local one finds
 * nothing.
 *
 * @param blockId id of the block to look up
 * @return the block's result, or None when neither lookup finds it
 */
def get(blockId: BlockId): Option[BlockResult] = {
  getLocal(blockId) match {
    case foundLocally @ Some(_) =>
      logInfo(s"Found block $blockId locally")
      foundLocally
    case None =>
      // Nothing local: fall back to the remote lookup and pass its answer through as is.
      val fetched = getRemote(blockId)
      if (fetched.isDefined) {
        logInfo(s"Found block $blockId remotely")
      }
      fetched
  }
}
再追踪下 getLocal 方法,可以看到其内部调用了 doGetLocal 方法:
/**
 * Looks a block up in the local stores, trying memory, then the external block store, then
 * disk, depending on which of them the block's storage level uses.
 *
 * @param blockId id of the block to look up
 * @param asBlockResult when true the data is returned wrapped in a BlockResult; when false the
 *                      block's bytes are returned instead
 * @return the block's data, or None when the block is not registered locally, has been
 *         removed, was marked as failed, or was not found in any store that was tried
 * @throws BlockException when the level uses disk but the disk store does not contain the block
 * @throws SparkException when the memory store does not hand back an iterator after a put
 */
private def doGetLocal(blockId: BlockId, asBlockResult: Boolean):
Option[Any] = {
// blockInfo is a map keyed by block id.
/*
 * The entry found there stands for one block. Below it is used as the monitor that
 * serializes concurrent access to that block.
 */
// Look up the entry for this block; orNull turns a missing entry into null.
val info = blockInfo.get(blockId).orNull
if (info != null) {
/*
 * Everything that touches the block's data happens while holding the monitor of its
 * blockInfo entry.
 */
info.synchronized {
// Check again that the block is still registered, now that the monitor is held.
if (blockInfo.get(blockId).isEmpty) {
logWarning(s"Block $blockId had been removed")
return None
}
// waitForReady() presumably blocks while another thread is still writing the block
// (TODO confirm against its definition). A false result means the write failed, so give up.
if (!info.waitForReady()) {
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure.")
return None
}
val level = info.level
logDebug(s"Level for block $blockId is $level")
// The storage level uses memory: try the memory store first.
if (level.useMemory) {
logDebug(s"Getting block $blockId from memory")
val result = if (asBlockResult) {
// Wrap the values held by the memory store in a BlockResult.
memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
// The caller asked for bytes rather than a BlockResult.
memoryStore.getBytes(blockId)
}
result match {
case Some(values) =>
return result
case None =>
// Not in memory; fall through to the other stores.
logDebug(s"Block $blockId not found in memory")
}
}
// The storage level uses off-heap storage: try the external block store.
if (level.useOffHeap) {
logDebug(s"Getting block $blockId from ExternalBlockStore")
if (externalBlockStore.contains(blockId)) {
val result = if (asBlockResult) {
// NOTE(review): this branch reports DataReadMethod.Memory for data read from the
// external block store -- confirm that is intended.
externalBlockStore.getValues(blockId)
.map(new BlockResult(_, DataReadMethod.Memory, info.size))
} else {
externalBlockStore.getBytes(blockId)
}
result match {
case Some(values) =>
return result
case None =>
logDebug(s"Block $blockId not found in ExternalBlockStore")
}
}
}
// The storage level uses disk: read the block's bytes from the disk store.
if (level.useDisk) {
logDebug(s"Getting block $blockId from disk")
val bytes: ByteBuffer = if (diskStore.contains(blockId)) {
// DiskStore.getBytes() always returns Some, so this .get() is guaranteed to be safe
diskStore.getBytes(blockId).get
} else {
// Remove the missing block so that its unavailability is reported to the driver
removeBlock(blockId)
throw new BlockException(
blockId, s"Block $blockId not found on disk, though it should be")
}
assert(0 == bytes.position())
if (!level.useMemory) {
// The level does not use memory, so the data is returned without storing it anywhere else.
if (asBlockResult) {
return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
info.size))
} else {
return Some(bytes)
}
} else {
/*
 * The level uses both disk and memory: after reading from disk, also try to put the
 * data into the memory store.
 */
// Store the bytes themselves when the level is not deserialized, or when the caller only
// asked for bytes.
if (!level.deserialized || !asBlockResult){
// The copy is built inside the function passed to putBytes, so it is only allocated if
// putBytes invokes that function.
memoryStore.putBytes(blockId, bytes.limit, () => {
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
})
// Copying advances the buffer's position; reset it so the bytes can be read again below.
bytes.rewind()
}
if (!asBlockResult) {
return Some(bytes)
} else {
val values = dataDeserialize(blockId, bytes)
if (level.deserialized) {
// Put the deserialized values into the memory store and return the iterator it gives back.
val putResult = memoryStore.putIterator(
blockId, values, level, returnValues = true, allowPersistToDisk = false)
putResult.data match {
case Left(it) =>
return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
case _ =>
throw new SparkException("Memory store did not return an iterator!")
}
} else {
return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
}
}
}
}
}
} else {
logDebug(s"Block $blockId not registered locally")
}
// Reached when the block is not registered, or no store that was tried returned it.
None
}
getRemote 方法内部调用了 doGetRemote 方法:
/**
 * Tries to fetch a block from the remote locations the master reports for it.
 *
 * Locations are asked one at a time, in shuffled order, until one of them returns data.
 *
 * @param blockId id of the block to fetch; must not be null
 * @param asBlockResult when true the fetched bytes are deserialized and wrapped in a
 *                      BlockResult, otherwise the fetched buffer itself is returned
 * @return the fetched block, or None when no location yields data
 */
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
  require(blockId != null, "BlockId is null")
  // Ask the master where this block lives and shuffle the answer.
  val candidates = Random.shuffle(master.getLocations(blockId))
  var failedAttempts = 0
  for (loc <- candidates) {
    logDebug(s"Getting remote block $blockId from $loc")
    val fetched = try {
      // Synchronous fetch through the BlockTransferService; the peer is addressed by its
      // host, port and executor id.
      blockTransferService
        .fetchBlockSync(loc.host, loc.port, loc.executorId, blockId.toString)
        .nioByteBuffer()
    } catch {
      case NonFatal(e) =>
        failedAttempts += 1
        if (failedAttempts == candidates.size) {
          // An exception is thrown while fetching this block from all locations:
          // log the latest cause and report the block as unavailable.
          logWarning(
            s"Failed to fetch block from ${candidates.size} locations. Most recent failure cause:",
            e)
          return None
        }
        // This location failed; yield null so the loop moves on to the next one.
        logWarning(
          s"Failed to fetch remote block $blockId from $loc (failed attempt $failedAttempts)",
          e)
        null
    }
    if (fetched != null) {
      val payload =
        if (asBlockResult) {
          new BlockResult(
            dataDeserialize(blockId, fetched),
            DataReadMethod.Network,
            fetched.limit())
        } else {
          fetched
        }
      return Some(payload)
    }
    logDebug(s"The value of block $blockId is null")
  }
  logDebug(s"Block $blockId not found")
  None
}