注:本文是跟着《Spark快速大数据分析》一书学习的笔记式总结,不喜勿喷。
RDD(Resilient Distributed Dataset)弹性分布式数据集是Spark对数据的核心抽象。RDD是一个不可变的分布式对象集合。每一个RDD都会被分成多个分区,不同的分区运行在集群的不同节点,这就构成了Spark的分布式计算模型。
RDD创建
我们可以通过两种方式创建RDD。一种方式是直接读取外部数据,这在我们实际使用中较常用,另一种是在驱动程序中分发驱动器程序中的对象集合(List或set),一般调试中会使用。
直接读取外部数据
JavaRDD<String> lines = sc.textFile("/path/to/README.md");
JavaRDD<String> lines1 = ctx.textFile("/path/to/README.md",1);//每次读取一行(类似Hadoop)
对一个集合进行并行化
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
RDD操作
RDD支持两种操作:转化和行动。转化会生成一个新的RDD,如上面所说,RDD是不可变的,原来的RDD依然存在,只是在它的基础上,生成了一个新的RDD。行动会返回其他的数据类型,很多时候我们将其作为最后结果输出。实际上,转化操作是惰性的,只有在行动操作中,之前的转化操作才会真正执行。这可以避免多余的计算。
转化操作
许多转化操作都是针对各个元素的,也就是说,这些转化操作每次只会操作RDD 中的一个元素。不过并不是所有的转化操作都是这样的。
通过转化操作,从老的RDD中生成新的RDD,Spark会使用==谱系图==来记录不同的RDD之间的依赖关系。依靠谱系图,Spark可以在某些RDD计算出问题后,恢复出丢失的信息。
Java 版计算RDD 中各值的平方
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
JavaRDD<Integer> result = rdd.map(new Function<Integer, Integer>() {
public Integer call(Integer x) { return x*x; }
});
System.out.println(StringUtils.join(result.collect(), ","));
Java 中的flatMap() 将行数据切分为单词
JavaRDD<String> lines = sc.parallelize(Arrays.asList("hello world", "hi"));
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) {
return Arrays.asList(line.split(" "));
}
});
words.first(); // 返回"hello"
我们的RDD 中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素。如果只
要唯一的元素,我们可以使用RDD.distinct() 转化操作来生成一个只包含不同元素的新
RDD。不过需要注意,distinct() 操作的开销很大,因为它需要将所有数据通过网络进行
混洗(shuffle),以确保每个元素都只有一份。
表:对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD转化操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
map() | 将函数应用于RDD 中的每个元素,将返回值构成新的RDD | rdd.map(x => x + 1) | {2, 3, 4, 4} |
flatMap() | 将函数应用于RDD 中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词 | rdd.flatMap(x => x.to(3)) | {1, 2, 3, 2, 3, 3, 3} |
filter() | 返回一个由通过传给filter()的函数的元素组成的RDD | rdd.filter(x => x != 1) | {2, 3, 3} |
distinct() | 去重 | rdd.distinct() | {1, 2, 3} |
sample(withReplacement, fraction, [seed]) | 对RDD 采样,以及是否替换 | rdd.sample(false, 0.5) | 非确定的 |
表:对数据分别为{1, 2, 3}和{3, 4, 5}的RDD进行针对两个RDD的转化操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
union() | 生成一个包含两个RDD 中所有元素的RDD | rdd.union(other) | {1, 2, 3, 3, 4, 5} |
intersection() | 求两个RDD 共同的元素的RDD | rdd.intersection(other) | {3} |
subtract() | 移除一个RDD 中的内容(例如移除训练数据) | rdd.subtract(other) | {1, 2} |
cartesian() | 与另一个RDD 的笛卡儿积 | rdd.cartesian(other) | {(1, 3), (1, 4), ...(3, 5)} |
行动操作
行动操作会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。它往往会触发转化RDD的运行。
调试小数据集的情况下,我们可以使用RDD的collection()函数,获取整个RDD中的数据。但这又一个前提,要单击内存能够成功存下这所有的数据才行。生产环境下这显然是不现实的。我们一般将数据写到HDFS或Amazon S3这种分布式的存储系统中。
Java 中的reduce()
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) { return x + y; }
});
Java 中的aggregate()
class AvgCount implements Serializable {
public AvgCount(int total, int num) {
this.total = total;
this.num = num;
}
public int total;
public int num;
public double avg() {
return total / (double) num;
}
}
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total += x;
a.num += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total += b.total;
a.num += b.num;
return a;
}
};
AvgCount initial = new AvgCount(0, 0);
AvgCount result = rdd.aggregate(initial, addAndCount, combine);
System.out.println(result.avg());
表:对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD行动操作
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
collect() | 返回RDD 中的所有元素 | rdd.collect() | {1, 2, 3, 3} |
count() | RDD 中的元素个数 | rdd.count() | 4 |
countByValue() | 各元素在RDD 中出现的次数 | rdd.countByValue() | {(1, 1),(2, 1),(3, 2)} |
take(num) | 从RDD 中返回num 个元素 | rdd.take(2) | {1, 2} |
top(num) | 从RDD 中返回最前面的num个元素 | rdd.top(2) | {3, 3} |
takeOrdered(num)(ordering) | 从RDD 中按照提供的顺序返回最前面的num 个元素 | rdd.takeOrdered(2)(myOrdering) | {3, 3} |
takeSample(withReplacement, num, [seed]) | 从RDD中返回任意一些元素 | rdd.takeSample(false, 1) | 非确定的 |
reduce(func) | 并行整合RDD 中所有数据(例如sum) | rdd.reduce((x, y) => x + y) | 9 |
fold(zero)(func) | 和reduce() 一样, 但是需要提供初始值 | rdd.fold(0)((x, y) => x + y) | 9 |
aggregate(zeroValue)(seqOp, combOp) | 和reduce() 相似, 但是通常返回不同类型的函数 | rdd.aggregate((0, 0)) ((x, y) =>(x._1 + y, x._2 + 1),(x, y) =>(x._1 + y._1, x._2 + y._2)) | (9,4) |
foreach(func) | 对RDD 中的每个元素使用给定的函数 | rdd.foreach(func) | 无 |
持久化
对于Spark,虽然RDD是惰性求值的,但是,如果简单地对其调用行动操作,多次使用某一个RDD时就会重复计算。为了避免多次重新计算一个RDD,我们可以将其持久化。
表:org.apache.spark.storage.StorageLevel和pyspark.StorageLevel中的持久化级
别
级 别 | 使用的空间 | CPU时间 | 是否在内存中 | 是否在磁盘上 | 备 注 |
---|---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 是 | 否 | |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | |
MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 如果数据在内存中放不下,则溢写道磁盘上 |
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 如果数据在内存中存不下,则溢写道磁盘上。在内存中存放序列化后的数据 |
DISK_ONLY | 低 | 高 | 否 | 是 |
我们在第一次对这个RDD 调用行动操作前就调用了persist() 方法。==persist() 调
用本身不会触发强制求值==。
RDD 还有一个方法叫作unpersist(),调用该方法可以手动把持久化的RDD 从缓
存中移除。
Java在不同RDD类型间转换
在Java 中,各种RDD 的特殊类型间的转换更为明确。Java 中有两个专门的类JavaDoubleRDD
和JavaPairRDD,来处理特殊类型的RDD,这两个类还针对这些类型提供了额外的函数。
这让你可以更加了解所发生的一切,但是也显得有些累赘。
要构建出这些特殊类型的RDD,需要使用特殊版本的类来替代一般使用的Function 类。如果
要从T 类型的RDD 创建出一个DoubleRDD,我们就应当在映射操作中使用DoubleFunction<T>
来替代Function<T, Double>。表3-5 展示了一些特殊版本的函数类及其用法。
此外,我们也需要调用RDD 上的一些别的函数(因此不能只是创建出一个DoubleFunction
然后把它传给map())。当需要一个DoubleRDD 时,我们应当调用mapToDouble() 来替代
map(),跟其他所有函数所遵循的模式一样。
表3-5:Java中针对专门类型的函数接口
函数名 | 等价函数 | 用途 |
---|---|---|
DoubleFlatMapFunction<T> | Function<T, Iterable<Double>> | 用于flatMapToDouble,以生成DoubleRDD |
DoubleFunction<T> | Function<T, Double> | 用于mapToDouble,以生成DoubleRDD |
PairFlatMapFunction<T, K, V> | Function<T, Iterable<Tuple2<K, V>>> | 用于flatMapToPair,以生成PairRDD<K, V> |
PairFunction<T, K, V> | Function<T, Tuple2<K, V>> | 用于mapToPair,以生成PairRDD<K, V> |
例:用Java 创建DoubleRDD
JavaDoubleRDD result = rdd.mapToDouble(
new DoubleFunction<Integer>() {
public double call(Integer x) {
return (double) x * x;
}
});
System.out.println(result.mean());
键值对RDD
众所周知,Hadoop的MapReduce一般处理键值对数据。Map输出,Reduce输入和输出都是Key-Value形式。在Spark中我们也支持键值对形式。并且,Spark的键值对Rdd相对MapReduce的键值对来说,操作更为简单,形式更加复杂多样。Spark 为包含键值对类型的RDD 提供了一些专有的操作。这些RDD 被称为pair RDD。
创建键值对RDD
Java 没有自带的二元组类型,因此Spark 的Java API 让用户使用scala.Tuple2 类来创建二
元组。这个类很简单:Java 用户可以通过new Tuple2(elem1, elem2) 来创建一个新的二元
组,并且可以通过._1() 和._2() 方法访问其中的元素。
在Java 中使用第一个单词作为键创建出一个pair RDD
PairFunction<String, String, String> keyData =
new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String x) {
return new Tuple2(x.split(" ")[0], x);
}
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
表4-1:Pair RDD的转化操作(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
reduceByKey(func) | 合并具有相同键的值 | rdd.reduceByKey((x, y) => x + y) | {(1,2),(3,10)} |
groupByKey() | 对具有相同键的值进行分组 | rdd.groupByKey() | {(1,[2]),(3, [4,6])} |
combineByKey( createCombiner,mergeValue,mergeCombiners,partitioner) | 使用不同的返回类型合并具有相同键的值 | ||
mapValues(func) | 对pair RDD 中的每个值应用一个函数而不改变键 | rdd.mapValues(x =>x+1) | {(1,3), (3,5), (3,7)} |
flatMapValues(func) | 对pair RDD 中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键对记录。通常用于符号化 | rdd.flatMapValues(x => (x to 5)) | {(1,2), (1,3), (1,4), (1,5), (3,4), (3,5)} |
keys() | 返回一个仅包含键的RDD | rdd.keys() | {1, 3,3} |
values() | 返回一个仅包含值的RDD | rdd.values() | {2, 4,6} |
sortByKey() | 返回一个根据键排序的RDD | rdd.sortByKey() | {(1,2), (3,4), (3,6)} |
表4-2:针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)}other = {(3, 9)})
函数名 | 目的 | 示例 | 结果 |
---|---|---|---|
subtractByKey | 删掉RDD 中键与other RDD 中的键相同的元素 | rdd.subtractByKey(other) | {(1, 2)} |
join | 对两个RDD 进行内连接 | rdd.join(other) | {(3, (4, 9)), (3,(6, 9))} |
rightOuterJoin | 对两个RDD 进行连接操作,确保第一个RDD 的键必须存在(右外连接) | rdd.rightOuterJoin(other) | {(3,(Some(4),9)),(3,(Some(6),9))} |
leftOuterJoin | 对两个RDD 进行连接操作,确保第二个RDD的键必须存在(左外连接) | rdd.leftOuterJoin(other) | {(1,(2,None)), (3,(4,Some(9))), (3,(6,Some(9)))} |
cogroup | 将两个RDD 中拥有相同键的数据分组到一起 | rdd.cogroup(other) | {(1,([2],[]), (3,([4, 6],[9]))} |
combineByKey()
combineByKey()是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它
实现的。和aggregate() 一样,combineByKey() 可以让用户返回与输入数据的类型不同的
返回值。
要理解combineByKey(), 要先理解它在处理数据时是如何处理每个元素的。由于
combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就
和之前的某个元素的键相同。
如果这是一个新的元素,combineByKey() 会使用一个叫作createCombiner() 的函数来创建
那个键对应的累加器的初始值。需要注意的是,这一过程会在每个分区中第一次出现各个
键时发生,而不是在整个RDD 中第一次出现一个键时发生。
如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue() 方法将该键的累
加器对应的当前值与这个新的值进行合并。
由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更
多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners() 方法将各
个分区的结果进行合并。
如果已知数据在进行combineByKey() 时无法从map 端聚合中获益的话,可以
禁用它。例如,由于聚合函数(追加到一个队列)无法在map 端聚合时节约
任何空间,groupByKey() 就把它禁用了。如果希望禁用map 端组合,就需要
指定分区方式。就目前而言,你可以通过传递rdd.partitioner 来直接使用
源RDD 的分区方式。
例:在Java 中使用combineByKey() 求每个键对应的平均值
public static class AvgCount implements Serializable {
public AvgCount(int total, int num) { total_ = total; num_ = num; }
public int total_;
public int num_;
public float avg() { returntotal_/(float)num_; }
}
Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
public AvgCount call(Integer x) {
return new AvgCount(x, 1);
}
};
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total_ += x;
a.num_ += 1;
return a;
}
};
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total_ += b.total_;
a.num_ += b.num_;
return a;
}
};
AvgCount initial = new AvgCount(0,0);
JavaPairRDD<String, AvgCount> avgCounts =
nums.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount> countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}
Pair RDD的行动操作
表:Pair RDD的行动操作(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)
函数 | 描述 | 示例 | 结果 |
---|---|---|---|
countByKey() | 对每个键对应的元素分别计数 | rdd.countByKey() | {(1, 1), (3, 2)} |
collectAsMap() | 将结果以映射表的形式返回,以便查询 | rdd.collectAsMap() | Map{(1,2), (3,4), (3, 6)} |
lookup(key) | 返回给定键对应的所有值 | rdd.lookup(3) | [4, 6] |