Spark运行模式
Spark 的运行模式多种多样、灵活多变,部署在单机上时,既可以用本地模式运行,也可以用伪分布式模式运行,而当以分布式集群的方式部署时,也有众多的运行模式可以供选择,这取决于集群的实际情况,底层的资源调度既可以依赖于外部的资源调度框架,也可以使用 Spark 内建的 Standalone 模式。对于外部资源调度框架的支持,包括Mesos 模式,以及Hadoop YARN 模式。
在实际应用中,Spark 应用程序的运行模式取决于传递给 SparkContext 的 MASTER 环境变量的值,个别模式还需要依赖辅助的程序接口来配合使用,目前所支持的 MASTER 环境变量由特定的字符串或 URL 所组成。如下:
local(本地模式):常用于本地开发测试,本地还分为local单线程和local-cluster多线程:
- Local[N]:本地模式,使用 N 个线程。
- Local[*]:还是本地模式,但是用了系统中所有的核。
- Local[N,M]:这里有两个参数,第一个代表的是用到的核个数;第二个参数代表的是容许该作业失败M次。上面的几种模式没有指定M参数,其默认值都是1。
- Local Cluster[Worker,core,Memory]:伪分布式模式,可以配置所需要启动的虚拟工作节点的数量,以及每个工作节点所管理的 CPU 数量和内存尺寸。
cluster(集群模式):master-slave模式,Master服务(YARN ResourceManager,Mesos master和Spark standalone master)决定哪些application可以运行,什么时候运行以及哪里去运行。而slave服务( YARN NodeManager,Mesos slave和Spark standalone slave)实际上运行executor进程。
- Spark://hostname:port : Standalone模式,需要部署 Spark 到相关节点,URL 为 Spark Master 主机地址和端口。部署Standalone模式的集群仅需把编译好的Spark发布版本分发到各节点,部署路径尽量一致,且配置文件相同。
- Mesos://hostname:port : Mesos模式,需要部署 Spark 和 Mesos 到相关节点,URL 为 Mesos 主机地址和端口。
- YARN standalone/Yarn cluster : YARN 模式一,主程序逻辑和任务都运行在YARN集群中。用于集群生产模式。
-
YARN client : YARN 模式二,主程序逻辑运行在本地,具体任务运行在YARN集群中。主要用于与用户交互调试模式,可以快速地看到application的输出。spark-shell和pyspark必须要使用yarn-client模式。
二者主要区别是Driver的运行位置。
Spark架构中的基本组件
- ClusterManager:在Standalone模式中即为Master(主节点),控制整个集群,监控Worker。在YARN、Mesos模式中担任资源管理器。
- Worker:从节点,负责控制计算节点,启动Executor或Driver。在YARN模式中为NodeManager,负责计算节点的控制。
- Driver:运行Application的main()函数并创建SparkContext。
- Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。不同的Spark应用程序拥有独立的一组Executors。
- SparkContext:整个应用的上下文,控制应用的生命周期。
- RDD:Spark的基本计算单元,一组RDD可形成执行的有向无环图RDD Graph。
- Task:运行于Executor中的任务单元,Spark应用程序最终被划分为经过优化后的多个任务的集合。
- Job:由多个任务构建的并行计算任务,具体为Spark中的action操作,如collect,save等)。
- Stage:每个job将被拆分为更小的task集合,这些任务集合被称为stage,各stage相互独立(类似于MapReduce中的map stage和reduce stage),由于它由多个task集合构成,因此也称为TaskSet。
- DAG Scheduler:根据作业(Job)构建基于Stage的DAG,并提交Stage给TaskScheduler。
- TaskScheduler:将任务(Task)分发给Executor执行。
- SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。
其中SparkEnv内创建并包含如下一些重要组件的引用:
- MapOutPutTracker:负责Shuffle元信息的存储。
- BroadcastManager:负责广播变量的控制与元信息的存储。
- BlockManager:负责存储管理、创建和查找块。
- MetricsSystem:监控运行时性能指标信息。
- SparkConf:负责存储配置信息。
弹性分布式数据集(RDD)
- RDD设计目标
RDD用于支持在并行计算时能够高效地利用中间结果,支持更简单的编程模型,同时也具有像MapReduce等并行计算框架的高容错性、能够高效地进行调度及可扩展性。RDD的容错通过记录RDD转换操作的lineage关系来进行,lineage记录了RDD的家族关系,当出现错误的时候,直接通过lineage进行恢复。RDD最合数据挖掘,机器学习及图计算等涉及到迭代计算的场景,基于内存能够极大地提升其在分布式环境下的执行效率。RDD不适用于诸如分布式爬虫等需要频繁更新共享状态的任务。
spark-shell中如何查看RDD的Lineage:
//textFile读取hdfs根目录下的README.md文件,然后筛选出所有包括Spark的行
scala> val rdd2=sc.textFile("/README.md").filter(line => line.contains("Spark"))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:21
//toDebugString方法会打印出RDD的家族关系
//可以看到textFile方法会生成两个RDD,分别是HadoopRDD、MapPartitionsRDD,而filter同时也会生成新的MapPartitionsRDD
scala> rdd2.toDebugString
15/09/20 01:35:27 INFO mapred.FileInputFormat: Total input paths to process : 1
res0: String =
(2) MapPartitionsRDD[2] at filter at <console>:21 []
| MapPartitionsRDD[1] at textFile at <console>:21 []
| /README.md HadoopRDD[0] at textFile at <console>:21 []
- RDD抽象
RDD在Spark中是一个只读的(val类型)、经过分区的记录集合。
两种创建方式:从存储系统中创建[本地/分布式/内存文件系统]或者从其它RDD中创建。
1.从HDFS中创建RDD
scala> sc.textFile("/README.md")
res1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at textFile at <console>:22
2.从内存中创建RDD
//内存中定义了一个数组
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
//通过parallelize方法创建ParallelCollectionRDD
scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at parallelize at <console>:23
3.从其它RDD创建新的RDD
//filter函数将distData RDD转换成新的RDD
scala> val distDataFiletered=distData.filter(e=>e>2)
distDataFiletered: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at filter at <console>:25
//触发action操作,注意collect只适合数据量较少时使用
scala> distDataFiltered.collect
res3: Array[Int] = Array(3, 4, 5)
-
RDD的窄依赖与宽依赖
RDD经过transformation操作后会生成新的RDD,前一个RDD与tranformation操作后的RDD构成了lineage关系,也即后一个RDD与前一个RDD存在一定的依赖关系,根据tranformation操作后RDD与父RDD中的分区对应关系,可以将依赖分为两种:宽依赖(wide dependency)和窄依赖(narrow dependency),如下图:
图中的实线空心矩形代表一个RDD,实线空心矩形中的带阴影的小矩形表示分区(partition)。从上图中可以看到, map、filter、union等transformation操作后的RDD仅依赖于父RDD的固定分区,它们是窄依赖的;而groupByKey后的RDD的分区与父RDD所有的分区都有依赖关系,此时它们就是宽依赖的。join操作存在两种情况,如果分区仅仅依赖于父RDD的某一分区,则是窄依赖的,否则就是宽依赖。
RDD API:http://blog.csdn.net/pelick/article/details/44922619
reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合,而groupByKey算子不会进行map-side预聚合(类似于MapReduce中的本地combiner)