RDD解决的问题:
1.中间结果保存在内存中,并且重用
2.提供了通用的抽象的分布式的数据模型
3.提供了多种数据操作模式(支持函数式编程):如map,ruduce,foreach啊
4.不同RDD之间的转换操作之间还可以形成依赖关系
总之:比传统的MapReduce,RDD更快速,更通用,更灵活
RDD介绍:
弹性分布式数据集(抽象数据模型)
总结
RDD具有数据流模型的特点:
自动容错(通过依赖连重新计算),位置感知性调度和可伸缩.
RDD将spark的底层细节都隐藏起来了(任务调度/Task执行,任务失败重试等待)
开发者使用起来可以像操作本地集合一样以函数式编程的方式操作RDD这个抽象的数据集进行各种并行计算
RDD5个主要属性:
①:分区:RDD创建时会指定分区个数,没有指定,就会采用默认值,分区数决定了并行度
②:函数:一个函数会被作用在每一个分区, Spark中RDD的计算是以分区为单位的
③:依赖:一个RDD依赖多层RDD(容错:部分分区数据丢失,Spark可以通过依赖关系重新计算丢失的分区数据,,不用重新计算所有分区)
④:分区器(可选项):即RDD分区函数,一个是基于哈希的HashPartitioner(key_value的RDD),另外一个是基于范围的RangePartitioner(非key_value的RDD)。指定数据去哪个分区
⑤:位置列表(可选项):存储存取每个Partition的优先位置,尽可能选择那些存有数据的worker节点来进行任务计算
RDD 区分Transformation和Action:
转化算子:
①:返回值是RDD的为Transformation转换操作
行动算子:
①:返回值不是RDD(如Unit、Array、Int)的为Action动作操作
②:统计操作(也属于行动算子)
注意:
RDD中实际不存储真正计算的数据,只是记录了RDD的转换关系.
RDD中所有转换操作都是延迟执行的(懒执行),只有当发生Action操作的时候,这些转换才会真正运行
目的:让spark更有效率的运行
持久化:
①:持久化/缓存相关API:RDD通过persist(StorageLevel.级别)或cache方法可以将前面的计算结果缓存,当发生Action操作的时候,才执行持久化
②:缓存的级别有很多,默认只存在内存中,开发中使用memory_and_disk
③:实际开发中如果某一个RDD后面频繁的被使用,那么就可以将该RDD进行持久化/缓存
④:RDD持久化/缓存的目的是为了提高后续操作的速度
⑤:只有执行action操作的时候才会真正将RDD数据进行持久化/缓存
容错:
引入容错的目的:
①:中间结果持久化放在内存中(快),或者磁盘上(不安全)
②:Checkpoint:借助HDFS高容错、高可靠,保证数据最大程度的安全
使用步骤:
调用SparkContext.setCheckpointDir("目录")方法,设置一个容错文件系统目录,比如hdfs,
调用RDD.checkpoint方法。
action计算结束后,会单独启动一个job,将数据写入之前设置的文件系统持久化
如何保证数据安全与读取效率:
对频繁使用且重要的数据,先做持久化,再做checkpint操作
RDD本身的血统容错(回溯依赖链,重放计算出来)加上chekpriont容错(把结果保存在HDFS中)
RDD读取步骤:
一个任务内(action前)看看有没有自己需要的RDD,有就直接使用
任务内部没有,就看之久有没有持久化
没有持久化,就看有没有checkpoint过
如果没有根据依赖(血统)重新计算
持久化和Checkpoint的区别
①:位置
Persist 和 Cache 只能保存在本地的磁盘和内存中(或者堆外内存--实验中)
Checkpoint 可以保存数据到 HDFS 这类可靠的存储上
②:生命周期
Cache和Persist的RDD会在程序结束后立刻被清除
Checkpoint的RDD在程序结束后依然存在,不会被删除
③:依赖关系
Persist和Cache,不会丢掉RDD间的依赖链/依赖关系,因为这种缓存是不可靠的,如果出现了一些错误(例如 Executor 宕机),需要通过回溯依赖链,重放计算出来
Checkpoint会把结果保存在HDFS中,因为HDFS这类存储中,更加的安全可靠,一般不需要回溯依赖链
宽窄依赖
怎么区分宽窄依赖:
窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖
宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle的也是宽依赖)
宽窄依赖设计的目的(好处):
DAG
介绍:
有向无环图,其实就是RDD执行的流程
DAG的边界:
开始:通过SparkContext创建的RDD
结束:触发Action,一旦触发Action就形成了一个完整的DAG(Action,决定了多少个DAG)
为什么会有stage:
以宽依赖划分,分成一个个stage中(窄依赖中task可以),并行计算
怎么划分stage:
Spark(DAGScheduler)会根据shuffle/宽依赖来进行DAG划分,从后往前划分,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的stage/阶段中(回溯算法)
Spark原理
执行流程:
流程总结:
Spark的构成
在Spark中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)/DAG构成(以调用Action划分),一个作业/DAG由多个阶段(Stage)构成(以宽依赖划分),一个阶段由多个任务(Task)构成TaskSet。
DAGScheduler
划分Stage
重新提交出错/失败的Stage
将 Taskset 传给底层调度器
TaskScheduler
为每一个TaskSet构建一个TaskSetManager
数据本地性决定每个Task最佳位置(移动计算比移动数据更划算)
提交 taskset(一组task) 到集群运行并监控
推测执行,碰到 straggle(计算缓慢) 任务需要放到别的节点上重试
重新提交Shuffle输出丢失的Stage给DAGScheduler
Spark运行特点
每个Application获取专属自己的executor进程,多个应用进程隔离
Spark与资源管理器无关(Standalone、Yarn)
Client应该靠近Worker节点,因为行过程中SparkContext和Executor之间有大量的信息交换
Task采用了数据本地性和推测执行的优化机制(由TaskScheduler完成)
Executor上有一个BlockManager存储模块(存储中间结果)
累加器
引入:
RDD分布式集合,数据分发到各个分区,无法累加
目的:
累加器accumulators:累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)
广播变量
广播变量broadcast variables:广播变量用来把变量在所有节点的内存之间进行共享,提高效率,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本。