RDD编程
算子类型
- 转换算子: 返回一个新的RDD类型的数据,转换算子都是lazy模式,直到遇见执行算子才执行
- 执行算子: 无返回或者返回一个非RDD类型的数据
- 持久化算子: 将数据持久化或者缓存到内存中,持久化和缓存都是lazy模式的
转换算子
-
创建:
- 从集合: parallelize和makeRdd,makeRdd底层调用了parallelize,使用了默认的分片数
- 从外部: textfile
- 从其他: flatmap等
-
转换:
- value类型:
map,mapPartitions,mapPartitionswithIndex,flatmap,glom(同分区放一起),groupBy,filter,sample,distinct(map,reducebykey,map),coalesce,reparation,sortby,pipe(调用脚本) - 双value类型:
union(并集),subtract(子集),intersection(交集),cartesian(笛卡尔积),zip(压缩) - key-value类型:
partitionby,groupbykey,reducebykey,aggregatebykey,foldbykey,combinebykey,sortbykey,mapvalues,join,cogroup
- value类型:
-
补充:
- xxx和xxxpartitions的区别
- xxx函数里面是分区内的每个元素,适用于大量元素,不适用启动资源,xxxpartitons里面是每个分区,适用于启动资源,不适用于大量元素
- coalesce和repartition的区别
- reparation底层调用了coalesce,默认进行了shuffle,而coalesce是可以选择是否进行shuffle
- reducebykey和groupbykey的区别
- reducebykey在shuffle之前进行了combine操作,groupbykey没有进行combine操作,直接进行了shuffle
- join和cogroup的区别
- cogroup会对相同的key进行聚合操作,join不会
- xxx和xxxpartitions的区别
执行算子
- reduce,collect,count,first,take,takeordered,aggregate,fold
- take和top和takeordered的区别
- take取出,top降序取出,takeordered升序取出
持久化算子
- saveastextfile,saveassequencefile,saveasobjectfile,countbykey,foreach,
函数传递
- 意义:
对RDD进行操作时,初始化工作是在Driver中进行的,而实际运行程序是在excuter中进行的,这就涉及到了跨进程通讯,是需要序列化的, - 解决方案:
- 传递一个方法: 将类继承scala.Serializable接口即可
- 传递一个变量: 将类变量改为局部变量也可以
依赖关系
概念:
依赖关系也叫做lineage血缘关系,即在rdd创建及转换时记录下来的各个Rdd的依赖关系意义:
lineage会记录RDD元数据的信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的分区数据,而不用全部分区重新计算-
分类:
- 宽依赖: 一个父分区对应了多个子分区的数据,产生了shuffle过程,需要等待parent Rdd的数据
- 窄依赖: 一个父分区只对应一个子分区的数据,没有产生shuffle过程,可以并行计算
-
DAG:
tranform算子是lazy模式的,当原始RDD经过一系列的transform之后,就形成了一个DAG,根据DAG基于是否需要shuffle来划分stage,shuffle之前相当于map,前后无关系,可以并行执行,优化执行算法,shuffle之后各个节点数据只有当parent RDD执行完毕之后才可以执行,因此是否产生shuffle过程,即窄依赖就是DAG划分stage的依据- Spark会根据shuffle/宽依赖使用回溯算法来对DAG进行Stage划分,从后往前,遇到窄依赖就断开,遇到窄依赖就把当前的RDD加入到当前的stage/阶段中,底层使用的数据结构是栈
-
任务划分:
- application:一个程序包就是一个application,包含一个driver和一个或多个job
- job: 遇见一个action算子,就会拆分为一个job,一个job就是一个DAG
- stage: 一个job/DAG遇见一次宽依赖,就会划分为一个stage,一个stage就是一个taskset
- task: 一个转换算子就是一个task,一个taskset包含多个task
持久化和检查点
-
持久化:
概念:
通过persist方法或者cache方法可以将前面的计算结果进行缓存,cache底层调用了persist方法,默认情况下persist方法会将数据以序列化的形式缓存在jvm的堆空间中,这两个方法也是lazy模式的,当遇到action算子时才会触发,触发后会将该rdd缓存在计算节点的内存中,供后续使用-
级别:
可以在下面存储级别的末尾添加_2,来将数据缓存为两份- memory_only: 默认
- memory_only_ser
- memory_and_disk: 常用
- memory_and_disk_ser
- disk_only
意义:
数据是缓存在计算节点的内存中,也会可能发生丢失现象,若发生丢失现象,可根据RDD的依赖关系恢复丢失的分区数据
-
检查点:
- 概念:
通过lineage血缘关系做容错辅助,设置检查点目录,检查点将数据写入到hdfs文件系统,在检查点目录下创建一个二进制文件,只记录rdd数据并移除rdd的血缘关系。检查点操作也是lazy模式的,遇见action算子才会触发 - 意义:
lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的rdd开始重组lineage,就会减少开销
- 概念:
- 补充:
- 持久化persist和检查点checkpoint的区别:
- 持久化persist是将数据缓存在计算节点的内存中,保留了RDD数据及其依赖关系,性能较好,但数据容易丢失,应用程序结束后自动释放
- 检查点checkpoint是将数据持久化在hdfs上的,只保留了RDD数据,未保留依赖关系,性能较差,数据不容易丢失,需要手动释放删除
- 在进行故障恢复时,Spark会对读取Checkpoint的开销和重新计算RDD分区的开销进行比较,从而自动选择最优的恢复策略。
- 持久化persist和检查点checkpoint的区别:
分区器
分类: 仅支持hash分区,range分区,自定义分区
作用: spark中的分区器直接决定了RDD中的分区个数,RDD中的每条数据经过shuffle过程属于哪个分区和reduce的个数
-
注意事项:
- 只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None
- 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的
hash分区
- 分类: 仅支持hash分区,range分区,自定义分区
- 作用: spark中的分区器直接决定了RDD中的分区个数,RDD中的每条数据经过shuffle过程属于哪个分区和reduce的个数
- 注意事项:
- 只有Key-Value类型的RDD才有分区器的,非Key-Value类型的RDD分区器的值是None
- 每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的
range分区
-
原理: 将一定范围内的数据映射到某一个分区内
- 先从整个rdd中抽取样本数据,将样本数据排序,计算出每个分区最大的key值,形成一个数组变量
- 判断每条数据的key在数组变量中所处的范围,给出该key值在下一个rdd中的分区id下标
缺点: 该分区器要求rdd中的key类型必须是可以排序的
自定义分区
- 继承org.apache.spark.Partitioner类并实现numPartitions,getPartition,equals三个方法