注:本文转自我的个人博客(Spark - 利用WeakReference来清理对象)。
最近在stackoverflow上看到有人好奇Spark是在什么时机对Accumulator或者Broadcast这样的变量进行回收的。自己在看源码的时候发现了这个有趣的地方。
Spark ContextCleaner
我们在Spark闲置(没有任务执行)时,容易看到下面的日志:
19/02/12 05:19:51 INFO Spark Context Cleaner org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54): Cleaned accumulator 108284023
19/02/12 05:19:51 INFO Spark Context Cleaner org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54): Cleaned accumulator 108283658
追溯到源码中,可以得知SparkContext在启动时,初始化了ContextCleaner,并以daemon方式启动了一个cleaningThread线程,这个线程的作用就是不断循环,回收清理RDD、Broadcast变量、Accumulator等无效对象。
这个时候可以提出一个问题:以Accumulator为例,当某个Accumulator不再使用(没有被任何对象引用)时,ContextCleaner是如何知道这个信息的?
先看一下整个清理过程:
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
// Synchronize here to avoid being interrupted on stop()
synchronized {
reference.foreach { ref =>
logDebug("Got cleaning task " + ref.task)
referenceBuffer.remove(ref)
ref.task match {
case CleanRDD(rddId) =>
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =>
doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
case CleanBroadcast(broadcastId) =>
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
}
}
可以看到,ContextCleaner是通过一个referenceQueue找到了需要回收的对象(CleanAccum)。接下来,从referenceQueue入手,看看JVM中的WeakReference是什么样的存在。
Java WeakReference
Java中对Reference有几种不同的分类:
- StrongReference: 通常我们定义的对象就属于这种,较难被GC。
- WeakReference: 如Spark中封装的CleanupTaskWeakReference(task, objectForCleanup, referenceQueue) ,如果引用的对象(task)只和当前的WeakReference对象联结,那么在GC中会被回收,并放入referenceQueue中。
- SoftReference: 相对WeakReference较强的引用,可以回收,但不一定是在下次GC中。
所以在ContextCleaner中,Spark采用了WeakReference + referenceQueue的方式来实现对象的回收。当我们注册一个Accumulator时,会同时调用registerForCleanup:
/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
}
referenceBuffer的作用是保证WeakReference在处理前不被GC。
Spark将注册的Accumulator封装到CleanupTask,并基于task初始化了一个WeakReference。当Accumulator不再被引用时,task会被放入referenceQueue中,而此时cleaningThread从referenceQueue中提取即将要GC的对象做处理(见上面的清理过程代码)。