Spark Core源码精读计划#11:Spark广播机制的实现

目录

前言

在RPC的领域里摸爬滚打了很长时间,是时候抽身出来看一看其他东西了。顺着SparkEnv初始化的思路继续看,下一个主要组件是广播管理器BroadcastManager。本文就主要讲解Spark中广播机制的实现。

广播变量是Spark两种共享变量中的一种(另一种是累加器)。它适合处理多节点跨Stage的共享数据,特别是输入数据量较大的集合,可以提高效率。

广播管理器BroadcastManager

BroadcastManager在SparkEnv中是直接初始化的,其代码逻辑也很短,如下。

代码#11.1 - o.a.s.broadcast.BroadcastManager类

private[spark] class BroadcastManager(
    val isDriver: Boolean,
    conf: SparkConf,
    securityManager: SecurityManager)
  extends Logging {
  private var initialized = false
  private var broadcastFactory: BroadcastFactory = null

  initialize()

  private def initialize() {
    synchronized {
      if (!initialized) {
        broadcastFactory = new TorrentBroadcastFactory
        broadcastFactory.initialize(isDriver, conf, securityManager)
        initialized = true
      }
    }
  }

  def stop() {
    broadcastFactory.stop()
  }

  private val nextBroadcastId = new AtomicLong(0)

  private[broadcast] val cachedValues = {
    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
  }

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }

  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
    broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
  }
}

构造方法参数

BroadcastManager在构造时有三个参数,分别是isDriver(是否为Driver节点)、conf(对应的SparkConf配置)、securityManager(对应的SecurityManager)。非常简单,不再赘述。

属性成员

BroadcastManager内有四个属性成员:

  • initialized表示BroadcastManager是否已经初始化完成。
  • broadcastFactory持有广播工厂的实例(类型是BroadcastFactory特征的实现类)。
  • nextBroadcastId表示下一个广播变量的唯一标识(AtomicLong类型的)。
  • cachedValues用来缓存已广播出去的变量。它属于ReferenceMap类型,是apache-commons提供的一个弱引用映射数据结构。与我们常见的各种Map不同,它的键值对有可能会在GC过程中被回收。

初始化逻辑

initialize()方法做的事情也非常简单,它首先判断BroadcastManager是否已初始化。如果未初始化,就新建广播工厂TorrentBroadcastFactory,将其初始化,然后将初始化标记设为true。

对外提供的方法

BroadcastManager提供的方法有两个:newBroadcast()方法,用于创建一个新的广播变量;以及unbroadcast()方法,将已存在的广播变量取消广播。它们都是简单地调用了TorrentBroadcastFactory中的同名方法,因此我们必须通过阅读TorrentBroadcastFactory的相关源码,才能了解Spark广播机制的细节。

广播变量TorrentBroadcast

来看TorrentBroadcastFactory.newBroadcast()方法。

代码#11.2 - o.a.s.broadcast.TorrentBroadcastFactory.newBroadcast()方法

  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
    new TorrentBroadcast[T](value_, id)
  }

可见只是简单地(真的很简单吗?)创建了一个TorrentBroadcast对象实例,它就是前面一直在说的“广播变量”的庐山真面目。下面我们来仔细研究它。

属性成员及参数初始化

这个类中的属性不算少哦。

代码#11.3 - o.a.s.broadcast.TorrentBroadcast类的属性成员

  @transient private lazy val _value: T = readBroadcastBlock()
  @transient private var compressionCodec: Option[CompressionCodec] = _
  @transient private var blockSize: Int = _

  private val broadcastId = BroadcastBlockId(id)
  private val numBlocks: Int = writeBlocks(obj)
  private var checksumEnabled: Boolean = false
  private var checksums: Array[Int] = _

  private def setConf(conf: SparkConf) {
    compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
      Some(CompressionCodec.createCodec(conf))
    } else {
      None
    }
    blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
    checksumEnabled = conf.getBoolean("spark.broadcast.checksum", true)
  }
  setConf(SparkEnv.get.conf)
  • _value:广播块的具体数据。注意它由lazy关键字定义,因此是懒加载的,也就是在TorrentBroadcast构造时不会调用readBroadcastBlock()方法获取数据,而会推迟到第一次访问_value时。
  • compressionCodec:广播块的压缩编解码逻辑。当配置项spark.broadcast.compress为true时,会启用压缩。
  • blockSize:广播块的大小。由spark.broadcast.blockSize配置项来控制,默认值4MB。
  • broadcastId:广播变量的ID。BroadcastBlockId是个结构非常简单的case class,每产生一个新的广播变量就会自增。
  • numBlocks:该广播变量包含的块数量。它与_value不同,并没有lazy关键字定义,因此在TorrentBroadcast构造时就会直接调用writeBlocks()方法。
  • checksumEnabled:是否允许对广播块计算校验值,由spark.broadcast.checksum配置项控制,默认值true。
  • checksums:广播块的校验值。

广播变量的写入

上面已经提到在TorrentBroadcast构造时会直接调用writeBlocks()方法,来看一看它的代码。

代码#11.4 - o.a.s.broadcast.TorrentBroadcast.writeBlocks()方法

  private def writeBlocks(value: T): Int = {
    import StorageLevel._

    val blockManager = SparkEnv.get.blockManager
    if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
      throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    }
    val blocks =
      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    if (checksumEnabled) {
      checksums = new Array[Int](blocks.length)
    }
    blocks.zipWithIndex.foreach { case (block, i) =>
      if (checksumEnabled) {
        checksums(i) = calcChecksum(block)
      }
      val pieceId = BroadcastBlockId(id, "piece" + i)
      val bytes = new ChunkedByteBuffer(block.duplicate())
      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
      }
    }
    blocks.length
  }

