数据集
Spark支持弹性分布式数据集,具体包括如下3种,三类数据集可以相互转换
数据基本单元RDD
不支持Spark Sql
DataFrame
Structured data files
Tables in Hive
External databases
Using existing RDD
Datasets
支持java对象获取指定的field
缺点
a. 不支持Union Type
b. Varchar Type没有超size的错误提醒,相同数据从Hive读出来会被清空,但是从spark里面读出来不会被清空SparkSQL将varchar处理成string没有大小限制
c.不支持Transactional Table:Spark SQL不支持Hive transactions
d. 不支持Char Type
e. N不支持Time-StampinAvro Table.
SparkContext
SparkConf:在SparkConf中设置的属性会覆盖system属性,且一旦上传Spark,设置不可修改;SparkConf(false) 来避免加载外部属性(常用在单元测试中)Spark UI在spark调用stop的时候自动关闭
内部工作机制
Application->Job->Stage->Task 的分解、分发和并行计
在一个Spark应用的执行过程中,Driver是应用的逻辑执行起点,运行Application的main函数并创建SparkContext,DAGScheduler把对Job中的RDD有向无环图根据依赖关系划分为多个Stage,每一个Stage是一个TaskSet, TaskScheduler把Task分发给Worker中的Executor;Worker启动Executor,Executor启动线程池用于执行Task
Spark从外部空间(HDFS)读取数据形成RDD_0,Tranformation算子对数据进行操作(如fliter)并转化为新的RDD_1、RDD_2,通过Action算子(如collect/count)触发Spark提交作业
I/O机制
序列化及块管理
通信机制
Spark在模块间通信使用的是AKKA框架。AKKA基于Scala开发,用于编写Actor应用。Actors是一些包含状态和行为的对象。它们通过显式传递消息来进行通信,这些消息会被发送到它们的收信箱中(消息队列)。
容错机制
Lineage机制
记录粗粒度的更新(每个RDD维护一个指向其parent的指针,对于提交失败的场景:通过在另一个partition上重复执行失败的DAG调用链即可)
Checkpoint机制
将RDD写入Disk做检查点。检查点的本质是作为Lineage做容错的辅助,lineage过长会造成容错成本过高。在计算的中间阶段做检查点容错,如果之后的节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就可以减少开销。
Shuffle机制
当单进程空间无法容纳所有计算数据进行计算时,通过Shuffle将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。数据倾斜是Spark性能优化的一个重大课题。
可能会触发shuffle操作的算子 :distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
Shuffle分为两个阶段:Shuffle Write和Shuffle Fetch
DAG调度
边:用于RDD的操作
节点:RDD,一组分片的集合:创建RDD时指定RDD的分片个数,没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目
RDD的持久化,包括cache缓存,persist,MEMORY_ONLY(以java对象存于内存,等价于cache())3种;具体配置及说明如下:
MEMORY_ONLY使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍【默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略】
MEMORY_AND_DISK
使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。
MEMORY_ONLY_SER
同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,可避免持久化的数据占用过多内存导致频繁GC。
MEMORY_AND_DISK_SER
同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,可避免持久化的数据占用过多内存导致频繁GC。
DISK_ONLY
使用未序列化的Java对象格式,将数据全部写入磁盘文件中。
MEMORY_ONLY_2, MEMORY_AND_DISK_2等对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。
每个DAG调用Action进行提交,触发DAG调度器进行之后stages下任务分发及调度;
宽窄依赖
窄依赖:每一个父RDD的Partition最多被子RDD的一个Partition使用
宽依赖(涉及shuffling):多个子RDD的Partition会依赖同一个父RDD的Partition
宽依赖是划分Stage的依据
shuffling的两个重要参数
spark.shuffle.compress是否会将shuffle中outputs的过程进行压缩,spark.io.compression.codec将编码器设置为压缩数据,默认是true. spark.shuffle.manager设置shuffle时的排序算法,有hash,sort,tungsten-sort。用hash会快一点
Stage
执行计划中的一个步骤,包括两类:ShuffleMapstag、ResultStage是一系列并行tasks的结合(一个task/分区)parallel tasksaction触发job提交(触发该stage及其parent的stags执行)
集群管理
支持Standalone模式、YARN模式、Mesos模式
Spark性能调优
RDD的复用:同一份数据进行多个算子操作尽量使用同一份RDD
RDD持久化:对多次使用的RDD持久化,保证一个RDD被多次使用时只被计算一次
默认情况下MEMORY_ONLY性能最高,前提:内存必须足够大,可放下整个RDD的所有数据。避免序列化及反序列化的性能开销;实际生产环境,如果RDD中数据较多(如几十亿),直接用此持久化级别,会导致JVM的OOM内存溢出异常。
其次建议使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中(每个partition仅仅是一个字节数)但多出了序列化与反序列化的开销。如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。
上述两种都无法满足的情况下,建议使用MEMORY_AND_DISK_SER策略,非MEMORY_AND_DISK策略,RDD的数据量很大,内存无法完全放下;序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。
不建议使用DISK_ONLY和后缀为_2的级别:完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用