spark中的广播变量broadcast

Spark中的Broadcast处理
首先先来看一看broadcast的使用代码:

val values = ListInt

val broadcastValues = sparkContext.broadcast(values)

rdd.mapPartitions(iter => {

broadcastValues.getValue.foreach(println)

})

在上面的代码中,首先生成了一个集合变量,把这个变量通过sparkContext的broadcast函数进行广播,

最后在rdd的每一个partition的迭代时,使用这个广播变量.

接下来看看广播变量的生成与数据的读取实现部分:

def broadcast[T: ClassTag](value: T): Broadcast[T] = {
assertNotStopped()
if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {

这里要注意,使用broadcast时,不能直接对RDD进行broadcast的操作.
// This is a warning instead of an exception in order to avoid breaking

// user programs that
// might have created RDD broadcast variables but not used them:
logWarning("Can not directly broadcast RDDs; instead, call collect() and "
+ "broadcast the result (see SPARK-5063)")
}

通过broadcastManager中的newBroadcast函数来进行广播.
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
val callSite = getCallSite
logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
cleaner.foreach(_.registerBroadcastForCleanup(bc))
bc
}

在BroadcastManager中生成广播变量的函数,这个函数直接使用的broadcastFactory的对应函数.

broadcastFactory的实例通过配置spark.broadcast.factory,

 默认是TorrentBroadcastFactory.

def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
broadcastFactory.newBroadcast[T](value_, isLocal,

   nextBroadcastId.getAndIncrement())

}

在TorrentBroadcastFactory中生成广播变量的函数:

在这里面,直接生成了一个TorrentBroadcast的实例.

override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long)

: Broadcast[T] = {
new TorrentBroadcast[T](value_, id)
}

TorrentBroadcast实例生成时的处理流程:

这里主要的代码部分是直接写入这个要广播的变量,返回的值是这个变量所占用的block的个数.

Broadcast的block的大小通过spark.broadcast.blockSize配置.默认是4MB,

Broadcast的压缩是否通过spark.broadcast.compress配置,默认是true表示启用,默认情况下使用snappy的压缩.

private val broadcastId = BroadcastBlockId(id)
/** Total number of blocks this broadcast variable contains. */
private val numBlocks: Int = writeBlocks(obj)

接下来生成一个lazy的属性,这个属性只有在具体的使用时,才会执行,在实例生成时不执行(上面的示例中的getValue.foreach时执行).

@transient private lazy val _value: T = readBroadcastBlock()

override protected def getValue() = {
_value
}

看看实例生成时的writeBlocks的函数:

private def writeBlocks(value: T): Int = {

这里先把这个广播变量保存一份到当前的task的storage中,这样做是保证在读取时,如果要使用这个广播变量的task就是本地的task时,直接从blockManager中本地读取.
SparkEnv.get.blockManager.putSingle(broadcastId, value,

StorageLevel.MEMORY_AND_DISK,
tellMaster = false)

这里根据block的设置大小,对value进行序列化/压缩分块,每一个块的大小为blocksize的大小,
val blocks =
TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer,

compressionCodec)

这里把序列化并压缩分块后的blocks进行迭代,存储到blockManager中,
blocks.zipWithIndex.foreach { case (block, i) =>
SparkEnv.get.blockManager.putBytes(
BroadcastBlockId(id, "piece" + i),
block,
StorageLevel.MEMORY_AND_DISK_SER,
tellMaster = true)
}

这个函数的返回值是一个int类型的值,这个值就是序列化压缩存储后block的个数.
blocks.length
}

在我们的示例中,使用getValue时,会执行实例初始化时定义的lazy的函数readBroadcastBlock:

private def readBroadcastBlock(): T = Utils.tryOrIOException {
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)

这里先从local端的blockmanager中直接读取storage中对应此广播变量的内容,如果能读取到,表示这个广播变量已经读取过来或者说这个task就是广播的本地executor.
SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next()) match {
case Some(x) =>
x.asInstanceOf[T]

下面这部分执行时,表示这个广播变量在当前的executor中是第一次读取,通过readBlocks函数去读取这个广播变量的所有的blocks,反序列化后,直接把这个广播变量存储到本地的blockManager中,下次读取时,就可以直接从本地进行读取.
case None =>
logInfo("Started reading broadcast variable " + id)
val startTimeMs = System.currentTimeMillis()
val blocks = readBlocks()
logInfo("Reading broadcast variable " + id + " took" +

          Utils.getUsedTimeMs(startTimeMs))

    val obj = TorrentBroadcast.unBlockifyObject[T](
      blocks, SparkEnv.get.serializer, compressionCodec)
    // Store the merged copy in BlockManager so other tasks on this executor don't
    // need to re-fetch it.
    SparkEnv.get.blockManager.putSingle(
      broadcastId, obj, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
    obj
}

}
}

最后再看看readBlocks函数的处理流程:

private def readBlocks(): Array[ByteBuffer] = {

这里定义的变量用于存储读取到的block的信息,numBlocks是广播变量序列化后所占用的block的个数.
val blocks = new ArrayByteBuffer
val bm = SparkEnv.get.blockManager

这里开始迭代读取每一个block的内容,这里的读取是先从local中进行读取,如果local中没有读取到数据时,通过blockManager读取远端的数据,通过读取这个block对应的location从这个location去读取这个block的内容,并存储到本地的blockManager中.最后,这个函数返回读取到的blocks的集合.
for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
val pieceId = BroadcastBlockId(id, "piece" + pid)
logDebug(s"Reading piece pieceId ofbroadcastId")

def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
  SparkEnv.get.blockManager.putBytes(
    pieceId,
    block,
    StorageLevel.MEMORY_AND_DISK_SER,
    tellMaster = true)
  block
}
val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
  throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
blocks(pid) = block

}
blocks
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容