概述
本文介绍spark中Broadcast Variables的实现原理。
基本概念
在spark中广播变量属于共享变量的一种,spark对共享变量的介绍如下:
通常,当在远程集群节点上执行传递给Spark操作(例如map或reduce)的函数时,它将在函数中使用的所有变量的单独副本上工作。这些变量将复制到每台计算机,而且远程机器上的变量的更新不会同步给驱动程序(driver)端。这种情况下,跨任务读写共享变量效率低下。但是,Spark确实为两种常见的使用模式提供了两种有限类型的共享变量:广播变量和累加器。
spark的共享变量有两种:
广播变量(broadcast variables)
累加器(accumulators)
注意: 每个广播变量和累加器只能在一个上下文中(context)写入(分别是驱动程序(driver)或工作程序(worker)),而在另一个上下文中(context)读取。
广播变量可以在driver程序中写入,在executor端读取。
累加器在executors中写入,而在驱动程序(driver端)读取。
广播变量(Broadcast Variables)介绍
Spark将值传递给Spark executor一次,并且当多次使用广播变量时,任务可以共享它而不会导致重复的网络传输。
广播变量为我们提供了一种方法,可以在驱动程序(driver端)上获取本地值,并将只读副本分发给每台机器(worker),而不是为每个任务(task)发送新副本。广播变量似乎不是特别有用,因为我们可以在闭包中捕获局部变量,以便将数据从驱动程序传输到worker; 但是,每台机器只发送一个副本而不是每个任务发送一个副本可以节省大量成本,特别是在相同的广播变量用于其他转换时。 使用广播变量的两个常见示例是:
广播需要join的的小表。
广播机器学习模型以便能够对我们的数据进行预测。
通过在SparkContext上调用broadcast来创建广播变量。 这会将值分配给worker并为我们提供一个包装器(wrapper),允许我们通过调用value来访问worker上的值。如果使用变量输入创建广播变量,则在创建变量后不应修改输入,因为现有worker将看不到更新,新的worker才可能会看到新的值。
另外要注意:广播变量的值必须是本地的可序列化的值:而不是RDD或其他分布式数据结构。
Broadcast的实现
大致的实现如下图所示:
broadcast()函数
该函数在SparkContext中进行定义,函数原型如下:
def broadcast[T: ClassTag](value: T): Broadcast[T]
1
在SparkContext中需要调用broadcast函数来创建一个广播变量,并返回一个org.apache.spark.broadcast.Broadcast对象这样可以在分布式函数中来读取广播变量的值。该变量会被发送到spark集群的每个执行的节点上。
注意:该广播变量一旦创建,将不可修改,因为即使修改了该变量的值,也无法让spark集群的执行节点看到改变后的新值。
broadcast()函数的实现流程如下:
判断需要广播的变量是否是分布式变量,若是则终止函数,报告“不能广播分布式变量”的错误。
通过通过BroadcastManager的newBroadcast函数来创建广播变量,并返回一个Broadcast对象,这里其实是TorrentBroadcast类的对象
注册broadcast的cleanup函数,可以用来清除不再使用的broadcast变量。
最后,返回新创建的对象
注意:不能对分布式变量,比如:rdd,进行广播。
在类SparkContext中,broadcast函数的实现代码如下:
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
assertNotStopped()
// 不能直接广播rdd等分布式变量
require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
"Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
// 通过BroadcastManager工具类来创建一个BroadcastFactory对象
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
val callSite = getCallSite
logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
cleaner.foreach(_.registerBroadcastForCleanup(bc))
// 返回Broadcast对象,这里其实是TorrentBroadcast类的对象
bc
}
BroadcastManager
该类是一个辅助类,用来统一创建broadcast对外的接口。该类的构造函数流程如下:
定义了两个私有化变量,并且会为每个广播变量生成一个唯一的id,在创建broadcast变量时会通过nextBroadcastId.getAndIncrement()进行自增,并调用initialize()函数进行初始化:
// 是否已经初始
private var initialized = false
private var broadcastFactory: BroadcastFactory = null
initialize()
// 生成广播变量的id,该id是唯一的,这里先初始化,会在创建broadcast变量时进行自增操作
private val nextBroadcastId = new AtomicLong(0)
initialize()函数的实现逻辑如下:
(1)初始化broadcastFactory变量,这里创建了TorrentBroadcastFactory对象
(2)调用TorrentBroadcastFactory的initialize函数来初始化。在实际的代码中,该类的initialize函数什么都不做。
(3)把initialized设置为true,同一个对象只初始化一次
// Called by SparkContext or Executor before using Broadcast
private def initialize() {
synchronized { // 加锁
if (!initialized) {
// 初始化broadcastFactory变量,这里创建了TorrentBroadcastFactory对象
broadcastFactory = new TorrentBroadcastFactory
// 调用TorrentBroadcastFactory的initialize函数来初始化
broadcastFactory.initialize(isDriver, conf, securityManager)
// 把initialized设置为true,同一个对象只初始化一次
initialized = true
}
}
}
从以上分析可以看到,当创建广播变量时,实际上是调用的TorrentBroadcastFactory类的newBroadcast函数来进行创建。
TorrentBroadcastFactory工厂类
该类实现了一个类似于BitTorrent的协议,通过该协议把广播数据分发到各个executor中。这些操作其实是在类TorrentBroadcast中实现。
该类的代码相对简单,如下:
private[spark] class TorrentBroadcastFactory extends BroadcastFactory {
override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { }
// 调用创建一个TorrentBroadcast对象
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
new TorrentBroadcast[T](value_, id)
}
override def stop() { }
/**
* Remove all persisted state associated with the torrent broadcast with the given ID.
* @param removeFromDriver Whether to remove state from the driver.
* @param blocking Whether to block until unbroadcasted
*/
// 删除广播变量
override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
}
}
TorrentBroadcast类
介绍
真正实现广播变量的操作是在TorrentBroadcast类中实现的。该类实现了以下的机制:
驱动程序(driver)将序列化对象分成小块,并将这些块存储在驱动程序(driver)的BlockManager中。
在每个executor上,executor首先尝试从其BlockManager中获取对象。 若它不存在,则远程从driver或其他executor(如果可用)中获取对象块。 一旦获得块,它就会将块放在自己的BlockManager中,为其他executor来获取数据做好准备。
通过这种方式,可以防止driver成为发送多个广播数据副本的瓶颈(每个executor一个)。
代码实现分析
该类的构造过程如下:
通过readBroadcastBlock函数来从新构造广播对象,该函数会先从driver或其他executors中读取数据块。在driver端,若需要value值,它会直接从本地的block manager中读取数据。readBroadcastBlock函数的实现逻辑如下:
从SparkEnv.get.broadcastManager.cachedValues从来获取对应broadcastId的数据块值:broadcastCache.get(broadcastId)
从blockManager中获取对应id的广播变量的值:blockManager.getLocalValues(broadcastId)
若从blockManager中获取到了该变量的值,则:
若不能从blockManager中获取值,则调用readBlocks函数来读取数据块。该函数会从driver或其他的executors中读取该变量的数据。该函数会调用blockManager中的getLocalBytes函数来获取远端executor中的数据块。
设置配置信息:setConf(SparkEnv.get.conf)
初始化广播变量的唯一id值:private val broadcastId = BroadcastBlockId(id)
调用writeBlocks把广播变量划分成多个块,并保存到blockManager中。
广播变量的生命周期
Broadcast类是一个抽象类,它是TorrentBroadcast的父类。在该抽象类中,定义了一些常规的操作主要,包括以下一些操作:
destroy函数
该函数最终会调用实体类:TorrentBroadcast类中的unpersist方法。该方法会从master的blockManager中删除该广播变量。
最后,会调用doDestroy方法(广播实现应该提供)。
unpersist()函数
该函数的实现如下:
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
logDebug(s"Unpersisting TorrentBroadcast $id")
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
}
该函数会调用blockManagerMaster的removeBroadcast函数来删除在executor上属于该broadcast变量的所有数据块。
实现过程是:从driver端发送一个RemoveBroadcast消息。
destory()函数
该函数和unpersist()函数的实现类似,不过该函数还会把广播变量从driver端删除。
总结
本文分析了spark中广播变量的实现原理。