spark BlockManager

我们从这个入口开始分析,task对rdd开始处理。

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
/**
* 如果StorageLevel不为NONE,
说明RDD之前被我们持久化过了,就不需要重新计算了,
 尝试使用cacheManager,去获取持久化的数据。
*
* */
if (storageLevel != StorageLevel.NONE) {
// 从缓存中获取
  SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
  // 进行rdd的计算或者从checkPoit获取。
  computeOrReadCheckpoint(split, context)
}
}

SparkEnv.get.cacheManager.getOrCompute 方法追踪

// 从缓存读取
def getOrCompute[T](
  rdd: RDD[T],
  partition: Partition,
  context: TaskContext,
  storageLevel: StorageLevel): Iterator[T] = {
 // 由rdd的id和partion的组成唯一key,形式为"rddId_partionIndex",即blockId
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
// 如果能够通过BlockManager获取到数据的话,那么就直接返回了。
blockManager.get(key) match {
  case Some(blockResult) =>
    // Partition is already materialized, so just return its values
    val existingMetrics = context.taskMetrics
      .getInputMetricsForReadMethod(blockResult.readMethod)
    existingMetrics.incBytesRead(blockResult.bytes)

    val iter = blockResult.data.asInstanceOf[Iterator[T]]
    new InterruptibleIterator[T](context, iter) {
      override def next(): T = {
        existingMetrics.incRecordsRead(1)
        delegate.next()
      }
    }
    //虽然RDD已经持久化过,
   //但是有可能一些原因会获取不到数据,数据既不在本地内存
  // 或磁盘,也不在 remote BlockManger的
    // 本地磁盘
  case None =>
    /**acquireLockForPartition 这个方法中还是在尝试通过 
 *调用BlockManger的get方法去获取数据,如果获取到了,那么就直接返回
      * */
    val storedValues = acquireLockForPartition[T](key)
    if (storedValues.isDefined) {
      return new InterruptibleIterator[T](context, storedValues.get)
    }

    try {
      logInfo(s"Partition $key not found, computing it")
      /**如果rdd之前checkpoint过,那么尝试读取它的checkpoint,但是如果没有checkpoint过,那么此时只能
        *执行算子,重新计算。
        * */
      val computedValues = rdd.computeOrReadCheckpoint(partition, context)

      if (context.isRunningLocally) {
        return computedValues
      }


      val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
      /**因为我们在获取数据的时候走的CacheManager,这说明之前肯定是有持久化级别的,被cache过,
        * 只是由于某些原因,持久化的数据没有找到,
        * 所以读取checkpoint数据,或者重新计算数据之后,再用putInBlockManager方法,将数据在blockmanager中
        * 持久化一份。
        * */
      val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
      val metrics = context.taskMetrics
      val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
      metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
      new InterruptibleIterator(context, cachedValues)

    } finally {
     // 释放锁
      loading.synchronized {
        loading.remove(key)
        loading.notifyAll()
      }
    }
  }
}

BlockManager get方法

/**
* 通过BlockManager获取数据的入口方法
* 获取的时候优先从本地获取,如果本地没有,那么从远程获取
*
* */
 def get(blockId: BlockId): Option[BlockResult] = {
 // 从本地获取 
  val local = getLocal(blockId) 
if (local.isDefined) {
  logInfo(s"Found block $blockId locally")
  return local
}
// 从远端拉取
val remote = getRemote(blockId)
if (remote.isDefined) {
  logInfo(s"Found block $blockId remotely")
  return remote
}
None
}

