当一个 Driver 启动后,会在很多节点上运行 Task,有一些Task 是相同的,即具有相同的 function,只是处理的数据不一样。
如果 Task 运行处理数据时,会产生一些数据量比较大的变量,或者这些数据从 Hbase 等获取,如果此变量在很多其他节点上都需要使用,那么每个Task 都会产生这样一个变量,会大大占用内存,这时 可以通过 Broadcast 优化。
Broadcast 机制 和使用
我们可以把上面提到的变量 broadcast 出去,这样每个 Executor 持有一份 broadcasted data,每个 Executor 中的若干个 Task 共用这一份 data。
例子:
val data = List(1, 2, 3, 4, 5, 6)
val bdata = sc.broadcast(data)
val rdd = sc.parallelize(1 to 6, 2)
val observedSizes = rdd.map(_ => bdata.value.size)
问题:为什么只能 broadcast 只读的变量?
这就涉及一致性的问题,如果变量可以被更新,那么一旦变量被某个节点更新,其他节点要不要一块更新?如果多个节点同时在更新,更新顺序是什么?怎么做同步?还会涉及 fault-tolerance 的问题。为了避免维护数据一致性问题,Spark 目前只支持 broadcast 只读变量。
问题:broadcast 到节点而不是 broadcast 到每个 task?
因为每个 task 是一个线程,而且同在一个进程运行 tasks 都属于同一个 application。因此每个节点(executor)上放一份就可以被所有 task 共享。
Executor 获取 broadcast 的方式
最开始的时候数据放在 Driver 的本地文件系统中,Driver 在本地会创建一个文件夹来存放 Broadcast 中的 data,然后启动 HttpServer 来访问文件夹中的数据,同时写入到BlockManager (Storage Level 是MEMORY_AND_DISK) 中获得 BlockId (BroadcastBlockId) ,当第一次 Executor 中的 Task 要访问 Broadcast 变量的时候,会向 Driver 通过 HttpServer 来访问数据, 然后会在 Executor 中的 Broadcast 中注册该 Broadcast 中的数据,这样后续需要的 Task 访问的 Broadcast 的变量的时候会首先查询BlockManager 中有没有该数据,如果有就直接使用;
-
BroadcastManager 是用来管理 Broadcast,该实例是在 SparkContext 创建 SparkEnv 的时候创建的。
通过 HttpBroadcast 方式获取
HttpBroadcast 就是每个 executor 通过的 http 协议连接 driver 并从 driver 那里 获取 data。
HttpBroadcast 最大的问题就是 存在单点故障, 和 网络IO性能问题。
因为 worker 上的 executor 都会去 driver 那里取数据。
通过 TorrentBroadcast 方式获取
假设有 executor fetch 到了一些 data blocks,那么这个 executor 就可以被当作 data server 了,随着 fetch 的 executor 越来越多,有更多的 data server 加入,data 就很快能传播到全部的 executor 那里去了。
分担了 Driver 的压力