Spark基础知识
- Spark是基于内存的计算框架,但是也存在磁盘IO。
- 使用的排序算法:归并排序(大数据里基本都使用该排序算法)
- 四种运行模式:
- Local (一般用于测试)
- Standalone(Spark自带模式)
- Mesos
- YARN
- Spark容错的几种方法:
- 缓存
- checkpoint
- 重新计算
基本术语
- Application: Spark应用程序,即我们编写的包含main函数的整个程序
- Driver Program: 运行main函数并创建SparkContext
- Cluster Manager: 在集群上获取资源的外部服务(Standalone模式下是Master,Yarn模式下是ResourceManager)
- Worker Node: 集群中任何可以运行Application代码的节点,每个节点可起一个或多个Executor
- Executor: 在某个Worker Node上为某个Application启动的一个进程,该进程负责运行Task,并负责将数据存储到内存或磁盘上,每个Executor由若干core(核)组成,即一个进程由若干线程组成,Executor核数和内存大小可调节,每个Application都有各自独立的Executors
- Task: 被送到某个Executor上的工作单元(最小单位)
- Job: 一个Job包含多个Stage
- Stage: 一个Stage包含多个Task
Block和partition
- HDFS中,Block是分布式存储的最小单元,位于存储空间,大小固定(一般设置为128M),Block存在冗余(即副本),一个Block内的内容只能来自同一份文件,不允许跨文件
- partition是RDD的最小单元,位于内存空间,大小不固定,Stage根据pipeline划分Tasks时,一个Task就对应一个partition
Spark运行简单原理
工作原理:
- 集群启动,Worker Node注册到Cluster Manger并持续发送心跳
- 提交Application,Driver Pragram创建SparkContext上下文
- SparkContext向Cluster Manager申请资源
- Cluster Manger找到资源后告诉SparkContext去哪些Worker Node申请资源
- SparkContext向对应的Worker Node申请资源
- Worker Node创建Executor后告诉SparkContext资源已经分配好了
- SparkContext把Task分配到各Executor上执行
- Executor将计算结果返回给SparkContext
- 整个Application完成后,SparkContext通知Cluster Manager释放资源
- Culster Manger让对应的Worker Node释放资源,Executor被释放
注意:
- 运行模式有多种,区别在于使用了不同的资源管理框架来分配集群资源。
- Spark应用程序(Application)的原理是相同的,注意不要和资源管理框架混淆。
Standalone Client模式运行原理
编写代码,打jar包,以Client模式提交到Standalone集群中运行Spark程序。
- 在Shell中用spark-submit脚本提交Spark Application
- 在该机器上启动一个Driver进程
- 在Driver进程中new一个SparkContext对象
- SparkContext初始化的时候,首先会构造一个DAGScheduler
- SparkContext初始化的时候,其次会构造一个TaskScheduler
- TaskSchedulerBackend(TaskScheduler的后台进程)连接Master并向Master注册当前Application
- Master收到该Application的注册请求后,使用资源调度算法,在Spark集群的Worker Nodes上为该Application启动多个Executor
- Executor启动后反向注册到TaskScheduler上, 当所有Executor都反向注册完后,SparkContext初始化完毕
- SparkContext初始化完毕后,继续执行我们编写的程序,DAGScheduler开始切分Job,遇到一个Action算子(源码中使用
sc.runJob()
)后,构成一个Job - DAGScheduler根据宽依赖切分Job,构成多个Stage,然后DAGScheduler根据Pipeline将各个Stage封装成对应的TaskSet
- DAGScheduler将TaskSet传给TaskScheduler
- TaskScheduler根据之前获得的Executor信息把TaskSet里的每个Task提交到相应的Executor进程中执行
- Task被放到TaskRunner中,Executor进程启多个线程并行执行TaskRunner
注意:
- 各模式的运行原理是相同,区别在于:
- Standalone模式:Master、Worker集群管理
- Yarn模式:ResourceManager、NodeManager集群管理
- 两种提交模式的区别:
- Client模式:Driver进程运行在提交Application的Client上
- Cluster模式:在Client上提交Application后,Master分配一个Worker来启动Driver进程完成SparkContext的初始化
RDD基础知识
RDD:弹性分布式数据集(Resilient Distributed Dataset)
RDD是一个抽象的概念,具有如下特性:
- 一个RDD由多个partition(分区)组成,每个partition对应一个节点上的一块内存,因此partition不是抽象的概念。
- 多个partition并行计算,因此partition数就等于计算并行度。
- RDD之间存在依赖列表(关系),可用于重新计算丢失的数据。
- [可选]K-V格式的RDD可以重新分区。
- [可选]Spark择优选择每个partition的计算位置(本地性),例如HDFS系统。
Hadoop集群一般与Spark集群建在相同的节点上,各节点磁盘上分布式存储文件,构成HDFS系统。当Spark需要加载内容到内存中时,根据本地性,会从本节点的磁盘上获取内容并加载到内存中,HDFS系统中的Block对应于Spark中的partition。多个partition并行计算。
RDD持久化
使用持久化的时机:
- 当RDD需要被重复使用的时候
- 需要做容错的时候
几种RDD持久化策略:
//持久化策略定义
class StorageLevel private(
private var _useDisk: Boolean, // 是否使用本地磁盘(不是HDFS)
private var _useMemory: Boolean, // 是否使用内存
private var _useOffHeap: Boolean, // 是否不适用Java堆内存
private var _deserialized: Boolean, // 是否不序列化
private var _replication: Int = 1 // 副本数(默认为1)
)
// 多种持久化策略定义
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
持久化策略的选择建议:
- 优先选择默认的 MEMORY_ONLY
- 如果内存空间紧张,选择 XXX_SER
- 如果需要对缓存的数据具备一定的容错性,选择 XXX_2
- 如果某RDD的计算代价比较大,则选择 MEMORY_AND_DISK
MEMORY_ONLY 和 MEMORY_AND_DISK 的 区别:
- MEMORY_ONLY将RDD缓存到内存中的持久化区域,如果该区域内存不够,则多余的数据不缓存
- MEMORY_AND_DISK优先缓存RDD到内存中的持久化区域,如果该区域内存不够,则将多余的数据存到本地磁盘
RDD算子
分类:Transformation算子、Action算子
Transformation算子
- filter(f : T => Boolean) : RDD[T] => RDD[T]
- map(f : T => U) : RDD[T] => RDD[U]
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
- flatMap(f : T => Seq[U]) : RDD[T] => RDD[U]
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
- sample(fraction : Float) : RDD[T] => RDD[T] 取样
- groupByKey() : RDD[(K,V)] => RDD[(K,Seq[V])]
- reduceByKey(f : (V,V) => V) : RDD[(K,V)] => RDD[(K,V)]
- union() : (RDD[T],RDD[T]) => RDD[T]
- join() : (RDD[(K,V)],RDD[(K,W)]) => RDD[(K,(V,W))]
- cogroup() : (RDD[(K,V)],RDD[(K,W)]) => RDD[(K,(Seq[V],Seq[W]))]
- crossProduct() : (RDD[T],RDD[U]) => RDD[(T,U)]
- mapValues(f : V => W) : RDD[(K,V)] => RDD[(K,W)]
- sort(c : Comparator[K]) : RDD[(K,V)] => RDD[(K,V)]
- partitionBy(p: Partitioner[K]) : RDD[(K,V)] => RDD[(K,V)]
Action算子
- count() : RDD[T] => Long
- collect() : RDD[T] => Seq[T] 慎用,数据量大时会OOM
- reduce(f : (T,T) => T) : RDD[T] => T
- lookup(k : K) : RDD[(K,V)] => Seq[V]
- save(path : String) : RDD存储到磁盘
注意:
在一个Application可以有多个Action操作,那么运行时就会提交多个Job。
这些Job是按顺序依次执行的,前一个Job执行完才会提交后一个Job执行。
RDD的宽窄依赖
RDD的宽窄依赖是根据RDD算子判定的。例如:map、filter是窄依赖;groupByKey是宽依赖。
窄依赖:父RDD的一个partition只对应子RDD的一个partition,即子RDD的各partition计算不依赖于父RDD的其他partition,不发生shuffle(洗牌)操作。
宽依赖:父RDD的一个partition对应子RDD的多个partition,即子RDD的各partition计算依赖于父RDD的其他partition,发生shuffle(洗牌)操作。
注意:如果算子是窄依赖,则父RDD和子RDD的partition数量始终是相同的;如果算子是宽依赖(shuffle算子),一般情况下父RDD和子RDD的partition数量是相同的(除非是shuffle算子本身改变了partition数量或者设置了默认partition数)。
例如:父RDD的partition数为5,groupByKey()后,子RDD的partition数还是5,但是如果使用groupByKey(10),那么子RDD的partition数为10。
如果设置了spark.default.parallelism=6,那么groupByKey()后,子RDD的partition数是6,groupByKey(10)不受影响,子RDD的partition数还是10。
因此优先级如下:
- shuffle算子传入partition个数参数,则改变RDD的partition数为该参数值;
- 设置spark.default.parallelism,则改变partition数为该设置值;
- 否则子RDD的partition数和父RDD的partition数保持一样。
shuffle算子(宽依赖)理解:
shuffle算子本身分为两个阶段:map阶段和reduce阶段。
- map阶段就是map算子,是窄依赖,父RDD和子RDD各partition一一对应。
- reduce阶段是宽依赖,根据目标partition数对map阶段计算出来的RDD内各partition进行划分,然后取每个partition的对应部分组成新的partition,最终形成新的RDD(partition数就变成目标partition数了)。
shuffle算子有:
- repartition、coalesce(可控制是否shuffle)
- *ByKey:groupByKey、reduceByKey等
- join、cogroup
共享变量
广播变量
广播变量是只读的,且会被缓存到集群的各个台机器中,而不是像普通变量一样被copy到Task内(网络传输中需要序列化和反序列化),当广播变量创建(sc.broadcast(v)
)完成后不允许被修改。累加器
累加器只能用于累加计数(数字类型和自定义类型)。创建方法(sc.accumulator(v)
)。
累加器只能在Driver端被读取,在Executor内只能累加不能读取。
----------------------
欢迎补充修正