Spark Core源码精读计划#29:BlockManager主从及RPC逻辑

目录

前言

通过前面几篇文章的讲解,我们就把Spark Core存储体系中的内存存储和磁盘存储逻辑基本上讲完了,而负责将这些组件统一管理并发挥作用的就是BlockManager,那么从本文开始,我们就来逐渐探索它的细节……

No,还不急,本文还是来看先于BlockManager初始化的组件,即BlockManagerMaster。顾名思义,它是负责管理各个BlockManager的。之前提到过一句,BlockManager是典型的主从架构设计,不管Driver还是Executor上都要有BlockManager实例,那么必然就得存在一个协调组件——Spark中就是BlockManagerMaster了。

既然BlockManager散落在不同的节点上,它们之间如何互通有无?当然就是借助很久之前讲过的RPC环境了。所以,如果看官对RPC端点RpcEndpoint、RPC端点引用RpcEndpointRef这些概念已经感到生疏了的话,就回去翻一翻吧。

初始化BlockManagerMaster与RPC端点

BlockManagerMaster在SparkEnv中创建,对应的代码如下。

代码#29.1 - o.a.s.SparkEnv.create()方法中创建BlockManagerMaster

    val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
      conf, isDriver)

    def registerOrLookupEndpoint(
        name: String, endpointCreator: => RpcEndpoint):
      RpcEndpointRef = {
      if (isDriver) {
        logInfo("Registering " + name)
        rpcEnv.setupEndpoint(name, endpointCreator)
      } else {
        RpcUtils.makeDriverRef(name, conf, rpcEnv)
      }
    }

由这一小段代码可以看出,BlockManagerMaster初始化时会接受一个RpcEndpoint作为参数,该RPC端点的类型为BlockManagerMasterEndpoint。如果当前节点是Driver所在节点,就调用RpcEnv.setupEndpoint()方法注册此RPC端点到RPC环境中。反之,如果当前节点是Executor所在节点,就调用RpcUtils.makeDriverRef()方法【进而调用的是RpcEnv.setupEndpointRef()方法】创建对Driver中BlockManagerMasterEndpoint的引用。

既然有了“主”节点持有的BlockManagerMasterEndpoint,那么“从”节点如果不持有一个RPC端点的话,仍然无法进行通信,因此相对地也会初始化名外BlockManagerSlaveEndpoint的组件。它的初始化则位于BlockManager的代码里,下一篇文章会看到,现在就不着急了。

接下来我们看BlockManagerMasterEndpoint的实现。

主RPC端点BlockManagerMasterEndpoint

构造方法与属性成员

代码#29.2 - o.a.s.storage.BlockManagerMasterEndpoint的构造方法与属性成员

private[spark]
class BlockManagerMasterEndpoint(
    override val rpcEnv: RpcEnv,
    val isLocal: Boolean,
    conf: SparkConf,
    listenerBus: LiveListenerBus)
  extends ThreadSafeRpcEndpoint with Logging {
  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

  private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

  private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
  private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)

  private val topologyMapper = {
    val topologyMapperClassName = conf.get(
      "spark.storage.replication.topologyMapper", classOf[DefaultTopologyMapper].getName)
    val clazz = Utils.classForName(topologyMapperClassName)
    val mapper =
      clazz.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[TopologyMapper]
    logInfo(s"Using $topologyMapperClassName for getting topology information")
    mapper
  }

  // ......
}

可见,该RPC端点需要RPC环境、SparkConf和事件总线的支持。下面将属性逐一解说一下:

  • blockManagerInfo:维护BlockManager的ID与其信息的映射关系。BlockManagerId类是对Driver/Executor ID、节点地址、端口等信息的简单封装,而BlockManagerInfo类则是定义在BlockManagerMasterEndpoint下方的私有类,维护BlockManager的一些基本信息,如ID、最后一次通信时间、块列表、堆内/堆外内存大小等。这两个类后面也会简略看一下。
  • blockManagerIdByExecutor:维护Executor ID与BlockManager ID的映射关系。
  • blockLocations:维护块ID与持有对应块的BlockManager ID的映射关系。
  • askThreadPool/askExecutionContext:目前没有实际的用途,从名称推测看,是用来处理RPC请求的线程池及其对应的ExecutionContext。
  • topologyMapper:通过反射创建的TopologyMapper类实例,用来记录节点对应的拓扑信息。默认的DefaultTopologyMapper是空实现,另外还有FileBasedTopologyMapper可以通过文件指定拓扑。它可能是方便今后来做机架感知等功能的,目前(2.3.3版本)仍然没有具体的用途。

接受并回复RPC消息

这个自然是通过覆写RpcEndpoint.receiveAndReply()方法来实现。它的方法体比较长(RPC消息类型很多)。

