事情起因是一位同事写的SparkStreaming程序,总会出现部分Executor上请求broadcast不成功的错误,鉴于此,我专门走读了一下broadcast的相关代码,尝试找到原因
主要计算流程是,一个独立的线程在dirver端扫描HDFS,如果配置文件修改了,那就读入并创建broadcast,executor使用该broadcast处理接下来的流处理请求。类似于ip黑名单,但是黑名单是变化的,每隔一段时间需要生成新的广播变量。
1 Broadcast简介
broadcast-广播变量,常用于MapJoin及一些配置文件的全局传递,使用方式很简单:
val blackIp=Set(ip1,ip2...)
#sc.broadcast创建广播变量
val blackIpBC=sc.broadcast(blackIp)
# 广播变量.value在task内获取广播变量的实际内容
rdd.filter(row=>!blackIpBC.value.contains(row.ip))
1.1 广播变量的优势
为什么不直接使用blackIp,非要包装一层广播变量呢?
事实上,广播变量在使用的时候,是被拉取到Executor上的BlockManager中,只需要第一个task使用的时候拉取一次,之后其他task使用就可以复用blockManager中的变量,不需要重新拉取,也不需要在task中保存这个数据结构。
另外,广播变量在拉取的时候是基于Torrent协议的,即executor可以从其他executor上拉取该广播变量。如果不使用广播变量,那么所有请求都需要从driver进行,数据量大的时候,driver会表示很有压力。
说到Torrent协议,其实很多下载器以及媒体播放器都是基于Torrent的,比如经常能看到后台的迅雷或者腾讯视频、爱奇艺客户端在上传数据,实际上这个时候我们的电脑也相当于一个中间server,给其他用户传资源呢。如果你的电脑性能不行或网络比较烂,记得要手动限速一下。
2 广播变量的创建过程
2.1 driver端做了什么?
sc.braodcast(value) 在driver端做了哪些操作?能确保executor端能访问到这个变量呢?
2.1.1 SparkContext的broadcast方法
SparkContext.broadcast代码如下:
/**
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
*
* @param value value to broadcast to the Spark nodes
* @return `Broadcast` object, a read-only variable cached on each machine
*/
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
assertNotStopped()
require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
"Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
val callSite = getCallSite
logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
cleaner.foreach(_.registerBroadcastForCleanup(bc))
bc
}
- 首先是不允许RDD作为广播变量
- 调用BroadcastManger.newBroadcast创建广播变量
- getCallSite是注册相关堆栈信息,用于做跟踪,和具体逻辑没啥关系,不需要关注
- cleaner.foreach这个部分很重要,注册了一个需要回收的句柄
2.1.2 BroadcastManager类
BroadcastManager是在SparkEnv中初始化的
private[spark] class BroadcastManager(val isDriver: Boolean,conf: SparkConf,securityManager: SecurityManager)
extends Logging {
private var initialized = false
private var broadcastFactory: BroadcastFactory = null
initialize()
// Called by SparkContext or Executor before using Broadcast
private def initialize(): Unit = {synchronized {
if (!initialized) {
broadcastFactory = new TorrentBroadcastFactory
broadcastFactory.initialize(isDriver, conf, securityManager)
initialized = true
} }}
def stop(): Unit = {broadcastFactory.stop()}
private val nextBroadcastId = new AtomicLong(0)
private[broadcast] val cachedValues = Collections.synchronizedMap(new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK) .asInstanceOf[java.util.Map[Any, Any]])
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
val bid = nextBroadcastId.getAndIncrement()
value_ match {
case pb: PythonBroadcast => pb.setBroadcastId(bid)
case _ => // do nothing
}
broadcastFactory.newBroadcast[T](value_, isLocal, bid)
}
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
}
}
构造方法
BroadcastManager在构造时有三个参数,分别是isDriver(是否为Driver节点)、conf(对应的SparkConf配置)、securityManager(对应的SecurityManager
属性成员
BroadcastManager内有四个属性成员:
- initialized表示BroadcastManager是否已经初始化完成。
- broadcastFactory持有广播工厂的实例(类型是BroadcastFactory特征的实现类)。
- nextBroadcastId表示下一个广播变量的唯一标识(AtomicLong类型的)。
- cachedValues用来缓存已广播出去的变量。它属于ReferenceMap类型,是apache-commons提供的一个弱引用映射数据结构。与我们常见的各种Map不同,它的键值对有可能会在GC过程中被回收。
对方提供的方法
提供了两方法,最终都是调用BroadcastFactory的同名方法。
newBroadcast方法:创建广播变量
unbroadcast方法:注销广播变量
实际上,BroadcastFactory是一个trait,只有TorrentBroadcastFactory一个实现类。
2.1.3 TorrentBroadcastFactory类
private[spark] class TorrentBroadcastFactory extends BroadcastFactory {
override def initialize(isDriver: Boolean, conf: SparkConf,
securityMgr: SecurityManager): Unit = { }
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
new TorrentBroadcast[T](value_, id)
}
override def stop(): Unit = { }
/**
* Remove all persisted state associated with the torrent broadcast with the given ID.
* @param removeFromDriver Whether to remove state from the driver.
* @param blocking Whether to block until unbroadcasted
*/
override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
}
}
创建广播变量就是初始化了一个TorrentBroadcast对象,并且isLocal这个变量是没有被使用的,它代表了--master 是否是local的选项;
卸载广播变量直接调用了TorrentBroadcast.unpersist方法;
stop什么都不做。
2.1.4 TorrentBroadcast
继承于Broadcast,这个类代码很多,我们分开来说,
属性成员
/**
* 这是个软连接,方便之后垃圾回收同步删除
*/
@transient private var _value: SoftReference[T] = _
@transient private var compressionCodec: Option[CompressionCodec] = _
@transient private var blockSize: Int = _
private def setConf(conf: SparkConf): Unit = {
compressionCodec = if (conf.get(config.BROADCAST_COMPRESS)) {
Some(CompressionCodec.createCodec(conf))
} else {
None
}
// Note: use getSizeAsKb (not bytes) to maintain compatibility if no units are provided
blockSize = conf.get(config.BROADCAST_BLOCKSIZE).toInt * 1024
checksumEnabled = conf.get(config.BROADCAST_CHECKSUM)
}
setConf(SparkEnv.get.conf)
private val broadcastId = BroadcastBlockId(id)
/** Total number of blocks this broadcast variable contains. */
private val numBlocks: Int = writeBlocks(obj)
/** Whether to generate checksum for blocks or not. */
private var checksumEnabled: Boolean = false
/** The checksum for all the blocks. */
private var checksums: Array[Int] = _
- _value:广播块的具体数据。调用readBroadcastBlock()方法获取数据进行数据拉取,在driver端,如果需要访问这个值,需要通过懒加载方式读取blockManager。另外_value是一个软连接,方便之后在GC同时进行回收
- compressionCodec:广播块的压缩编解码逻辑。当配置项spark.broadcast.compress为true时,会启用压缩。
- blockSize:广播块的大小。由spark.broadcast.blockSize配置项来控制,默认值4MB。
- broadcastId:广播变量的ID。BroadcastBlockId是个结构非常简单的case class,每产生一个新的广播变量就会自增。
- numBlocks:该广播变量包含的块数量。此在TorrentBroadcast构造时就会直接调用writeBlocks()方法。
- checksumEnabled:是否允许对广播块计算校验值,由spark.broadcast.checksum配置项控制,默认值true。
- checksums:广播块的校验值。
另外还调用了setConf方法进行部分变量的初始化;writeBlocks(obj)进行了实际的数据写入。
writeBlocks方法
private def writeBlocks(value: T): Int = {
import StorageLevel._
// Store a copy of the broadcast variable in the driver so that tasks run on the driver
// do not create a duplicate copy of the broadcast variable's value.
val blockManager = SparkEnv.get.blockManager
if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
}
val blocks =
TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
if (checksumEnabled) {
checksums = new Array[Int](blocks.length)
}
blocks.zipWithIndex.foreach { case (block, i) =>
if (checksumEnabled) checksums(i) = calcChecksum(block)
val pieceId = BroadcastBlockId(id, "piece" + i)
val bytes = new ChunkedByteBuffer(block.duplicate())
if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
}
}
blocks.length
}
- 调用blockManager.putSingle方法,将变量作为一个独立对象写入到BlockManager中,putSingle方法这里不做赘述,并且使用MEMORY_AND_DISK的方式,占用Storage部分内存,如果内存不足,会进行写磁盘操作。
- 调用blockifyObject()方法将广播数据转化为块,即Spark存储的基本单元。使用的序列化器为SparkEnv中初始化的JavaSerializer。
- 如果校验值开关有效,就用calcChecksum()方法为每个块计算校验值。
- 为广播数据切分成的每个块(称为piece)都生成一个带"piece"的广播ID,调用BlockManager.putBytes()方法将各个块以MEMORY_AND_DISK_SER模式序列化保存到BlockManager中。
- 最终返回块的计数值。
上述流程就是在driver端进行广播变量的创建过程,需要注意的是,广播变量被存储了两次,一次是Memory+Disk作为单个Java对象存储,一次是切分块后Memory+Disk并且序列化作为二进制存储。
2.2 executor端做了什么
在调用sc.broadcast之后,会返回一个Broadcast对象,之后在rdd算子内调用broadcast对象.value就可以拿到这个值,具体发生了什么呢
2.2.1 Broadcast的value方法
value方法调用了assertValid,先确保该broadcast还没有被卸载掉
@volatile private var _isValid = true
def value: T = {
assertValid()
getValue()
}
/** Check if this broadcast is valid. If not valid, exception is thrown. */
protected def assertValid(): Unit = {
if (!_isValid) throw new SparkException("Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite))
}
getValue是一个抽象方法,在TorrentBroadcast中做了具体实现。
2.2.2 TorrentBroadcast的getValue方法
override protected def getValue() = synchronized {
val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
if (memoized != null) {
memoized
} else {
val newlyRead = readBroadcastBlock()
_value = new SoftReference[T](newlyRead)
newlyRead
}
}
如果_value不存在,则说明executor还没有读取过这个广播变量,那么调用readBroadcastBlock读取数据,同时为_value创建软连接,指向读取过来的广播变量。
2.2.3 TorrentBroadcast的readBroadcastBlock方法
private def readBroadcastBlock(): T = Utils.tryOrIOException {
TorrentBroadcast.torrentBroadcastLock.withLock(broadcastId) {
// As we only lock based on `broadcastId`, whenever using `broadcastCache`, we should only
// touch `broadcastId`.
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
setConf(SparkEnv.get.conf)
val blockManager = SparkEnv.get.blockManager
blockManager.getLocalValues(broadcastId) match {
case Some(blockResult) =>
if (blockResult.data.hasNext) {
val x = blockResult.data.next().asInstanceOf[T]
releaseBlockManagerLock(broadcastId)
if (x != null) broadcastCache.put(broadcastId, x)
x
} else {
throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
}
case None =>
val estimatedTotalSize = Utils.bytesToString(numBlocks * blockSize)
logInfo(s"Started reading broadcast variable $id with $numBlocks pieces (estimated total size $estimatedTotalSize)")
val startTimeNs = System.nanoTime()
val blocks = readBlocks()
logInfo(s"Reading broadcast variable $id took ${Utils.getUsedTimeNs(startTimeNs)}")
try {
val obj = TorrentBroadcast.unBlockifyObject[T](
blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
// Store the merged copy in BlockManager so other tasks on this executor don't need to re-fetch it.
val storageLevel = StorageLevel.MEMORY_AND_DISK
if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
}
if (obj != null)broadcastCache.put(broadcastId, obj)
obj
} finally {
blocks.foreach(_.dispose())
}
}
}
}
}
- 获取BlockManager实例,调用其getLocalValues()方法将之前写入的广播数据对象取出。
- 如果能够直接取得广播数据,就调用releaseBlockManagerLock()方法【实际上对应BlockManager.releaseLock(),又对应Object.notifyAll()】解开当前块的锁。这个锁用来保证块读写的互斥性。
- 如果不能直接取得广播数据,说明数据都已经序列化,并且有可能不在本地存储。此时调用readBlocks()方法从本地和远端同时获取块,然后调用unBlockifyObject()方法将块转换回广播数据的对象。
- 再次调用BlockManager.putSingle()方法将广播数据作为单个对象写入本地存储,再将其加入广播缓存Map中,下次读取时就不用大费周章了。
2.2.4 TorrentBroadcast的readBlocks方法
private def readBlocks(): Array[BlockData] = {
val blocks = new Array[BlockData](numBlocks)
val bm = SparkEnv.get.blockManager
for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
val pieceId = BroadcastBlockId(id, "piece" + pid)
logDebug(s"Reading piece $pieceId of $broadcastId")
bm.getLocalBytes(pieceId) match {
case Some(block) =>
blocks(pid) = block
releaseBlockManagerLock(pieceId)
case None =>
bm.getRemoteBytes(pieceId) match {
case Some(b) =>
if (checksumEnabled) {
val sum = calcChecksum(b.chunks(0))
if (sum != checksums(pid)) {
throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
s" $sum != ${checksums(pid)}")
}
}
// We found the block from remote executors/driver's BlockManager, so put the block
// in this executor's BlockManager.
if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(
s"Failed to store $pieceId of $broadcastId in local BlockManager")
}
blocks(pid) = new ByteBufferBlockData(b, true)
case None =>
throw new SparkException(s"Failed to get $pieceId of $broadcastId")
}
}
}
blocks
}
该方法会首先对所有广播数据的piece进行打散,然后对打散之后的每个piece执行以下步骤:
- 调用BlockManager.getLocalBytes()方法,从本地获取序列化的广播数据块。将获取到的块放入对应下标的位置,并释放该块的锁。
- 如果本地没有广播数据,就调用BlockManager.getRemoteBytes()方法从远端(其他Executor或者Driver)获取广播数据块。
- 对远程获取的块计算校验值,并与之前写入时计算的校验值比对。如果不同,说明传输发生错误,抛异常出去。
- 若一切正常,调用BlockManager.putBytes()方法,将各个块写入MemoryStore(内存)或DiskStore(磁盘),并将其放入对应下标的位置。最终返回所有读取的块。
3 广播变量的清理
广播变量什么时候可以做清理呢?driver端和executor端的清理机制有什么不一样的地方呢?
3.1 主动清理
通过调用upersist方法即可手动清理
广播变量.unpersist() #只产出executor上的广播变量
广播变量.doDestroy() #同时删除driver和executor的广播变量
注意,目前unpersist的具体实现在TorrentBroadcast中,只能清理掉executor端的广播变量。
如果想清理掉driver端的广播变量,需要调用doDestroy方法。
3.1.1 Broadcast的unpersist方法
在Broadcast类中,有两个重载的unpersist方法,blocking代表是否在unpersist中加锁,直到unpersist完成,相当于异步执行还是同步执行,默认blocking是false,相当于异步。
def unpersist(): Unit = {
unpersist(blocking = false)
}
def unpersist(blocking: Boolean): Unit = {
assertValid()
doUnpersist(blocking)
}
protected def doUnpersist(blocking: Boolean): Unit
最终调用了doUnpersist方法,是一个抽象方法,目前只有TorrentBroadcast中有具体实现。
3.1.2 TorrentBrodcast的doUnpersist方法
override protected def doUnpersist(blocking: Boolean): Unit = {
TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
}
override protected def doDestroy(blocking: Boolean): Unit = {
TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
}
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
logDebug(s"Unpersisting TorrentBroadcast $id")
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
}
doUnpersist方法用于清理executor端的广播变量,doDestroy方法用于清理driver端和executor端的广播变量,他们都调用了unpersist方法,unpersist方法是实际做清理的部分,它有三个参数:
- id :广播变量id
- removeFromDriver :是否清理掉driver端的广播变量
- blocking : 是否采用同步机制加锁清理
进一步调用了BlockManagerMaster类(SparkEnv.get.blockManager.master)的removeBroadcast方法。
3.1.3 BlockManagerMaster的removeBroadcast方法
具体的做法就是driver端先向BlockManagerMaster(在Driver端)发送一条rpc请求,请求删除指定broadcast的消息,BlockManagerMaster再向所有BlockManagerSlave(在Executor端)发送删除broadcast的请求,中间一共有两次RPC请求,。
/** Remove all blocks belonging to the given broadcast. */
def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean): Unit = {
val future = driverEndpoint.askSync[Future[Seq[Int]]](
RemoveBroadcast(broadcastId, removeFromMaster))
future.failed.foreach(e =>
logWarning(s"Failed to remove broadcast $broadcastId" +
s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e)
)(ThreadUtils.sameThread)
if (blocking) {
timeout.awaitResult(future)
}
}
如果blocking=true,那么就需要等待这个请求被所有executor处理完,才能返回。
具体消息总线的事件传输机制这里不深入讲解,最终这条rpc请求会传送到BlockManagerMasterEndpoint,被receiveAndReply方法处理。
3.1.4 BlockManagerMasterEndpoint端对removeBroadcast的处理
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
...
case RemoveBroadcast(broadcastId, removeFromDriver) => context.reply(removeBroadcast(broadcastId, removeFromDriver))
... }
进一步调用BlockManagerMasterEndpoint类的removeBroadcast方法,
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
removeFromDriver || !info.blockManagerId.isDriver
}
val futures = requiredBlockManagers.map { bm =>
bm.slaveEndpoint.ask[Int](removeMsg).recover {
case e: IOException =>
logWarning(s"Error trying to remove broadcast $broadcastId from block manager " +
s"${bm.blockManagerId}", e)
0 // zero blocks were removed
}
}.toSeq
Future.sequence(futures)
}
首先会将删除broadcast的请求再封装发送给所有的BlockManagerSlaveEndpointr。具体操作流程是
- 封装一个RemoveBroadcast的case类
- 过滤出所有非driver的BlockManager,如果removeFromDriver为true,那么driver的BlockManger会被保留
- 向所有BlockManagerEndpoint发送removeBroadcast的RPC请求。
3.1.5 Executor端对删除Broadcast的操作
BlockManagerSlaveEndpoint的receiveAndReply方法
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
...
case RemoveBroadcast(broadcastId, _) =>doAsync[Int]("removing broadcast " + broadcastId, context) {blockManager.removeBroadcast(broadcastId, tellMaster = true)
...
}
executor端接收到removeBroadcast的请求后,会尝试调用BlockManager.removeBroadcast方法
3.1.6 BlockManager的removeBroadcast方法
这是最终的操作了,具体就是遍历BlockManager上所有的BlockId,如果是属于该Broadcast,则调用removeBlock方法删除具体block块,最终返回删除掉block块的数量。
def removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int = {
logDebug(s"Removing broadcast $broadcastId")
val blocksToRemove = blockInfoManager.entries.map(_._1).collect {
case bid @ BroadcastBlockId(`broadcastId`, _) => bid
}
blocksToRemove.foreach { blockId => removeBlock(blockId, tellMaster) }
blocksToRemove.size
}
removeBlock也是BlockManager的方法:
def removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit = {
logDebug(s"Removing block $blockId")
blockInfoManager.lockForWriting(blockId) match {
case None =>
// The block has already been removed; do nothing.
logWarning(s"Asked to remove block $blockId, which does not exist")
case Some(info) =>
removeBlockInternal(blockId, tellMaster = tellMaster && info.tellMaster)
addUpdatedBlockStatusToTaskMetrics(blockId, BlockStatus.empty)
}
}
具体流程是:
- 获取block的元数据信息,这个过程要对block加上写锁,防止其他线程同时修改数据。
- 如果获取到了元数据,那么调用removeBlockInternal将其删除
- 更新block的相关Metrics信息。
BlockManager的removeBlockInternal就不详细介绍了,具体就是尝试删除内存及磁盘的Block数据,最后删除掉BlockInfoManager(保存了该BlockMNager中所有的block信息)的该广播变量的blockId。
3.1.7 总结
这就是手动调用广播变量删除的方法了,doDestroy会删除driver及executor的广播变量,而unpersist只会删除executor上的广播变量。
中间涉及到两次RPC请求,分别是driver向同在driver上的BlockManagerMaster发送请求,以及BlockManagerMaster向BlockManagerSlave发送请求。
最终的删除是调用BlockManager的removeBlock来删除的,其实Spark中不管是RDD、Shuffle数据、最中都是以Block的形式做管理的,整体代码逻辑非常清晰,如果想深入理解Spark的设计,一定要把BlockManager这块搞清楚。
3.2 自动清理:用于存储MapOutputStatus
上文讲到的doDestroy可以在用户代码中显式调用。除此之外,它还被自己的destroy()方法做了调用,而它又被MapOutputTracker类的invalidateSerializedMapOutputStatusCache做了调用,进行driver及executor所有的广播变量删除。
def invalidateSerializedMapOutputStatusCache(): Unit = withWriteLock {
if (cachedSerializedBroadcast != null) {
// Prevent errors during broadcast cleanup from crashing the DAGScheduler (see SPARK-21444)
Utils.tryLogNonFatalError {
// Use `blocking = false` so that this operation doesn't hang while trying to send cleanup
// RPCs to dead executors.
cachedSerializedBroadcast.destroy()
}
cachedSerializedBroadcast = null
}
cachedSerializedMapStatus = null
}
cachedSerializedBroadcast的是一个存储了二进制数组的广播变量,如果它!=null,那么就回触发destroy的清理。
事实上,这个方法在是在Shuffle完成之后才进行调用的,这个广播变量存储的是Map端已经完成的Task的id、Shuffle数据存放位置等信息。用于传输给driver和下游的reduce端进行数据拉取和任务调度等操作。
说白了,Shuffle这部分的广播变量是我们用于自己触碰不到的,只要知道,shuffle的MaoOutputStatus相关信息是用广播来发送的即可。
3.3 自动清理:ContextCleaner
这个机制是默认开启的,可以自动回收变为弱引用的RDD、Shuffle、广播变量、累加器和checkpoint,Spark会创建一个定时线程,每隔一定时间,就调用System.gc()来回收变为弱引用的5种数据类型。默认30min,使用如下参数进行设置:
spark.cleaner.periodicGC.interval #默认30min
如果日志中出现了Spark Context Cleaner,那么证明Spark已经自动开启了clean操作。
开头[2.1.1 SparkContext的broadcast方法]部分代码中,有一行:
cleaner.foreach(_.registerBroadcastForCleanup(bc))
用cleaner来注册了一个广播变量的cleanup,注意这里cleaner不是什么数组或者链表来才调用foreach方法,而是一个Option[ContextCleaner],没想到吧,Option也能调用foreach方法,如果cleaner为None,那么就跳过了registerBroadcastForCleanup方法。
3.3.1 ContextCleaner的广播变量注册
/** Register a Broadcast for cleanup when it is garbage collected. */
def registerBroadcastForCleanup[T](broadcast: Broadcast[T]): Unit = {
registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
}
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}
private class CleanupTaskWeakReference( val task: CleanupTaskreferent: AnyRef, referenceQueue: ReferenceQueue[AnyRef])
extends WeakReference(referent, referenceQueue)
简单来说,就是封装了一个CleanupTaskWeakReference弱引用类,与引用队列referenceQueue联合使用,并添加到referenceBuffer中(ConcurrentHashMap),referenceBuffer主要作用保存CleanupTaskWeakReference弱引用,确保在引用队列没处理前,弱引用不会被垃圾回收。当这个广播变量在可达性分析中变成弱引用时,就可以进行回收了。
3.3.2 ContextCleaner的清理
每隔30min,会调用keepCleaning方法,如果广播变量已经被引用队列处理了,就可以调用doCleanupBroadcast进行清理。
private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
while (!stopped) {
try {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.foreach { ref =>
logDebug("Got cleaning task " + ref.task)
referenceBuffer.remove(ref)
ref.task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
}
}
} catch {
case ie: InterruptedException if stopped => // ignore
case e: Exception => logError("Error in cleaning thread", e)
}
}
}
从referenceQueue取出所有变需要清理的弱引用,将其一一删除。
doCleanupBroadcast调用了BroadcastManager的unbroadcast方法:
override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
}
最终又调用了unpersist方法,注意这里removeFromDriver是=true的,相当于调用destroy方法,将广播变量从driver以及executor全部删除。
3.4 自动清理:软引用
这种方式也是通过GC来清除软引用的方式,只是清理实际的广播变量内对应的对象,而广播变量依旧在BlockManager中,如果之后需要再使用该value,可以从BlockManger中重新读取广播变量对应的数据。
在调用broadcast.value时,会进一步调用TorrentBroadcast的getValue方法:
override protected def getValue() = synchronized {
val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
if (memoized != null) {
memoized
} else {
val newlyRead = readBroadcastBlock()
_value = new SoftReference[T](newlyRead)
newlyRead
}
}
如果是第一次访问,那么会走else块的代码,为_value创建一个软引用。软引用在遇到GC且内存不足的时候会被删除。
那么就是说,如果广播变量在一个executor中被访问过,且遇到一次内存不足导致的GC时,就会删除该对象。
在driver和executor端都是如此,但并不会影响到已经存储在BlockManger中的广播变量数据。
所以,这里还可以引入一个优化点,在使用广播变量的时候,一个partition尽量只调用一次.value方法:
rdd.mapPartition(iter=>{
val blackIps=blackIpsBC.value
iter.filter(t=>!blackIps.contains(t.ip))
})
这种做法,可以跳过每条数据都需要做广播变量是否存在的判断,是比较好的编码习惯。
收工!