弹性分布式数据集(resilient distributed dataset RDD
),它是一个只读的元素集合,Spark将元素在集群节点之间划分为多个部分(partitions
),然后进行并行操作。RDD一般保留到内存中,方便重复使用,包含血统信息,可以自动从故障节点中恢复。Spark 的所有工作都是围绕RDD进行
创建
创建RDD有两种方法:
- 并行驱动程序中的现有集合:调用
parallelize
方法 - 引用外部存储系统中的数据集(如共享文件系统,HDFS,Cassandra, HBase, Amazon S3或任何提供Hadoop InputFormat的数据源)
两种情况下都可以使用默认的partitions
数目,也可以手动设定数据集划分的数目
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> distData = sc.parallelize(data);
Spark可以从任何Hadoop支持的存储系统中创建分布式的数据集
val distFile = sc.textFile("data.txt")
JavaRDD<String> distFile = sc.textFile("data.txt");
操作
支持两种操作,类似java.util.stream
包中的intermediate
和terminal
操作
- 转换操作(
transformations
):用之前的数据集创建一个新的数据集(map
filter
等),所有的转换操作都是惰性求值的,当行动操作触发才进行处理, - 行动操作(
actions
):触发进行数据集计算,产生一个结果返回给驱动程序或者写到存储中(reduce
count
等)
惰性求值 Lazy Evaluation
不立即进行计算,直到使用它的时候才进行计算。
避免无用的计算,处理大量数据的时候可以减少内存占用
val lines = sc.textFile("data.txt")
val lineLengths = lines.map(s => s.length) //transformation
val totalLength = lineLengths.reduce((a, b) => a + b) //action
JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
int totalLength = lineLengths.reduce((a, b) -> a + b);
Spark记录了不同RDD之间的依赖关系,叫做谱系图 lineage graph
,可以用这些信息进行RDD的计算,如果一部分部分持久化的数据丢失,可以重新计算,来恢复丢失的数据
函数名 | 功能 |
---|---|
map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U]): RDD[U] |
对每个元素使用map函数重新计算,返回一个新的RDD |
flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] |
先对所有元素应用该函数,遍历迭代器,添加到一个新的RDD |
union(other: RDD[T]): RDD[T] |
并集,包含相同元素 |
intersection(other: RDD[T]): RDD[T] |
交集 |
subtract(other: RDD[T]): RDD[T] |
相减 |
cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] |
笛卡尔积 |
reduce(f: (T, T) ⇒ T): T |
将所有元素结合到一起 |
aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U |
类似于reduce,但是可以产生一个不同类型的结果 |
foreach(f: T => Unit): Unit |
对每个元素执行相应的函数操作 |
RDD键值对操作
在Scala中,键值对为Tuple2
对象,键值对类型的RDD进行隐式转换,使其能够使用PairRDDFunctions
类中的函数
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))
val counts = pairs.reduceByKey((a, b) => a + b)
Java使用scala.tuple2
表示键值对,创建new Tuple2(a, b)
,访问tuple._1()
,tuple._2()
,对应的RDD为JavaPairRDD
,可以使用JavaRDD
的mapTuple
,flatMapToPair
函数创建,或者使用parallelizePairs(java.util.List<scala.Tuple2<K,V>> list)
,从集合转换
static <K2,V2> JavaPairRDD<K2,V2> mapToPair(PairFunction<T,K2,V2> f)
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
Pair RDD可以使用标准RDD的所有操作函数,此外还有键值对RDD特有的一些操作:
函数名 | 功能 |
---|---|
reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)] |
key相同的结合起来 |
mapValues(Function<V,U> f) |
改变键值对的值,不改变key |
combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] |
依据三个主要函数,初始化createCombiner,分区内累加mergeValue,分区之间累加mergeCombiners,讲相同K值的元素结合到一起RDD[(K, V)] => RDD[(K, C)]
|
groupByKey(): RDD[(K, Iterable[V])] |
key相同的值分组到一个序列 |
groupBy(Function<T,U> f) |
通过函数计算key,其他的和 groupByKey 一致 |
cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] |
对多个RDD使用相同的key分组 |
join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] |
内连接 |
leftOuterJoin/rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] |
左/右连接 |
sortByKey() |
排序,被划分为转换操作,实际上是action操作 |
countByKey(): Map[K, Long] |
统计每个key对应的元素数目 |
collectAsMap() |
键值对RDD转为Map |
lookup(key: K): Seq[V] |
返回对应key的所有值序列 |
数据分区
合理的布局数据,可以减少网络通信,显著提高性能,同时分区数目也决定了程序的并发程度
Shuffle 操作
Shuffle是Spark重新分配数据的机制,重新组织分布在不同节点的数据,通常涉及机器之间数据复制,是非常耗时的操作。在shuffle操作过程中,中间数据会被自动的保存到spark.local.dir
目录下
引发Shuffle的操作:
- 重新/合并分区repartition operations :repartition, coalesce
- 按键进行的操作*ByKey operations : reduceByKey,groupByKey, combineByKey, groupBy等
- 连接操作join operations : cogroup, join
- 参数中带有
numPartitions
的转换操作可能会导致shuffle,例如distinct(numPartitions: Int)
coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)
如果是缩减RDD分区的数目,使用coalesce()
更加高效,因为它避免了Shuffle操作,repartition()
会进行Shuffle操作
合并分区时通过PartitionCoalescer
特质指定合并方式,多个Partition被分到一个PartitionGroup
中
Partitioner 分区器
对于K/V RDD,不能明确的控制每个K的工作节点,但是可以确保一组键在一些节点上同时出现(通过hash取模或者K的范围分配元素)。分区器只对K/V pair RDD有效,把每一个key映射到 partition ID(0 到 numPartitions - 1
)
默认有两种分区方式:范围分区RangePartitioner
和哈希分区HashPartitioner
设定分区:partitionBy(partitioner: Partitioner): RDD[(K, V)]
RDD类中包含分区器的信息,初始可能为空,用可选项类表示
- Scala : RDD中的字段
val partitioner: Option[Partitioner]
- Java : JavaPairRDD<K,V>中的函数
static Optional<Partitioner> partitioner()
需要按照Key通过网络进行Shuffle数据的操作,cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), lookup()
,分区都可以提升它的性能
例如reduceByKey
,先在本地计算reduce,减少了传输的数据量,cogroup
之类的多个RDD操作,至少有一个已经具有分区器的RDD不会进行shuffle
Spark内部知道每个操作如何影响分区
- 对数据进行分区的操作创建的RDD会自动设定分区器。例如
join
,具有相同key的元素被哈希到同一台机器上,分区器就是哈希分区器(hash-partitioned) - 对于不能保证确定分区的转换,产生的RDD不设定分区器,例如
map
操作,而mapValues()
分区器保留不变,因为K值不变
二元RDD的操作,具体设定分区器取决于父RDD分区器,默认是哈希分区器。如果有一个父RDD有分区器,它将继承该分区器; 如果两个父RDD都有,继承第一个分区器
自定义分区器
继承org.apache.spark.Partitioner
,实现以下三个方法:
- numPartitions: Int, 分区数目
- getPartition(key: Any): Int, 返回给定key的 partition ID (0 到 numPartitions-1)
- equals(), Java 判定相等的方法,用来决定分区方式是否相同
示例:通过域名分区
class DomainNamePartitioner(numParts: Int) extends Partitioner {
override def numPartitions: Int = numParts
override def getPartition(key: Any): Int = {
val domain = new Java.net.URL(key.toString).getHost()
val code = (domain.hashCode % numPartitions)
if (code < 0) {
code + numPartitions // Make it non-negative
} else {
code
}
}
// Java equals method to let Spark compare our Partitioner objects
override def equals(other: Any): Boolean = other match {
case dnp: DomainNamePartitioner =>
dnp.numPartitions == numPartitions
case _ =>
false
}
}
基于分区的操作
传入函数的输入是RDD元素的迭代器,输出是另一种类型的迭代器,每个分区只会调用一次该函数,通常用来避免过多的创建开销,
mapPartitions[U](f: (Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ⇒ Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
其中index是每个分区的索引,迭代器可以获取对应分区内的所有元素
持久化
默认情况下,每次在RDD上执行一个action操作,RDD以及它依赖的RDD都会重新计算,在大数据集的情况下,这样避免浪费大量的存储空间来记录不会被重用的RDD
部分情况下(例如迭代算法和交互式命令)可能需要重复使用同一数据,使用persist() / cache()
,当数据第一次计算出来以后,可以将其存留在内存中,极大的提高速度
Spark有多种持久化的等级,可以根据需要,确定数据留存的位置是在磁盘、内存、堆外内存,是否进行序列化,是否进行复制
// 默认情况下的存储等级
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
-
MEMORY_ONLY
:默认情况,RDD存储在内存中,如果因为内存问题没有缓存,当再次需要的时候重新计算丢失的部分数据 -
MEMORY_AND_DISK
:内存不足够时数据写入到磁盘,需要时候读取,写入磁盘的数据需要序列化 -
MEMORY_ONLY_SER
:RDD作为序列化后的对象存储在内存中,节省空间,不过需要更多的计算 -
MEMORY_AND_DISK_SER
:类似MEMORY_ONLY_SER
,内存不足够时数据写入到磁盘,数据在内存和磁盘上都进行序列化 -
MEMORY_ONLY_2, MEMORY_AND_DISK_2
:在复制每个分区的数据到另个一个集群节点上,备份数目为2 -
OFF_HEAP
:使用堆外内存,即不受JVM垃圾回收器控制的内存
当存留数据较多,内存不足时,会采用LRU缓存策略,移除部分旧的数据,RDD.unpersist()
手动从缓存中移除
容错机制
RDD的容错机制是依靠 RDD Lineage 实现的,每个RDD都记录了其是如何由其他父RDD转换而来的,即依赖关系,当部分分区数据丢失时,通过血统信息重新运算,自动重建丢失的数据。
RDD.toDebugString
方法可以显示RDD Lineage 的情况
scala> val text = sc.textFile("/home/wdy/bin/spark-2.0.2-bin-hadoop2.7/README.md")
text: org.apache.spark.rdd.RDD[String] = /home/wdy/bin/spark-2.0.2-bin-hadoop2.7/README.md MapPartitionsRDD[3] at textFile at <console>:24
scala> text.toDebugString
res4: String =
(2) /home/wdy/bin/spark-2.0.2-bin-hadoop2.7/README.md MapPartitionsRDD[3] at textFile at <console>:24 []
| /home/wdy/bin/spark-2.0.2-bin-hadoop2.7/README.md HadoopRDD[2] at textFile at <console>:24 []
圆括号中的数字表示在对应阶段的并行水平(Partition的数目)
Checkpoint
对于包含宽依赖的很长的谱系图来说,单个节点的失败可能导致整个过程重新计算,可以设置检查点,将对应位置的信息保存到稳定的存储系统中(HDFS),使得出现错误时,计算开销减小
使用检查点时,首先调用SparkContext.setCheckpointDir(directory: String)
设定检查点的存储目录,非local模式下必须使用HDFS路径,然后调用RDD.checkpoint()
将RDD标记为一个检查点,内部会创建一个ReliableCheckpointRDD
记录相关文件信息,必须在RDD触发执行Job之前,调用checkpoint()
函数,因为检查点机制是在SparkContext.runJob
函数内部,Job执行完成后,调用rdd.doCheckpoint()
实现的,它把数据存到之前设定的目录文件中,返回一个RDDCheckpointData
对象代表该RDD
checkpoint vs cache
RDD被缓存后依旧会保留血统lineage,如果某些节点的缓存丢失,可以从头开始重新计算数据
检查点的RDD,血统完全丢失,不再记录依赖,没有重建rdd的信息,数据被保存在可靠的存储上
cache会最大化的保留数据,但不一定是绝对完整的,因为当内存不足时会腾出空间,例如默认指定的存储级别是MEMORY_ONLY
,当内存不足时,不会被写入磁盘,就会直接清除
如果设定了cache,那么会优先从本地BlockManager
内的MemoryStore
或DiskStore
内读取数据块,或者远程读取数据块,如果都没有读取到该数据块,就会进行计算,同时根据StorageLevel
还会将计算结果进行缓存
RDD 表示和实现
RDD主要的接口,每个具体的RDD都要实现相关的接口,后两个属性为可选:
- getPartitions: Array[Partition] :返回分区列表,partition是进行数据处理的单元,每个Task对应一个分区的数据计算,分区数目决定了并行计算数量,如果从 HDFS上读取,默认一个block对应一个分区
- iterator(split: Partition, context: TaskContext): Iterator[T]:获取RDD数据元素的迭代器(直接读取缓存或者计算
compute(split: Partition, context: TaskContext)
) - getDependencies: Seq[Dependency[_]]:返回RDD依赖的父RDD
- partitioner:分区器,决定了key-value对类型的RDD如何分区
- getPreferredLocations(split: Partition): Seq[String]:获取数据优先的节点位置,优先选择数据块所在的位置进行计算
计算
所有RDD的获取都是通过iterator
方法读取底层的数据,可能直接读取缓存或需要进行计算:
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
// 查看storageLevel,如果存储级别不是NONE,首先通过blockManager检查对应的block是否存在,如果没有,内部调用computeOrReadCheckpoint方法
getOrCompute(split, context)
} else {
//查看是否有checkPoint,如果有直接读取,否则就调用compute函数计算
computeOrReadCheckpoint(split, context)
}
}
Dependency
依赖关系分为两种类型
- narrow dependencies: 窄依赖,父RDD的每个分区最多被一个子RDD分区使用
- wide dependencies:宽依赖,多个子分区依赖一个父分区
窄依赖允许在一个节点上进行流水线执行,计算出所有的父分区,宽依赖需要对所有节点的数据进行读取,并使用类似MapReduce的操作在节点间进行混洗
窄依赖在节点故障后恢复效率更高,宽依赖一个单一的失败节点可能会导致依赖所有父分区的部分子分区丢失,需要完全重新执行恢复