代码#29.3 - o.a.s.storage.BlockManagerMasterEndpoint.receiveAndReply()方法

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint) =>
      context.reply(register(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
    case _updateBlockInfo @
        UpdateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size) =>
      context.reply(updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size))
      listenerBus.post(SparkListenerBlockUpdated(BlockUpdatedInfo(_updateBlockInfo)))
    case GetLocations(blockId) =>
      context.reply(getLocations(blockId))
    case GetLocationsAndStatus(blockId) =>
      context.reply(getLocationsAndStatus(blockId))
    case GetLocationsMultipleBlockIds(blockIds) =>
      context.reply(getLocationsMultipleBlockIds(blockIds))
    case GetPeers(blockManagerId) =>
      context.reply(getPeers(blockManagerId))
    case GetExecutorEndpointRef(executorId) =>
      context.reply(getExecutorEndpointRef(executorId))
    case GetMemoryStatus =>
      context.reply(memoryStatus)
    case GetStorageStatus =>
      context.reply(storageStatus)
    case GetBlockStatus(blockId, askSlaves) =>
      context.reply(blockStatus(blockId, askSlaves))
    case GetMatchingBlockIds(filter, askSlaves) =>
      context.reply(getMatchingBlockIds(filter, askSlaves))
    case RemoveRdd(rddId) =>
      context.reply(removeRdd(rddId))
    case RemoveShuffle(shuffleId) =>
      context.reply(removeShuffle(shuffleId))
    case RemoveBroadcast(broadcastId, removeFromDriver) =>
      context.reply(removeBroadcast(broadcastId, removeFromDriver))
    case RemoveBlock(blockId) =>
      removeBlockFromWorkers(blockId)
      context.reply(true)
    case RemoveExecutor(execId) =>
      removeExecutor(execId)
      context.reply(true)
    case StopBlockManagerMaster =>
      context.reply(true)
      stop()
    case BlockManagerHeartbeat(blockManagerId) =>
      context.reply(heartbeatReceived(blockManagerId))
    case HasCachedBlocks(executorId) =>
      blockManagerIdByExecutor.get(executorId) match {
        case Some(bm) =>
          if (blockManagerInfo.contains(bm)) {
            val bmInfo = blockManagerInfo(bm)
            context.reply(bmInfo.cachedBlocks.nonEmpty)
          } else {
            context.reply(false)
          }
        case None => context.reply(false)
      }
  }

BlockManager RPC消息的类型统一在对象BlockManagerMessages中来定义,并且它们的名称可以自解释,这里就不再专门列出源码了。下面挑选两个处理方法作为例子来看看是怎样处理的。

例:处理BlockManager注册

代码#29.4 - o.a.s.storage.BlockManagerMasterEndpoint.register()方法

  private def register(
      idWithoutTopologyInfo: BlockManagerId,
      maxOnHeapMemSize: Long,
      maxOffHeapMemSize: Long,
      slaveEndpoint: RpcEndpointRef): BlockManagerId = {
    val id = BlockManagerId(
      idWithoutTopologyInfo.executorId,
      idWithoutTopologyInfo.host,
      idWithoutTopologyInfo.port,
      topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))

    val time = System.currentTimeMillis()
    if (!blockManagerInfo.contains(id)) {
      blockManagerIdByExecutor.get(id.executorId) match {
        case Some(oldId) =>
          logError("Got two different block manager registrations on same executor - "
              + s" will replace old one $oldId with new one $id")
          removeExecutor(id.executorId)
        case None =>
      }
      logInfo("Registering block manager %s with %s RAM, %s".format(
        id.hostPort, Utils.bytesToString(maxOnHeapMemSize + maxOffHeapMemSize), id))

      blockManagerIdByExecutor(id.executorId) = id

      blockManagerInfo(id) = new BlockManagerInfo(
        id, System.currentTimeMillis(), maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint)
    }
    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxOnHeapMemSize + maxOffHeapMemSize,
        Some(maxOnHeapMemSize), Some(maxOffHeapMemSize)))
    id
  }

该方法的执行流程如下:

  1. 构造BlockManagerId实例。
  2. 如果BlockManagerInfo中没有维护这个BlockManagerId,但是却存在与它对应的Executor,那么就移除该Executor(认为它已经死掉了)。
  3. 将新的BlockManagerId和BlockManagerInfo放入对应的映射中。
  4. 向事件总线发送SparkListenerBlockManagerAdded信息,飙戏BlockManager注册成功,并最终返回它的ID。

例:处理BlockManager心跳

