剖析Spark工作的运行
我们来看下当我们运行一个Spark工作时,会发生什么。在最高级别上,有两个独立的实体:驱动(driver)和执行器(executors)。驱动持有(hosts)应用(SparkContext),为工作调度任务。执行器独立于应用,在应用的持续时间内运行,执行应用的任务。通常情况下,驱动作为客户端运行,不受集群管理器的管理,而执行器运行在集群中的多台机器上,但并非总是如此。在本节的其余部分,我们假定应用的执行器已经在运行。
工作的提交
图19-1说明了Spark是怎样运行一个工作的。当在一个RDD上执行一个行动时(比如count()),会自动提交一个Spark工作。内部来看,这会导致SparkContext的runJob()方法被调用(图19-1步骤1),该方法将调用传递给调度器,调度器作为驱动的一部分运行(步骤2)。调度器由两部分组成:DAG调度器和任务调度器。DAG调度器会把工作拆解为多个阶段组成的DAG。任务调度器的责任是把每个阶段的任务提交到集群。
图19-1. Spark怎样运行一个工作
接下来,我们来看看DAG调度器是怎样构建一个DAG的。
DAG的构建
为了理解怎样把一个工作拆分为阶段,我们需要看看阶段中运行的任务的类型。有两种任务类型:混洗map任务 shuffle map tasks 和结果任务 result tasks。任务类型的名字指示出Spark如何处理任务的输出:
混洗map任务
正如名字所暗示的,混洗map任务就像MapReduce中的map侧混洗。每个混洗map任务在一个RDD分区上(基于分区函数)执行计算,把输出写到一组新的分区,供后续的阶段获取,后续阶段可能由混洗map任务或者结果任务组成。混洗map任务运行在除最终阶段以外的所有阶段中。
结果任务
结果任务运行于最终阶段,把结果返回给用户程序(比如count()的结果)。每个结果任务在它的RDD分区上执行计算,然后把结果返回给驱动,驱动再把每个分区的结果组合成最终结果(可能是Unit,在saveAsTextFile()情况下)。
如果不需要混洗,仅有一个由结果任务组成的单独的阶段,这是最简单的Spark工作,就像MapReduce中仅有map任务的工作一样。
更复杂的工作会包含grouping操作,并且须要一个或多个混洗阶段。例如,存储在inputPath的文本文件(每行一个单词),下面的工作会计算出单词数量的直方图:
val hist: Map[Int, Long] = sc.textFile(inputPath)
.map(word => (word.toLowerCase(), 1))
.reduceByKey((a, b) => a + b)
.map(_.swap)
.countByKey()
前面两个转换操作,map()和reduceByKey(),执行单词计数。第三个map()转换操作,会把每个键值对的key和value交换,输出(count, word)对。最后一个是行动操作,countByKey(),返回每个count的word数量(单词数量的频率分布)。
Spark的DAG调度器把这个工作切分为两个阶段,因为reduceByKey()操作强制了一个混洗阶段。结果DAG如图19-2所示。
注意countByKey()是在驱动程序的本地执行最终的聚合操作,而不是第二个混洗阶段。这里不同于示例18-3中的Crunch程序,该程序使用了第二个MapReduce工作来做计数。
图19-2. 计算词数直方图的Spark工作中的阶段和RDD
一般来说,每个阶段内的RDD也会整理到DAG中。上图显示了RDD的类型和创建该RDD的操作。例如,RDD[String]是由textFile()创建的。为了简化图示,这里省略了Spark内部生成的一些中间RDD。比如,textFile()返回的RDD实际上是MapRDD[String],其父是HadoopRDD[LongWritable, Text]。
注意到转换操作reduceByKey()跨越了两个阶段,这是因为它的实现是一个混洗过程,该reduce函数在map侧作为combiner运行,在reduce侧作为reducer运行,正如MapReduce一样。还是与MapReduce相同,Spark的混洗实现会把输出写到本地磁盘的分区文件(即使是内存RDD也一样),这些文件被下一阶段的RDD读取。
通过配置,混洗的性能还有调优的余地。还要注意Spark使用自定义的混洗实现,与MapReduce的混洗实现没有任何的共享代码。
如果一个RDD在同一应用(SparkContext)中的前一个工作中持久化过,DAG调度器不会再创建重算这个RDD及其父RDD的阶段。
DAG调度器的责任是把阶段拆分为任务,提交给任务调度器。在这个例子中,第一个阶段,输入文件的每个分区对应运行一个混洗map任务。reduceByKey()操作的并行级别可以明确指定(传入第二个参数),如果不指定的话,将由父RDD确定,本例中是输入数据的分区数量。
DAG调度器为每个任务指定位置首选项,使得任务调度器可以利用数据本地化的优势。例如,如果一个任务处理的是存储在HDFS上的RDD分区,它的位置首选项是该分区的块(block)所在的数据节点(datanode),被称为节点本地 node local。如果一个任务处理的是缓存在内存中的RDD分区,它的位置首选项是存储该分区的执行器(executor),被称为进程本地 process local。
回到图19-1,一旦DAG调度器创建了完整的阶段DAG,它就会把每个阶段的任务集提交给任务调度器(步骤3)。父阶段成功完成之后,子阶段才会提交。
任务的调度
当任务调度器接收到一坨任务时,它会查找执行器列表,在考虑位置首选项的基础上,建立任务到执行器的映射关系。接着,任务调度器把任务分配给那些有空闲核心的执行器(如果同一应用中的其他工作正在运行的话,这坨任务可能分配不完),随着执行器中任务的运行完成,调度器会继续分配更多的任务,直到这一坨任务全部完成。每个任务分配到的核心数量,可以通过属性spark.task.cpus来设置,默认是1。
对于某个特定的执行器,调度器首先为它分配进程本地(process-local)任务,然后是节点本地(node-local)任务,然后是机架本地(rack-local)任务,最后是任意的非本地(nonlocal)任务,或者是“推测(speculative)任务”,如果没有其他候选者的话。
被分配的任务通过调度器后端(scheduler backend)启动(图19-1步骤4),它会发送一个远程“启动任务消息”(步骤5)给执行器后端(executor backend),告诉执行器要运行任务了(步骤6)。
Spark的远程调用,使用的不是Hadoop RPC,而是Akka,一个基于actor模型的平台,用于构建高度可伸缩的、事件驱动的分布式应用。
当一个任务完成或者失败的时候,执行器会发送“状态更新消息”给驱动。如果任务失败,任务调度器会在另一个执行器上重新提交该任务。如果启用了“推测任务”,并且有运行比较慢的任务,任务调度器还会启动“推测任务”。推测任务默认是未启用的。
任务的执行
执行器运行一个任务的过程如下(步骤7)。首先,它要确认该任务的JAR包和依赖文件都是最新的。前面的任务使用过的依赖文件,会被执行器保持在本地缓存中,只在这些文件变化的时候,执行器才会去下载它们。第二步,执行器反序列化任务的代码(包括用户定义的函数),这些代码是作为“启动任务消息”的一部分发送过来的序列化字节。第三步,执行任务代码。注意,任务和执行器运行在同一个JVM中,因此不存在任务启动的进程开销(Mesos的细粒度模式是个例外,它的每个任务都是一个独立的进程)。
任务会把结果返回给驱动。结果序列化之后发送给执行器后端,然后作为“状态更新消息”返回给驱动。一个混洗map任务返回一些信息,使下一个阶段可以获取它输出的分区。一个结果任务返回它所运行的分区的结果值,驱动再把这些结果值组装成最终结果,返回给用户程序。
执行器和集群管理器
我们已经看过Spark是如何依赖于执行器来执行Spark工作中的任务的,但是我们掩盖了执行器实际上是怎样启动的。管理执行器的生命周期,是集群管理器的责任。Spark提供了多种具有不同特征的集群管理器:
本地
在本地模式下,只有一个执行器,和驱动运行在同一个JVM里。这种模式对于测试和运行小的工作非常有帮助。这种模式的master URL是local(一个线程),local[n](n个线程),或者local(*)(每个核心一个线程)。
独立
独立的集群管理器是一个简单的分布式实现。它运行一个master和一个或多个worker。当一个Spark应用启动时,master会代表这个应用,指示worker进行执行器进程的创建。这种模式的master URL是spark://host:port。
Mesos
Apache Mesos是一个通用的集群资源管理器,根据一个组织策略,它允许不同的应用之间进行细粒度的资源共享。默认情况下(细粒度模式),每一个Spark任务,就是一个Mesos任务。这样可以高效利用集群资源,但是付出的代价是额外的进程启动的开销。在粗粒度模式下,执行器在进程内运行任务,因此在Spark应用的持续时间内,集群资源由执行器进程所持有。这种模式的master URL是mesos://host:port。
YARN
YARN是Hadoop使用的资源管理器。一个运行中的Spark应用,对应一个YARN的应用实例。一个执行器在它自身的YARN容器中运行。这种模式的master URL是yarn-client或者yarn-cluster。
Mesos和YARN集群管理器,优于独立模式,因为它们会考虑集群中运行的其他应用的资源需求(比如MapReduce工作),并为这些应用强制执行一个调度策略。独立模式对集群资源进行静态分配,因此不能适应其他应用随着时间变化的需求。并且,YARN是唯一一个和Hadoop的Kerberos安全机制集成的集群管理器。
Spark on YARN
在YARN上运行Spark,可以和其他的Hadoop组件紧密集成,而且如果你已经有了一个Hadoop集群,这是使用Spark的最方便的方式。Spark提供了两种部署模式:YARN client模式,驱动在client端运行;YARN cluster模式,驱动运行在集群上的YARN application master里。
对于具有交互组件的程序,比如spark-shell或者pyspark,YARN client模式是必须的。当你构建一个Spark程序时,client模式也会非常有帮助,因为任何的调试输出都是立即可见的。
另一方面,YARN cluster模式适用于生产系统(production job),因为整个应用运行在集群上,这样能更容易的保留日志文件(包括驱动程序的日志)以备后查。另外,如果application master失败了,YARN会重试这个应用。
YARN client模式
在YARN client模式中,与YARN的交互开始于驱动程序创建一个新的SparkContext实例的时候(图19-3步骤1)。SparkContext把一个YARN应用提交给YARN资源管理器(步骤2),YARN资源管理器会在集群中某个节点管理器上启动一个YARN容器,然后在容器中运行一个Spark ExecutorLauncher应用(步骤3)。这个ExecutorLauncher的工作是通过向资源管理器请求资源(步骤4),在YARN容器中启动执行器。当容器被分配过来以后,启动ExecutorBackend进程(步骤5)。
图19-3. 在YARN client模式下,Spark执行器的启动过程
执行器启动以后,会回连到SparkContext并注册自己。这样SparkContext中就有了关于可用执行器的数量及其位置的信息。在分配任务时可基于这些位置信息进行决策。
在spark-shell,spark-submit和pyspark中,可以设置要启动的执行器的数量(如果不设置,默认是2),还有每个执行器使用的核心数量(默认是1)以及内存大小(默认是1024MB)。下面这个例子在YARN上运行spark-shell,配有4个执行器,每个执行器使用1个核心和2G内存:
% spark-shell --master yarn-client \
--num-executors 4 \
--executor-cores 1 \
--executor-memory 2g
YARN资源管理器的地址,没有在master URL中指定(不同于独立模式或Mesos集群管理器),而是从Hadoop的配置中获取(Hadoop的配置位于环境变量HADOOP_CONF_DIR指定的目录下)。
YARN cluster模式
在YARN cluster模式中,用户的驱动程序运行在YARN的application master进程中。命令spark-submit的master URL是yarn-cluster:
% spark-submit --master yarn-cluster ...
其他的参数,比如num-executors以及应用的JAR文件(或者Python文件),与YARN client模式相同(使用spark-submit --help查看详细用法)。
图19-4. 在YARN cluster模式下,Spark执行器的启动过程
spark-submit客户端将会启动YARN应用(图19-4步骤1),但它不运行任何的用户代码。余下的流程与client模式相同,唯一的不同是,在为执行器分配资源之前(步骤4),驱动程序是由application master启动的(步骤3b)。
在这两种YARN模式中,执行器启动之前没有任何的数据本地化信息可以使用,因此这些执行器很可能不在那些持有工作对应文件的数据节点上。对于交互式会话,这是可以接受的,因为在会话开始之前可能不知道哪些数据集会被访问。但是对于生产系统(production job),这就不能接受了。因此Spark提供了一种给出位置线索的方式,以使运行YARN集群模式时的数据本地化程度得到提高。
SparkContext的构造函数可以接收第二个参数,用于指定偏爱的位置。此位置根据输入格式以及路径,使用InputFormatInfo帮助类计算而来。例如,对于文本文件,我们使用TextInputFormat:
val preferredLocations = InputFormatInfo.computePreferredLocations(
Seq(new InputFormatInfo(new Configuration(), classOf[TextInputFormat],
inputPath)))
val sc = new SparkContext(conf, preferredLocations)
“偏爱位置”被application master用来向资源管理器请求分配资源(步骤4)。本文写作时,Spark的最新版本是1.2.0,关于偏爱位置的API还不稳定,后续版本中可能会改变。
延伸阅读
本章仅仅覆盖了Spark的基础知识。更多细节请参阅《Learning Spark》。Apache Spark网站上也有关于Spark最新发行版的最新文档。
花了一个星期,在宝贝女儿(2岁)持续不断的骚扰下,终于把这件事情完成了。