简介
Spark
Spark是分布式计算框架
Spark可以和多种存储系统结合使用,如Kafka、HBase、Hive、HDFS以及关系型数据库。
-
与MapReduce的区别
- Spark是基于内存迭代处理数据;MapReduce是基于磁盘迭代处理数据
- Spark中有DAG(有向无环图)执行引擎,执行速度快
- Spark是粗粒度资源申请,MapReduce是细粒度的资源申请
- MapReduce中只有mapper和reducer,相当于spark中的map和reduceByKey两个算子
在MapReduce中很多业务逻辑要自己实现,Spark中有各种算子对应各种业务逻辑
- MapReduce中只有mapper和reducer,相当于spark中的map和reduceByKey两个算子
-
Spark的四大特性
- 高效性:运行速度提高100倍
- 易用性:支持Java,Scala,Python,R和SQL
- 通用性:Spark提供了统一的解决方案。可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)
- 兼容性:可以运行在Hadoop,Apache Mesos,Kubernetes,Standalone或者云环境上,可以连接不同的数据源。
Spark运行模式
- local,用于本地开发,多用于测试
- Standalone,Spark自带的资源调度框架
- Yarn,Hadoop生态圈中的资源调度框架
- Mesos,资源调度框架(国内不常用)
Spark Application
driver
driver驱动程序是Spark应用程序的核心,并在应用程序的生命周期内维护所有相关信息
-
driver职责
- 维护有关Spark应用程序的信息
- 响应用户的程序或输入
- 分析、分配和调度executor的工作
executor
executors进程实际执行driver分配给它们的工作
executors是一个worker进程所管理的节点上为某Application启动的一个进程,该进程负责运行任务,并且负责将数据存在内存或磁盘上。
每个应用程序都有各自独立的executors。
-
executor职责
- 执行由驱动程序分配给它的代码
- 将执行器executor的计算状态报告给驱动节点
Task
- 被发送到某个executor上的工作单元
Job
- 包含很多任务(task)的并行计算
(在一个SparkApplication中有几个action算子就有几个Spark job)
Stage
- 一个Job会被拆分成很多任务,每组任务被称为Stage——stage由一组并行的task组成
(类似于MapReduce分为map task和reduce task一样) - Job根据RDD的宽窄依赖来划分Stage
- Stage的并行度由finalRDD的partition数量决定
- Stage的计算模式:pipeline管道计算模式--一条条处理数据
任务提交(standalone)
-
client
- 在客户端提交Application,Driver在客户端启动
- 客户端向Master申请资源,Master返回Worker节点
- Driver向worker节点发送task,监控task执行,回收结果
- 注:client模式提交任务适用于程序测试,不适用于生产。
Driver是在客户端启动,当在客户端提交多个application时,会有网卡流量激增问题
-
cluster
- 在客户端提交application,首先客户端向Master申请启动Driver
- Master随机在一台Worker中启动Driver
- Driver启动之后向Master申请资源,Master返回资源
- Driver发送task,监控task,回收结果
- 注:cluster模式提交任务适用于生产环境,Driver是在集群中某一个节点启动
会将client模式的网卡流量激增问题分散到集群
-
driver的功能
- 发送task
- 监控task
- 申请资源
- 回收结果
名词
SparkSession
- 任何Spark程序的第一步都是先创建SparkSession
DataFrames
在Spark中,DataFrame是一种以RDD为基础的分布式数据集
DataFrame只是表示包含行和列的数据表
-
DataFrames特点
- 支持单机KB级到集群PB级的数据处理
- 支持多种数据格式和存储系统(Hive,MySQL,PostgreSQL,JSON等等)
- 通过Spark SQL Catalyst优化器可以进行高效的代码生成和优化
- 能够无缝集成所有的大数据处理工具
- 提供Python,Java,Scala,R语言API
对于DataFrames,大多数情况下不会手动或单独操作分区。您只需在物理分区中指定数据的高级转换,Spark将确定该工作将如何在集群上执行。
Spark SQL可以将任何DataFrame注册为表或视图(临时表),并使用纯SQL查询它。
在编写SQL查询和编写DataFrame代码之间没有性能差异。
DataSet
- DataFrames和Dataset都是(分布式的)类似于表的集合,具有定义好的行和列
- Dataset只适用于Java虚拟机(JVM)的语言(Scala和Java)并且指定带有case类和JavaBean的类型
- 对于Spark在Python或R中没有Dataset这样的东西,所有的都是DataFrame
Schema
- 定义了DataFrame的列名和数据类型
Partition
分区是集群中的一个物理机器上的行集合;
Partition是RDD的最小存储单元分区从不跨越多台计算机,即同一分区中的元组保证位于同一台计算机上;
群集中的每台计算机都包含一个或多个分区-
两种类型的分区
- Hash partitioning
(通常,哈希分区将元组(k,v)分配给分区p,其中 p = k.hashCode() % numPartitions) - Range partitioning
- 默认情况下,当使用sortByKey时,将使用RangePartitioner;
使用groupByKey时的默认分区器是HashPartitioner。
- Hash partitioning
RDD
RDD(Resilient Distributed Dataset)弹性分布式数据集
RDD中是不存数据的,partition中也不存数据,存的是处理逻辑
-
五大特性
- RDD是由一系列Partition组成的
(在从HDFS读取文件时block的数量决定了partition的数量)
- RDD是由一系列Partition组成的
- 算子(函数)是作用在Partition上的
- RDD之间有依赖关系
(容错性:在一个lifeage中只要其中一个RDD还在就可以生成其他RDD)
- RDD之间有依赖关系
- 分区器是作用在(K,V)格式的RDD上的
(K,V格式的RDD表示RDD中的每个元素都是一个二元组)
- 分区器是作用在(K,V)格式的RDD上的
- RDD对外提供最佳的计算位置
(利于数据处理的本地化)
- RDD对外提供最佳的计算位置
Spark没有直接读取HDFS文件的方法,textFile方法的底层调用的是MapReduce读取
(首先会split,每个split的默认大小为128M即block的大小,每个split对应一个partition)
算子
-
Transformations类型算子
Lazy Evaluation--它是懒加载的
-
转换的类型
- 窄依赖的转换:每个输入数据分区只对一个数据输出分区(或多对一)
- 宽依赖的转换:一个输入数据分区对应多个输出分区
(会产生shuffle的算子就是宽依赖的算子)
-
Action类型算子
可以触发Lazy Evaluation算子执行
-
类型
- 在控制台中查看数据的action
- 数据收集的action操作
- 输出到第三方存储系统的action
在一个SparkApplication中有几个action算子就有几个Spark job
-
持久化类型算子
-
cache
- 默认将数据存在内存中
- 缓存的最小单位是partition,是Lazy Evaluation类型的算子,需要action算子触发
- application执行结束之后数据会被清除
-
persist
- 可以手动指定数据的持久化级别
- 缓存的最小单位是partition,是Lazy Evaluation类型的算子,需要action算子触发
- application执行结束之后数据会被清除
-
checkpoint
可以将数据持久化到磁盘,还可以切断RDD之间的依赖关系
当lineage非常长并且计算又复杂时,可以使用checkpoint对RDD进行持久化
application执行结束之后数据不会被清除
-
checkpoint的执行流程
- 当application有action触发执行时,job执行之后会从后往前回溯
- 回溯去找有哪些RDD被checkpoint,被checkpoint的做标记
- 回溯完成之后,重新计算checkpointRDD的数据,将结果写入指定的checkpoint的目录中
- 切断RDD之间的依赖关系
- 优化:在对RDD进行checkpoint之前,最好先cache下
-
SPARK是一个分布式编程模型,用户可以在其中指定TRANSFORMATION。
多个TRANSFORMATION构建一个有向无环图(DAG)。一个ACTION开始执行DAG的过程,作为一个单一的JOB作业,将它分解成多个STAGES阶段和TASK任务,以便在整个集群中执行
Spark的资源调度与任务调度
资源调度
- 1.启动集群,Worker向Master汇报资源,Master掌握集群资源
- 当new SparkContext时会创建两个对象:DAGScheduler和TaskScheduler
- TaskScheduler向Master申请资源
- Master收到请求,找到满足资源的worker,启动Executor
- Executor启动之后反向注册给TaskScheduler,Driver掌握了一批资源
任务调度
- 有一个action算子就有一个job,job中有RDD,RDD会形成有向无环图
- DAGScheduler负责将每个job中的DAG按照RDD的宽窄依赖切割job,划分stage,将stage以TaskSet形式提交给TaskScheduler
- TaskScheduler遍历TaskSet,拿到一个个task,将task发送到Executor中的ThreadPool去执行
- TaskScheduler监控task执行,回收结果
小结
- TaskScheduler可以重试发送失败的task,默认重试3次,若仍然失败则stage失败
- DAGScheduler负责重试失败的Stage,默认重试4次,
若仍然失败则job失败即Application失败
- DAGScheduler负责重试失败的Stage,默认重试4次,
- TaskScheduler不仅可以重试失败的task,还可以重试执行缓慢的task
这是Spark的推测执行机制,默认是关闭的(对于ETL业务场景建议关闭)
- TaskScheduler不仅可以重试失败的task,还可以重试执行缓慢的task