代码#29.5 - o.a.s.storage.BlockManagerMasterEndpoint.heartbeatReceived()方法

  private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
    if (!blockManagerInfo.contains(blockManagerId)) {
      blockManagerId.isDriver && !isLocal
    } else {
      blockManagerInfo(blockManagerId).updateLastSeenMs()
      true
    }
  }

这个方法就比较简单。如果blockManagerInfo中包含有对应的ID,就更新BlockManagerInfo中对应的最后一次心跳时间lastSeenMs。接下来看BlockManagerSlaveEndpoint。

从RPC端点BlockManagerSlaveEndpoint

BlockManagerSlaveEndpoint的实现比上面那位要简单不少,但总体逻辑大同小异。构造BlockManagerSlaveEndpoint时需要传入对本节点上BlockManager的引用,这是很自然的,否则就没办法将RPC消息与对块的操作打通了。下面直接看它覆写的receiveAndReply()方法。

代码#29.6 - o.a.s.storage.BlockManagerSlaveEndpoint.receiveAndReply()方法

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RemoveBlock(blockId) =>
      doAsync[Boolean]("removing block " + blockId, context) {
        blockManager.removeBlock(blockId)
        true
      }
    case RemoveRdd(rddId) =>
      doAsync[Int]("removing RDD " + rddId, context) {
        blockManager.removeRdd(rddId)
      }
    case RemoveShuffle(shuffleId) =>
      doAsync[Boolean]("removing shuffle " + shuffleId, context) {
        if (mapOutputTracker != null) {
          mapOutputTracker.unregisterShuffle(shuffleId)
        }
        SparkEnv.get.shuffleManager.unregisterShuffle(shuffleId)
      }
    case RemoveBroadcast(broadcastId, _) =>
      doAsync[Int]("removing broadcast " + broadcastId, context) {
        blockManager.removeBroadcast(broadcastId, tellMaster = true)
      }
    case GetBlockStatus(blockId, _) =>
      context.reply(blockManager.getStatus(blockId))
    case GetMatchingBlockIds(filter, _) =>
      context.reply(blockManager.getMatchingBlockIds(filter))
    case TriggerThreadDump =>
      context.reply(Utils.getThreadDump())
    case ReplicateBlock(blockId, replicas, maxReplicas) =>
      context.reply(blockManager.replicateBlock(blockId, replicas.toSet, maxReplicas))
  }

其中有一部分消息是同步处理的,其他的是异步处理的(因为上面的四个Remove消息对应的操作耗时都相对较长)。由于所有的动作都对应到BlockManager的方法调用,所以我们在讲解BlockManager时,再来看这部分的具体实现。

BlockManagerMaster

对主从RPC端点有了一定了解之后,就可以真正来看BlockManagerMaster是做什么的了。它的实现实际上比我们想象的简单太多,仅仅是对所有RPC消息代理了BlockManagerMasterEndpoint的EndpointRef.ask()/askSync()方法,向RPC端点发送与BlockManager相关的各类消息。由于消息类型很多,所以只看3个有代表性的。

代码#29.7 - o.a.s.storage.BlockManagerMaster.removeExecutor()/removeExecutorAsync()/registerBlockManager()方法

  def removeExecutor(execId: String) {
    tell(RemoveExecutor(execId))
    logInfo("Removed " + execId + " successfully in removeExecutor")
  }

  def removeExecutorAsync(execId: String) {
    driverEndpoint.ask[Boolean](RemoveExecutor(execId))
    logInfo("Removal of executor " + execId + " requested")
  }

  def registerBlockManager(
      blockManagerId: BlockManagerId,
      maxOnHeapMemSize: Long,
      maxOffHeapMemSize: Long,
      slaveEndpoint: RpcEndpointRef): BlockManagerId = {
    logInfo(s"Registering BlockManager $blockManagerId")
    val updatedId = driverEndpoint.askSync[BlockManagerId](
      RegisterBlockManager(blockManagerId, maxOnHeapMemSize, maxOffHeapMemSize, slaveEndpoint))
    logInfo(s"Registered BlockManager $updatedId")
    updatedId
  }

其中,driverEndpoint就是BlockManagerMasterEndpoint的端点引用,slaveEndpoint就是BlockManagerSlaveEndpoint的端点引用。这些方法的实现都大同小异,因此也就不再废话了。

总结

一张图总结,如下。

图29.1 - BlockManagerMaster与RPC端点间的关系

由本文的分析可见,BlockManagerMaster的名字有些许误导性:实际上在每个节点都会有一个BlockManagerMaster,而不是Driver上有BlockManagerMaster,Executor上有BlockManagerSlave(当然它是不存在的)。BlockManager的主从则是靠RPC端点体系来体现的。之所以叫这个名字,可能是为了避免出现“块管理器管理器”(BlockManagerManager)这样更奇怪的名字吧。

晚安。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容