上两节我们讲了普通shuffle的操作原理,与优化后的操作原理。并对比了他们各自的特别。那么我就了解到spark shuffle其实是进行了两步
第一步,ShuffleMapTask执行后把计算出来的数据写入ShuffleBlockFile里
第二步,ResultTask读取这些数据文件进行计算。
节章节就是深入剖析这两步的源码。
我们在前面讲过Executor在执行Task时,调用runTask方法,并返回MapStatus
try {
//获取shuffleManager,在用shuffleManager获取shuffleWriter对象
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
//最重要的代码在这里
/**
* 首先调用了rdd.iterator方法,参数是传入了当前 task要处理的partition
* 所以核心的逻辑,就是rdd的iterator方法中,在这里,就实现了针对RDD的某个partitione,执行
* 我们定义的算子或者函数
* 当执行完我们自定义的算子或者函数,是不是相当于针对rdd的partiton执行了处理,那么是不是有返回值
* ok,返回的数据,都是通过ShuffleWriter,经过HashPartitioner进行分区后,写入自己对应的bucket
*/
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
/**
* 最后,结果返回MapStatus
* Mapstatus封装了ShuffleMapTask计算 后的数据,存储在哪里,这其实就是BlockManager相关的信息
* BlockManager是spark底层的内存,数据,磁盘管理的组件
* 讲完shuffle,我们会讲blockManager
*/
return writer.stop(success = true).get
}
里面讲了
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
其实这个writer就是HashSuffleWriter.里write方法里,我们首先要判断是否需要在Map端本地聚会。是什么情况下可以聚合呢,这要看我们实际的业务,比如前面说的reduceBykey。
//将每个ShuffleMapTask计算出来的rdd的partition数据,写入本地磁盘
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
//首先要判断,是否需要在map端本地聚合
//这里包括reduceByKey,这样的操作,dep.aggreatetor.isDefined是true
//包括dep.mapSideCombine也是true
val iter = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
//这里是执行本地聚会
//比如本地:(hello,1)(hello,1) ==> (hello,2)
dep.aggregator.get.combineValuesByKey(records, context)
} else {
records
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
records
}
那么这里说到了本地聚合,也可以叫组合(combine),有什么用呢,大家想一相,是不是这里要走网络传递了,要是本地聚合后,大大减少了网络流量 了
接着我们看源码:
//如果本地聚会就本地聚会,
//然后遍历数据
//对每个数据,调用partitioner,默认是HashPartitioner,生成bucketId
//也就是决定 了,每一份数据写入哪个bucket中
for (elem <- iter) {
val bucketId = dep.partitioner.getPartition(elem._1)
// 调用shuffleBlockManager.forMapTask来生成bucketId对应的Writer,然后把数据写入bucket
shuffle.writers(bucketId).write(elem)
}
}
接着我们看forMapTask的方法。这个方法里就看到我们前面两节讲的普通shuffle与优化后的shuffle写入本地磁盘的区别。
ivate val shuffleState = shuffleStates(shuffleId)
private var fileGroup: ShuffleFileGroup = null
//这里很关键,前面我们讲过Shuffle有两种,一种是普通的,一种是优化后的
//这里会判断,如果开启了consolidate,就是consoldateShuffleFile是true
//这里不会为每个bucket都获取一个独立的文件
//而是为这个bucket,获取一个ShuffleGroup的Writer
val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
fileGroup = getUnusedFileGroup()
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
//首先用shuffleId, mapId, bucketId来生成一个唯一的blockId
//然后用bucket获取一个ShuffleGroup
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
//然后用BlockManager.getDiskWriter方法,针对ShuffleGroup获取一个Writer
/**
* 这样我们就清楚了,如果开启了consolidate机制
* 实际上,对于每一个bucket,都会获取一个针对ShuffleFileGroup的writer
* 而不是一个独立的ShuffleBlockFile的writer
*
* 这样就实现了多个ShuffleMapTask的输出 数据的合并
*/
blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
writeMetrics)
}}
上面讲了当开启了consolidate机制后,对于每一个bucket都会获取一个ShuffleFileGroup的writer
而普通的shuffle的源码如下:
else {
//如果没有开启consolicate机制,也就是普通的Shuffle
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
//同样生成一个blockId
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
//然后调用 blockManager.diskBlockManager,获取了一个代表要写入磁盘文件的blockFile
val blockFile = blockManager.diskBlockManager.getFile(blockId)
// Because of previous failures, the shuffle file may already exist on this machine.
// If so, remove it.
if (blockFile.exists) {
if (blockFile.delete()) {
logInfo(s"Removed existing shuffle file $blockFile")
} else {
logWarning(s"Failed to remove existing shuffle file $blockFile")
}
}
//然后调用 这个方法,针对那个blockFile生成writer
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
/**
*所以对于普通的Shuffle操作的话
* 对于每个ShuffleMapTask输出的bucket,都会在本地获取一个单独的ShuffleBlockFIle文件
*/
}
我们看到了不管是哪种的shuffle,最终调用blockMamager.getDiskWriter方法写数据到本地磁盘。这就是Shuffle的第一步,写文件数据操作。
接下来我们看第二步,
在前面分析Task是不是分析了task的计算 ,调用了compute方法,里面是不是通过getReader方法得到ShuffleMapReader,调用read方法
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
.read()
.asInstanceOf[Iterator[(K, C)]]
}
override def read(): Iterator[Product2[K, C]] = {
val ser = Serializer.getSerializer(dep.serializer)
//这里和我们以前分析的图串起来了吧DAGScheduler的MapoutputTrackerMaster中获取自己想要的数据信息
//然后底层用 blockMamger拉取自己的数据
val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)
val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
} else {
new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
}
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
// Convert the Product2s to pairs since this is what downstream RDDs currently expect
iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
}
用blockManager拉取自己的数据,调用ftech方法:
def fetch[T](
shuffleId: Int,
reduceId: Int,
context: TaskContext,
serializer: Serializer)
: Iterator[T] =
{
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
val blockManager = SparkEnv.get.blockManager
val startTime = System.currentTimeMillis
/**
* 拿到mapOutputTracker的引用 ,然后调用getServerStatuses
* suffleId表示这个stage的上一个stage的ID
* reduceId是bucketId
* 这两个参数可以限制找到当前resultTask获取所需要的那份数据
* getServerStatuses这个方法一定会走网络通信的,因为要联系dirver的
*/
val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
//ShuffleBlockFetcherIterator构造 以后,就直接根据拉取地睛位置信息,通过BlockMamager
//去远程的ShuffleTask所以节点的blockManager去拉取数据
val blockFetcherItr = new ShuffleBlockFetcherIterator(
context,
SparkEnv.get.blockManager.shuffleClient,
blockManager,
blocksByAddress,
serializer,
SparkEnv.get.conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024)
val itr = blockFetcherItr.flatMap(unpackBlock)
//对拉取的数据进行封装
val completionIter = CompletionIterator[T, Iterator[T]](itr, {
context.taskMetrics.updateShuffleReadMetrics()
})
这就是ResultTask读取数据的过程。在spark中Shuffle是重点。