背景与实现概述
众所周知,GPU作为通用的加速硬件,被越来越广泛的用于图形图像处理、深度学习、高性能计算等领域, 效果显著。
ML/DL领域,Tensorflow, PyTorch 等深度学习框架大行其道,而 Spark 提供的 GraphX 和 MlLib 可以做一些机器学习的东西,但是在深度学习的战场里没有什么优势,最大的问题就在于硬件加速上,3.0 以前的社区版 Spark 没有调度 GPU 的方法。
训练模型除了本身的大规模的并行密集计算,从数据到模型,必须有数据处理的过程,这也是 Spark 的强项,因为你不太可能用 pandas 简单清洗汇总你的训练数据,而且做AI业务的企业往往已经部署有一套大数据平台环境。
这既是机遇也是挑战!
社区SPIP
Accelerator-aware task scheduling for Spark:SPARK-24615
SPIP:Spark Project Improvement Proposal,Spark项目改进方案
2018年,Hadoop3.1 YARN已经支持GPU调度。
Apache Spark支持的资源管理器 YARN 和 Kubernetes 已经支持了 GPU。为了让 Spark 也支持 GPU,在技术层面上需要做出两个主要改变:
- 在
cluster manager
层面上,需要升级 cluster managers 来支持 GPU。并且给用户提供相关 API,使得用户可以控制 GPU 资源的使用和分配。 - 在 Spark 内部,需要在
scheduler
层面做出修改,使得 scheduler 可以在用户 task 请求中识别 GPU 的需求,然后根据 executor 上的 GPU 供给来完成分配。
因为让 Apache Spark 支持 GPU 是一个比较大的特性,所以项目分为了几个阶段。在 Apache Spark 3.0 版本,将支持在 standalone、 YARN 以及 Kubernetes 资源管理器下支持 GPU,并且对现有正常的作业基本没影响。对于 TPU 的支持、Mesos 资源管理器中 GPU 的支持、以及 Windows 平台的 GPU 支持将不是这个版本的目标。而且对于一张 GPU 卡内的细粒度调度也不会在这个版本支持(指spark3.0)。
2020年6月,Spark3.0.0发布开始支持GPU调度。
Spark Scheduling
在这个层面,我们得允许从 RDD/PandasUDF API 中指定资源请求,这些请求应该在 DAGScheduler 中汇总。TaskSetManager 管理每个 Stage 挂起(pending)的任务,对于那些有 GPU 请求的任务,我们需要处理;对于那些不需要 GPU 的作业,其调度行为和效率应该和之前保持一致。
目前,CPUS_PER_TASK(spark.task.cpus)是一个 int 类型的全局配置,用于指定每个 task 应分配的 cores。为了支持 GPU 的配置,引入了 spark.task.gpus 参数用于指定每个 task 需要申请的 GPU 数。如果用户没有指定 spark.task.cpus 或 spark.task.gpus,那么 Spark 程序将使用默认的值;因为需要向后兼容,所以如果用户没指定spark.task.cpus
或 spark.task.gpus
,这两个参数的默认值分别为 1
和 空。
对于 ExecutorBackend ,需要使得它可以识别和管理 GPU ,并且把这些信息同步(比如修改现有的 RegisterExecutor 类)到 SchedulerBackend,然后 SchedulerBackend 可以根据这些 GPU 信息,为那些需要 GPU 资源的 task 进行资源分配。
Resource Manager
第一阶段将在 Standalone、YARN 以及 Kubernetes 上支持 GPU。Spark 需要在这三种资源管理上面做一些工作。
- Standalone
Standalone 是 Spark 内置的资源管理模式,但是之前的 Standalone 部署模式并不能支持 GPU 等资源。为了能识别 GPU 信息,一种可行的方法是在配置文件里面对 GPU 资源进行配置, Worker 通过读取这些配置信息,并在内存结构里面维护 GPU 和 CPU 等可用资源等信息。同时,在 Master 上通过 allocateWorkerResourceToExecutors
方法对 Executors 申请的资源(包括 GPU)进行分配。
- YARN
为了能够在 YARN 上支持 GPU,我们需要使用 YARN 3.1.2+ 版本;同时我们需要在 YARN 集群上做出相关配置,使得 YARN 启动了对 GPU 资源的支持,关于如何在 YARN 上配置 GPU 资源,请参见这里。
当为 Executors 申请 YARN 容器时,Spark 需要在 YARN 容器请求中将 executor 所需的 GPU 数量映射到 yarn.io/gpu 资源中。YARN 具有 GPU 隔离机制,所以无论是否使用 Docker 容器, 对未分配给 YARN 容器的 GPU 资源的使用将会被阻止。
需要注意的是,截至目前 YARN 仅支持 Nvidia GPU。
- Kubernetes
从 Kubernetes 1.8 版本开始,Kubernetes 使用设备插件模型(device plugin model)来支持 GPU、高性能NIC,FPGA 等设备。目前 Kubernetes 支持 Nvidia 、AMD 和 Intel 的 GPU 设备。在 Spark + k8s 里面为 task 指定 GPU 的数量和在 Standalone 或 YARN 模式里面一样。也是支持 spark.task.gpus
和 spark.executor.gpus
的全局配置,也支持在 RDD stage 中为每个 task 设置。
功能概述
GPU 和其他加速器已广泛用于加速特殊工作负载,例如深度学习和信号处理。Spark 现在支持请求和调度通用资源,例如 GPU。
Spark 将使用指定的配置首先从集群管理器请求具有相应资源的容器。一旦获得容器,Spark 就会在该容器中启动一个 Executor,它将发现容器拥有哪些资源以及与每个资源关联的地址。Executor 将向 Driver 注册并报告该 Executor 可用的资源。然后 Spark 调度器可以将任务调度到每个 Executor 并根据用户指定的资源需求分配特定的资源地址。
注意:它目前不适用于 Mesos 或本地模式。
阶段级调度(Stage Level Scheduling)
阶段级别调度功能(SPARK-27495)允许用户在阶段级别指定任务和执行器资源需求。这允许不同阶段与具有不同资源的执行程序一起运行。一个典型的例子是一个 ETL 阶段,执行器只使用 CPU,下一个阶段是需要 GPU 的 ML 阶段。阶段级调度允许用户在 ML 阶段运行时请求具有 GPU 的不同执行器,而不必在应用程序开始时获取具有 GPU 的执行器,并且在 ETL 阶段运行时它们处于空闲状态。
这仅适用于 Scala、Java 和 Python 中的 RDD API。当启用动态分配时,它在 YARN 和 Kubernetes 上可用。
该功能在Spark3.0.0版本还没有实现,在Spark3.1版本才有。
YARN上资源分配与配置
YARN 3.1.0 中添加了 YARN 上的GPU资源调度。理想情况下,资源是隔离设置的,以便执行器只能看到分配给它的资源。如果您没有启用隔离,则用户负责创建一个发现脚本,以确保资源不会在执行程序之间共享。
YARN 目前支持任何用户定义的资源类型,但内置了 GPU ( yarn.io/gpu
) 和 FPGA ( yarn.io/fpga
) 类型。因此,如果您正在使用这些资源中的任何一个,Spark 可以将对 spark 资源的请求转换为 YARN 资源。
对GPU和FPGA仅配置spark.{driver/executor}.resource. 否则配置YARN (spark.yarn.{driver/executor}.resource.) 和 Spark (spark.{driver/executor}.resource.)
例如:每个Executor请求2个GPUs,用户指定
spark.executor.resource.gpu.amount=2
,Spark会自行处理YARN上的yarn.io/gpu资源。如果是其他自定义资源,如acceX,则需要同时写spark.yarn.executor.resource.acceX.amount=2
andspark.executor.resource.acceX.amount=2
.
YARN 不会告诉 Spark 分配给每个容器的资源地址。出于这个原因,用户必须指定一个发现脚本,该脚本在启动时由executor运行,以发现该executor可用的资源。可以在 examples/src/main/scripts/getGpusResources.sh
中找到示例脚本。
- 阶段级调度
启用动态分配(dynamic allocation)时,YARN 支持阶段级调度。
测试用例
一个简单的资源spark-shell GPU配置启动:
./bin/spark-shell --master yarn --executor-cores 2 \
--conf spark.driver.resource.gpu.amount=1 \
--conf spark.driver.resource.gpu.discoveryScript=/opt/spark/getGpuResources.sh \
--conf spark.executor.resource.gpu.amount=2 \
--conf spark.executor.resource.gpu.discoveryScript=./getGpuResources.sh \
--conf spark.task.resource.gpu.amount=1 \
--files examples/src/main/scripts/getGpusResources.sh
对于这种配置,我们期望在 Executor 上一次运行不超过两个任务,因为每个任务将使用一个 GPU,并且每个 Executor 最多有两个 GPU。
- GPU分配API
// Task API
val context = TaskContext.get()
val resources = context.resources()
val assignedGpuAddrs = resources("gpu").addresses
// Pass assignedGpuAddrs into TensorFlow or other AI code
// Driver API
scala> sc.resources("gpu").addresses
Array[String] = Array(0)
- 加速效果
criteo是非常经典的点击率预估比赛。训练集4千万行,特征连续型的有13个,类别型的26个,没有提供特征名称,样本按时间排序。测试集6百万行。
社区发展路线
Spark 2时代,正如我之前提到的,传统上在与机器学习相关的管道中,数据准备,在一个 CPU 集群上,由 Spark 编排,这必然会被序列化到共享存储,然后是一个单独的GPU集群将实际加载序列化数据备份并使用它进行训练,如 Tensorflow。
有了 Spark 3,我们终于有了一个统一的架构。我们有一个单一的管道,具备阶段级调度,我们实际上可以将它作为一个应用程序进行调度,我们可以进行数据摄取、准备和模型训练,所有这些都由 Spark 编排,单一平台,为 AI 而生。
Spark 3 with Project Hydrogen