这个方法中涉及到了块管理器BlockManager,它是Spark存储子系统中的基础组件,我们现在暂时不考虑它,后面还会对它进行十分详尽的分析。writeBlocks()方法的执行逻辑如下:

  1. 获取BlockManager实例,调用其putSingle()方法将广播数据作为单个对象写入本地存储。注意StorageLevel为MEMORY_AND_DISK,亦即在内存不足时会溢写到磁盘,且副本数为1,不会进行复制。
  2. 调用blockifyObject()方法将广播数据转化为块,即Spark存储的基本单元。使用的序列化器为SparkEnv中初始化的JavaSerializer。
  3. 如果校验值开关有效,就用calcChecksum()方法为每个块计算校验值。
  4. 为广播数据切分成的每个块(称为piece)都生成一个带"piece"的广播ID,调用BlockManager.putBytes()方法将各个块写入MemoryStore(内存)或DiskStore(磁盘)。StorageLevel为MEMORY_AND_DISK_SER,写入的数据会序列化。
  5. 最终返回块的计数值。

上面提到的blockifyObject()、calcChecksum()方法的实现都比较简单,就不再赘述。

广播变量的读取

先来看readBroadcastBlock()方法。

代码#11.5 - o.a.s.broadcast.TorrentBroadcast.readBroadcastBlock()方法

  private def readBroadcastBlock(): T = Utils.tryOrIOException {
    TorrentBroadcast.synchronized {
      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues

      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
        setConf(SparkEnv.get.conf)
        val blockManager = SparkEnv.get.blockManager
        blockManager.getLocalValues(broadcastId) match {
          case Some(blockResult) =>
            if (blockResult.data.hasNext) {
              val x = blockResult.data.next().asInstanceOf[T]
              releaseLock(broadcastId)
              if (x != null) {
                broadcastCache.put(broadcastId, x)
              }
              x
            } else {
              throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
            }
          case None =>
            logInfo("Started reading broadcast variable " + id)
            val startTimeMs = System.currentTimeMillis()
            val blocks = readBlocks()
            logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))

            try {
              val obj = TorrentBroadcast.unBlockifyObject[T](
                blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
              val storageLevel = StorageLevel.MEMORY_AND_DISK
              if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
                throw new SparkException(s"Failed to store $broadcastId in BlockManager")
              }
              if (obj != null) {
                broadcastCache.put(broadcastId, obj)
              }
              obj
            } finally {
              blocks.foreach(_.dispose())
            }
        }
      }
    }
  }

其执行逻辑如下:

  1. 获取BlockManager实例,调用其getLocalValues()方法将之前写入的广播数据对象取出。
  2. 如果能够直接取得广播数据,就调用releaseLock()方法【实际上对应BlockManager.releaseLock(),又对应Object.notifyAll()】解开当前块的锁。这个锁用来保证块读写的互斥性。
  3. 如果不能直接取得广播数据,说明数据都已经序列化,并且有可能不在本地存储。此时调用readBlocks()方法从本地和远端同时获取块,然后调用unBlockifyObject()方法将块转换回广播数据的对象。
  4. 再次调用BlockManager.putSingle()方法将广播数据作为单个对象写入本地存储,再将其加入广播缓存Map中,下次读取时就不用大费周章了。

readBlocks()方法的具体实现如下所示。

代码#11.6 - o.a.s.broadcast.TorrentBroadcast.readBlocks()方法

  private def readBlocks(): Array[BlockData] = {
    val blocks = new Array[BlockData](numBlocks)
    val bm = SparkEnv.get.blockManager

    for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
      val pieceId = BroadcastBlockId(id, "piece" + pid)
      logDebug(s"Reading piece $pieceId of $broadcastId")
      bm.getLocalBytes(pieceId) match {
        case Some(block) =>
          blocks(pid) = block
          releaseLock(pieceId)
        case None =>
          bm.getRemoteBytes(pieceId) match {
            case Some(b) =>
              if (checksumEnabled) {
                val sum = calcChecksum(b.chunks(0))
                if (sum != checksums(pid)) {
                  throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
                    s" $sum != ${checksums(pid)}")
                }
              }
              if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
                throw new SparkException(
                  s"Failed to store $pieceId of $broadcastId in local BlockManager")
              }
              blocks(pid) = new ByteBufferBlockData(b, true)
            case None =>
              throw new SparkException(s"Failed to get $pieceId of $broadcastId")
          }
      }
    }
    blocks
  }

该方法会首先对所有广播数据的piece进行打散,然后对打散之后的每个piece执行以下步骤:

  1. 调用BlockManager.getLocalBytes()方法,从本地获取序列化的广播数据块。将获取到的块放入对应下标的位置,并释放该块的锁。
  2. 如果本地没有广播数据,就调用BlockManager.getRemoteBytes()方法从远端(其他Executor或者Driver)获取广播数据块。
  3. 对远程获取的块计算校验值,并与之前写入时计算的校验值比对。如果不同,说明传输发生错误,抛异常出去。
  4. 若一切正常,调用BlockManager.putBytes()方法,将各个块写入MemoryStore(内存)或DiskStore(磁盘),并将其放入对应下标的位置。最终返回所有读取的块。

广播变量读取的流程图描述

上面单单通过文字叙述可能会令人费解,因此下面画一个标准的Flow chart来描述它的过程。


图#11.1 - 广播数据的读取流程

总结

本文从广播管理器BroadcastManager的初始化入手,揭示了广播变量的本质——TorrentBroadcast,并通过引入块管理器BlockManager的相关知识,详细分析了广播数据的写入和读取流程。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容