再追踪下getLocal方法,可以看到起内部调用了doGetLocal方法

 private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): 
 Option[Any] = {

//每个blockManager,维护一个map blockInfo
/*
*  这里维护的blockInfo,可以用于代表一个block
*  然后blockInfo最大的作用,就是说用于作为多线程并发访问同一个Block的同步监视器
*  保证安全
* */
// 首先尝试获取block对应的blockInfo的锁
val info = blockInfo.get(blockId).orNull
if (info != null) {
 /*
 *  对所有的blockInfo都会进行多线程并发访问的同步操作
*  所以BlockInfo,相当于一个block,用于作为多线程并发访问的同步监视器
 * */
  info.synchronized {

    // 再次确认block是否存在。
    if (blockInfo.get(blockId).isEmpty) {
      logWarning(s"Block $blockId had been removed")
      return None
    }
    // 如果其他线程在操作这个block
    // 那么其实会卡住 ,获取blockInfo的排他锁
    //如果始终没有获取到,返回false,那么就直接返回
    if (!info.waitForReady()) {
      // If we get here, the block write failed.
      logWarning(s"Block $blockId was marked as failure.")
      return None
    }

    val level = info.level
    logDebug(s"Level for block $blockId is $level")

  //        持久化级别使用了内存
  //        尝试从memoryStore中获取数据
    if (level.useMemory) {
      logDebug(s"Getting block $blockId from memory")
      val result = if (asBlockResult) {
        // 从缓存中通过blockManager反序列化的数据返回
        memoryStore.getValues(blockId).map(new BlockResult(_, DataReadMethod.Memory, info.size))
      } else {
        // 获得序列化后的数据
        memoryStore.getBytes(blockId)
      }
      result match {
        case Some(values) =>
          return result
        case None =>
          logDebug(s"Block $blockId not found in memory")
      }
    }
    if (level.useOffHeap) {
      logDebug(s"Getting block $blockId from ExternalBlockStore")
      if (externalBlockStore.contains(blockId)) {
        val result = if (asBlockResult) {
          externalBlockStore.getValues(blockId)
            .map(new BlockResult(_, DataReadMethod.Memory, info.size))
        } else {
          externalBlockStore.getBytes(blockId)
        }
        result match {
          case Some(values) =>
            return result
          case None =>
            logDebug(s"Block $blockId not found in ExternalBlockStore")
        }
      }
    }
    // 尝试将磁盘上的数据缓存至内存
    if (level.useDisk) {
      logDebug(s"Getting block $blockId from disk")
      val bytes: ByteBuffer = if (diskStore.contains(blockId)) {
        // DiskStore.getBytes() always returns Some, so this .get() is guaranteed to be safe
        // 通过java的nio进行文件读取数据
        diskStore.getBytes(blockId).get
      } else {
        // Remove the missing block so that its unavailability is reported to the driver
        // 删除丢失的block
        removeBlock(blockId)
        throw new BlockException(
          blockId, s"Block $blockId not found on disk, though it should be")
      }
      assert(0 == bytes.position())

      if (!level.useMemory) {
        //没有使用内存级别的话,直接将数据返回
        if (asBlockResult) {
          return Some(new BlockResult(dataDeserialize(blockId, bytes), DataReadMethod.Disk,
            info.size))
        } else {
          return Some(bytes)
        }
      } else {
        /*
        * 如果使用了Disk级别,也使用了memory级别
        * 那么从disk读取出来之后,其实会尝试将其放入memoryStore中,也就是缓存到内存中
        * */
        if (!level.deserialized || !asBlockResult){
          memoryStore.putBytes(blockId, bytes.limit, () => {
            val copyForMemory = ByteBuffer.allocate(bytes.limit)
            copyForMemory.put(bytes)
          })
          bytes.rewind()
        }
        if (!asBlockResult) {
          return Some(bytes)
        } else {
          val values = dataDeserialize(blockId, bytes)
          if (level.deserialized) {
            val putResult = memoryStore.putIterator(
              blockId, values, level, returnValues = true, allowPersistToDisk = false)

            putResult.data match {
              case Left(it) =>
                return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
              case _ =>

                throw new SparkException("Memory store did not return an iterator!")
            }
          } else {
            return Some(new BlockResult(values, DataReadMethod.Disk, info.size))
          }
        }
      }
    }
  }
} else {
  logDebug(s"Block $blockId not registered locally")
}
None
}

getRemote 方法 内部调用了 doGetRemote方法:

 // 从远端获取数据
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
/*
* 首先BlcokMangerMaster上,获取每个BlockId对应的BlockManger的信息
* 然后随机打乱
* */
val locations = Random.shuffle(master.getLocations(blockId))
var numFetchFailures = 0
// 遍历每一个blockManager
for (loc <- locations) {
  logDebug(s"Getting remote block $blockId from $loc")
  val data = try {
    // 使用BlockTransferService 进行异步的远程网络获取,将block数据传输回来
    // 连接的时候,使用Blockmanager的唯一的标识,host,port,executorId
    blockTransferService.fetchBlockSync(
      loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
  } catch {
    case NonFatal(e) =>
      numFetchFailures += 1
  // 如果拉取的次数和副本数相等,抛出异常。
      if (numFetchFailures == locations.size) {
        // An exception is thrown while fetching this block from all locations
        logWarning(s"Failed to fetch block from" +
          s" ${locations.size} locations. Most recent failure cause:", e)
        return None
      } else {
        // This location failed, so we retry fetch from a different one by returning null here
        logWarning(s"Failed to fetch remote block $blockId " +
          s"from $loc (failed attempt $numFetchFailures)", e)
        null
      }
  }

  if (data != null) {
    if (asBlockResult) {
      return Some(new BlockResult(
        dataDeserialize(blockId, data),
        DataReadMethod.Network,
        data.limit()))
    } else {
      return Some(data)
    }
  }
  logDebug(s"The value of block $blockId is null")
}
logDebug(s"Block $blockId not found")
None
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容