Spark 为包含键值对类型的 RDD 提供了一些专有的操作。这些 RDD 被称为 pair RDD1。Pair RDD 是很多程序的构成要素,因为它们提供了并行操作各个键或跨节点重新进行数据分组 的操作接口。例如,pair RDD 提供 reduceByKey() 方法,可以分别归约每个键对应的数据, 还有 join() 方法,可以把两个 RDD 中键相同的元素组合到一起,合并为一个 RDD。我们 通常从一个 RDD 中提取某些字段(例如代表事件时间、用户 ID 或者其他标识符的字段), 并使用这些字段作为 pair RDD 操作中的键。
创建Pair RDD
在 Spark 中有很多种创建 pair RDD 的方式。第 5 章会讲到,很多存储键值对的数据格式会 在读取时直接返回由其键值对数据组成的 pair RDD。此外,当需要把一个普通的 RDD 转 为 pair RDD 时,可以调用 map() 函数来实现,传递的函数需要返回键值对。后面会展示如 何将由文本行组成的 RDD 转换为以每行的第一个单词为键的 pair RDD。
构建键值对 RDD 的方法在不同的语言中会有所不同。在 Python 中,为了让提取键之后的 数据能够在函数中使用,需要返回一个由二元组组成的 RDD(见例 4-1)。
例 4-1:在 Python 中使用第一个单词作为键创建出一个 pair RDD pairs = lines.map(lambda x: (x.split(" ")[0], x))
在 Scala 中,为了让提取键之后的数据能够在函数中使用,同样需要返回二元组(见例 4-2)。隐式转换可以让二元组 RDD 支持附加的键值对函数。
例 4-2:在 Scala 中使用第一个单词作为键创建出一个 pair RDD val pairs = lines.map(x => (x.split(" ")(0), x))
Java 没有自带的二元组类型,因此 Spark 的 Java API 让用户使用 scala.Tuple2 类来创建二 元组。这个类很简单:Java 用户可以通过 new Tuple2(elem1, elem2) 来创建一个新的二元 组,并且可以通过 ._1() 和 ._2() 方法访问其中的元素。
Java 用户还需要调用专门的 Spark 函数来创建 pair RDD。例如,要使用 mapToPair() 函数 来代替基础版的 map() 函数,这在 3.5.2 节中的“Java”一节有过更详细的讨论。下面通过 例 4-3 中展示一个简单的例子。
例 4-3:在 Java 中使用第一个单词作为键创建出一个 pair RDD
PairFunction<String, String, String> keyData =
new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String x) {
return new Tuple2(x.split(" ")[0], x);
}
};
JavaPairRDD<String, String> pairs = lines.mapToPair(keyData);
当用 Scala 和 Python 从一个内存中的数据集创建 pair RDD 时,只需要对这个由二元组组成 的集合调用 SparkContext.parallelize() 方法。而要使用 Java 从内存数据集创建 pair RDD 的话,则需要使用 SparkContext.parallelizePairs()。
4.3 Pair RDD的转化操作
Pair RDD 可以使用所有标准 RDD 上的可用的转化操作。3.4 节中介绍的所有有关传递函数的规则也都同样适用于 pair RDD。由于 pair RDD 中包含二元组,所以需要传递的函数应 当操作二元组而不是独立的元素。表 4-1 和表 4-2 总结了对 pair RDD 的一些转化操作
Pair RDD 也还是 RDD(元素为 Java 或 Scala 中的 Tuple2 对象或 Python 中的元组),因此 同样支持 RDD 所支持的函数。例如,我们可以拿前一节中的 pair RDD,筛选掉长度超过 20 个字符的行,如例 4-4 至例 4-6 以及图 4-1 所示。
例 4-4:用 Python 对第二个元素进行筛选
result = pairs.filter(lambda keyValue: len(keyValue[1]) < 20)
例 4-5:用 Scala 对第二个元素进行筛选 pairs.filter{case (key, value) => value.length < 20}
例 4-6:用 Java 对第二个元素进行筛选
Function<Tuple2<String, String>, Boolean> longWordFilter =
new Function<Tuple2<String, String>, Boolean>() {
public Boolean call(Tuple2<String, String> keyValue) {
return (keyValue._2().length() < 20);
} };
JavaPairRDD<String, String> result = pairs.filter(longWordFilter);
有时,我们只想访问 pair RDD 的值部分,这时操作二元组很麻烦。由于这是一种常见的 使用模式,因此 Spark 提供了 mapValues(func) 函数,功能类似于 map{case (x, y): (x, func(y))}。可以在很多例子中使用这个函数。
4.3.1 聚合操作
当数据集以键值对形式组织的时候,聚合具有相同键的元素进行一些统计是很常见的操 作。之前讲解过基础 RDD 上的 fold()、combine()、reduce() 等行动操作,pair RDD 上则 有相应的针对键的转化操作。Spark 有一组类似的操作,可以组合具有相同键的值。这些 操作返回 RDD,因此它们是转化操作而不是行动操作。
reduceByKey() 与 reduce() 相当类似;它们都接收一个函数,并使用该函数对值进行合并。 reduceByKey() 会为数据集中的每个键进行并行的归约操作,每个归约操作会将键相同的值合 并起来。因为数据集中可能有大量的键,所以 reduceByKey() 没有被实现为向用户程序返回一 个值的行动操作。实际上,它会返回一个由各键和对应键归约出来的结果值组成的新的 RDD。
例 4-8:在 Scala 中使用 reduceByKey() 和 mapValues() 计算每个键对应的平均值 rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
例 4-10:用 Scala 实现单词计数
val input = sc.textFile("s3://...")
val words = input.flatMap(x => x.split(" "))
val result = words.map(x => (x, 1)).reduceByKey((x, y) => x + y)
例 4-11:用 Java 实现单词计数
JavaRDD<String> input = sc.textFile("s3://...")
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); }
});
JavaPairRDD<String, Integer> result = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String x) { return new Tuple2(x, 1); }
}).reduceByKey(
new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
事实上,我们可以对第一个 RDD 使用 countByValue() 函数,以更快地实现 单词计数:
input.flatMap(x => x.split(" ")).countByValue()。
combineByKey() 是最为常用的基于键进行聚合的函数。大多数基于键聚合的函数都是用它 实现的。和 aggregate() 一样,combineByKey() 可以让用户返回与输入数据的类型不同的 返回值。
要理解 combineByKey(),要先理解它在处理数据时是如何处理每个元素的。由于 combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就 和之前的某个元素的键相同。
如果这是一个新的元素,combineByKey() 会使用一个叫作 createCombiner() 的函数来创建 那个键对应的累加器的初始值。需要注意的是,这一过程会在每个分区中第一次出现各个 键时发生,而不是在整个 RDD 中第一次出现一个键时发生。
如果这是一个在处理当前分区之前已经遇到的键,它会使用 mergeValue() 方法将该键的累 加器对应的当前值与这个新的值进行合并。
由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更 多的分区都有对应同一个键的累加器,就需要使用用户提供的 mergeCombiners() 方法将各 个分区的结果进行合并。
combineByKey() 有多个参数分别对应聚合操作的各个阶段,因而非常适合用来解释聚合操 作各个阶段的功能划分。为了更好地演示 combineByKey() 是如何工作的,下面来看看如何 计算各键对应的平均值,
例 4-13:在 Scala 中使用 combineByKey() 求每个键对应的平均值
val result = input.combineByKey(
(v) => (v, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
).map{ case (key, value) => (key, value._1 / value._2.toFloat) }
result.collectAsMap().map(println(_))
例 4-14:在 Java 中使用 combineByKey() 求每个键对应的平均值
public static class AvgCount implements Serializable {
public AvgCount(int total, int num) { total_ = total; num_ = num; }
public int total_;
public int num_;
public float avg() { returntotal_/(float)num_; }
}
Function<Integer, AvgCount> createAcc = new Function<Integer, AvgCount>() {
public AvgCount call(Integer x) {
return new AvgCount(x, 1);
}
};
Function2<AvgCount, Integer, AvgCount> addAndCount =
new Function2<AvgCount, Integer, AvgCount>() {
public AvgCount call(AvgCount a, Integer x) {
a.total_ += x;
a.num_ += 1;
return a;
} };
Function2<AvgCount, AvgCount, AvgCount> combine =
new Function2<AvgCount, AvgCount, AvgCount>() {
public AvgCount call(AvgCount a, AvgCount b) {
a.total_ += b.total_;
a.num_ += b.num_;
return a;
} };
AvgCount initial = new AvgCount(0,0);
JavaPairRDD<String, AvgCount> avgCounts =
nums.combineByKey(createAcc, addAndCount, combine);
Map<String, AvgCount> countMap = avgCounts.collectAsMap();
for (Entry<String, AvgCount> entry : countMap.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue().avg());
}
有很多函数可以进行基于键的数据合并。它们中的大多数都是在 combineByKey() 的基础上实现的,为用户提供了更简单的接口。不管怎样,在 Spark 中使用这些专用的聚合函数,始终要比手动将数据分组再归约快很多。
并行度调优
到目前为止,我们已经讨论了所有的转化操作的分发方式,但是还没有探讨 Spark 是怎样 确定如何分割工作的。每个 RDD 都有固定数目的分区,分区数决定了在 RDD 上执行操作 时的并行度。
在执行聚合或分组操作时,可以要求 Spark 使用给定的分区数。Spark 始终尝试根据集群的大小推断出一个有意义的默认值,但是有时候你可能要对并行度进行调优来获取更好的性能表现。
本章讨论的大多数操作符都能接收第二个参数,这个参数用来指定分组结果或聚合结果的 RDD 的分区数,如例 4-15 和例 4-16 所示。
例 4-15:在 Python 中自定义 reduceByKey() 的并行度
data = [("a", 3), ("b", 4), ("a", 1)] sc.parallelize(data).reduceByKey(lambda x, y: x + y) # 默认并行度 sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10) # 自定义并行度
例 4-16:在 Scala 中自定义 reduceByKey() 的并行度
val data = Seq(("a", 3), ("b", 4), ("a", 1)) sc.parallelize(data).reduceByKey((x, y) => x + y) // 默认并行度 sc.parallelize(data).reduceByKey((x, y) => x + y, 10) // 自定义并行度
有时,我们希望在除分组操作和聚合操作之外的操作中也能改变 RDD 的分区。对于 这样的情况,Spark 提供了 repartition() 函数。它会把数据通过网络进行混洗,并创 建出新的分区集合。切记,对数据进行重新分区是代价相对比较大的操作。Spark 中也 有一个优化版的 repartition(),叫作 coalesce()。你可以使用 Java 或 Scala 中的 rdd. partitions.size() 以及 Python 中的 rdd.getNumPartitions 查看 RDD 的分区数,并确保调 用 coalesce() 时将 RDD 合并到比现在的分区数更少的分区中。
4.3.2 数据分组
对于有键的数据,一个常见的用例是将数据根据键进行分组——比如查看一个顾客的所有订单。
如果数据已经以预期的方式提取了键,groupByKey() 就会使用 RDD 中的键来对数据进行 分组。对于一个由类型 K 的键和类型 V 的值组成的 RDD,所得到的结果 RDD 类型会是 [K, Iterable[V]]。
groupBy() 可以用于未成对的数据上,也可以根据除键相同以外的条件进行分组。它可以 接收一个函数,对源 RDD 中的每个元素使用该函数,将返回结果作为键再进行分组。
4.3.3 连接
将有键的数据与另一组有键的数据一起使用是对键值对数据执行的最有用的操作之一。连 接数据可能是 pair RDD 最常用的操作之一。连接方式多种多样:右外连接、左外连接、交 叉连接以及内连接。
普通的 join 操作符表示内连接 2。只有在两个 pair RDD 中都存在的键才叫输出。当一个输 入对应的某个键有多个值时,生成的 pair RDD 会包括来自两个输入 RDD 的每一组相对应 的记录。例 4-17 可以帮你理解这个定义。
例 4-17:在 Scala shell 中进行内连接
storeAddress = {
(Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"),
(Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")}
storeRating = {
(Store("Ritual"), 4.9), (Store("Philz"), 4.8))}
storeAddress.join(storeRating) == {
(Store("Ritual"), ("1026 Valencia St", 4.9)),
(Store("Philz"), ("748 Van Ness Ave", 4.8)),
(Store("Philz"), ("3101 24th St", 4.8))}
4.3.4 数据排序
很多时候,让数据排好序是很有用的,尤其是在生成下游输出时。如果键有已定义的顺 序,就可以对这种键值对 RDD 进行排序。当把数据排好序后,后续对数据进行 collect() 或 save() 等操作都会得到有序的数据。
我们经常要将 RDD 倒序排列,因此 sortByKey() 函数接收一个叫作 ascending 的参数,表 示我们是否想要让结果按升序排序(默认值为 true)。有时我们也可能想按完全不同的排 序依据进行排序。要支持这种情况,我们可以提供自定义的比较函数。
例 4-19:在 Python 中以字符串顺序对整数进行自定义排序
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc = lambda x: str(x))
例 4-20:在 Scala 中以字符串顺序对整数进行自定义排序
val input: RDD[(Int, Venue)] = ...
implicit val sortIntegersByString = new Ordering[Int] {
override def compare(a: Int, b: Int) = a.toString.compare(b.toString)
}
rdd.sortByKey()
例 4-21:在 Java 中以字符串顺序对整数进行自定义排序
class IntegerComparator implements Comparator<Integer> {
public int compare(Integer a, Integer b) {
return String.valueOf(a).compareTo(String.valueOf(b))
}
}
rdd.sortByKey(comp)
4.4 Pair RDD的行动操作
和转化操作一样,所有基础 RDD 支持的传统行动操作也都在 pair RDD 上可用。Pair RDD 提供了一些额外的行动操作,可以让我们充分利用数据的键值对特性。这些操作列在了表 4-3 中。
4.5 数据分区
本章要讨论的最后一个 Spark 特性是对数据集在节点间的分区进行控制。在分布式程序中, 通信的代价是很大的,因此控制数据分布以获得最少的网络传输可以极大地提升整体性 能。和单节点的程序需要为记录集合选择合适的数据结构一样,Spark 程序可以通过控制 RDD 分区方式来减少通信开销。分区并不是对所有应用都有好处的——比如,如果给定 RDD 只需要被扫描一次,我们完全没有必要对其预先进行分区处理。只有当数据集多次在 诸如连接这种基于键的操作中使用时,分区才会有帮助。我们会给出一些小例子来说明这点。
Spark 中所有的键值对 RDD 都可以进行分区。系统会根据一个针对键的函数对元素进行分 组。尽管 Spark 没有给出显示控制每个键具体落在哪一个工作节点上的方法(部分原因是 Spark 即使在某些节点失败时依然可以工作),但 Spark 可以确保同一组的键出现在同一个 节点上。比如,你可能使用哈希分区将一个 RDD 分成了 100 个分区,此时键的哈希值对 100 取模的结果相同的记录会被放在一个节点上。你也可以使用范围分区法,将键在同一 个范围区间内的记录都放在同一个节点上。
举个简单的例子,我们分析这样一个应用,它在内存中保存着一张很大的用户信息表—— 也就是一个由 (UserID, UserInfo) 对组成的 RDD,其中 UserInfo 包含一个该用户所订阅 的主题的列表。该应用会周期性地将这张表与一个小文件进行组合,这个小文件中存着过 去五分钟内发生的事件——其实就是一个由 (UserID, LinkInfo) 对组成的表,存放着过去 五分钟内某网站各用户的访问情况。例如,我们可能需要对用户访问其未订阅主题的页面 的情况进行统计。我们可以使用 Spark 的 join() 操作来实现这个组合操作,其中需要把 UserInfo 和 LinkInfo 的有序对根据 UserID 进行分组。我们的应用如例 4-22 所示。
例 4-22:简单的 Scala 应用
// 初始化代码;从HDFS商的一个Hadoop SequenceFile中读取用户信息
// userData中的元素会根据它们被读取时的来源,即HDFS块所在的节点来分布
// Spark此时无法获知某个特定的UserID对应的记录位于哪个节点上
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
// 周期性调用函数来处理过去五分钟产生的事件日志
// 假设这是一个包含(UserID, LinkInfo)对的SequenceFile def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
val offTopicVisits = joined.filter {
case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
!userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
这段代码可以正确运行,但是不够高效。这是因为在每次调用 processNewLogs() 时都会用 到 join() 操作,而我们对数据集是如何分区的却一无所知。默认情况下,连接操作会将两 个数据集中的所有键的哈希值都求出来,将该哈希值相同的记录通过网络传到同一台机器 上,然后在那台机器上对所有键相同的记录进行连接操作(见图 4-4)。因为 userData 表比 每五分钟出现的访问日志表 events 要大得多,所以要浪费时间做很多额外工作:在每次调 用时都对 userData 表进行哈希值计算和跨节点数据混洗,虽然这些数据从来都不会变化。
要解决这一问题也很简单:在程序开始时,对 userData 表使用 partitionBy() 转化操作, 将这张表转为哈希分区。可以通过向 partitionBy 传递一个 spark.HashPartitioner 对象来 实现该操作,如例 4-23 所示。
例 4-23:Scala 自定义分区方式
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
.partitionBy(new HashPartitioner(100)) // 构造100个分区 .persist()
processNewLogs() 方 法 可 以 保 持 不 变: 在 processNewLogs() 中,eventsRDD 是 本 地 变 量,只在该方法中使用了一次,所以为 events 指定分区方式没有什么用处。由于在构 建 userData 时调用了 partitionBy(),Spark 就知道了该 RDD 是根据键的哈希值来分 区的,这样在调用 join() 时,Spark 就会利用到这一点。具体来说,当调用 userData. join(events) 时,Spark 只会对 events 进行数据混洗操作,将 events 中特定 UserID 的记 录发送到 userData 的对应分区所在的那台机器上(见图 4-5)。这样,需要通过网络传输的 数据就大大减少了,程序运行速度也可以显著提升了。
注意,partitionBy() 是一个转化操作,因此它的返回值总是一个新的 RDD,但它不会改变 原来的 RDD。RDD 一旦创建就无法修改。因此应该对 partitionBy() 的结果进行持久化, 并保存为 userData,而不是原来的 sequenceFile() 的输出。此外,传给 partitionBy() 的 100 表示分区数目,它会控制之后对这个 RDD 进行进一步操作(比如连接操作)时有多少 任务会并行执行。总的来说,这个值至少应该和集群中的总核心数一样。
如果没有将 partitionBy() 转化操作的结果持久化,那么后面每次用到这个 RDD 时都会重复地对数据进行分区操作。不进行持久化会导致整个 RDD 谱 系图重新求值。那样的话,partitionBy() 带来的好处就会被抵消,导致重 复对数据进行分区以及跨节点的混洗,和没有指定分区方式时发生的情况十 分相似。
事实上,许多其他 Spark 操作会自动为结果 RDD 设定已知的分区方式信息,而且除 join() 外还有很多操作也会利用到已有的分区信息。比如,sortByKey() 和 groupByKey() 会分别生成范围分区的 RDD 和哈希分区的 RDD。而另一方面,诸如 map() 这样的操作会 导致新的 RDD 失去父 RDD 的分区信息,因为这样的操作理论上可能会修改每条记录的 键。接下来的几节中,我们会讨论如何获取 RDD 的分区信息,以及数据分区是如何影响 各种 Spark 操作的。
4.5.1 获取RDD的分区方式
在 Scala 和 Java 中,你可以使用 RDD 的 partitioner 属性(Java 中使用 partitioner() 方法)来获取 RDD 的分区方式。 它会返回一个 scala.Option 对象,这是 Scala 中用来存放 可能存在的对象的容器类。你可以对这个 Option 对象调用 isDefined() 来检查其中是否有 值,调用 get() 来获取其中的值。如果存在值的话,这个值会是一个 spark.Partitioner 对象。这本质上是一个告诉我们 RDD 中各个键分别属于哪个分区的函数。
在 Spark shell 中使用 partitioner 属性不仅是检验各种 Spark 操作如何影响分区方式的一种 好办法,还可以用来在你的程序中检查想要使用的操作是否会生成正确的结果(见例 4-24)。
例 4-24:获取 RDD 的分区方式
scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at
<console>:12
scala> pairs.partitioner
res0: Option[spark.Partitioner] = None
scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14
scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)
在这段简短的代码中,我们创建出了一个由 (Int, Int) 对组成的 RDD,初始时没有分 区方式信息(一个值为 None 的 Option 对象)。然后通过对第一个 RDD 进行哈希分区, 创建出了第二个 RDD。如果确实要在后续操作中使用 partitioned,那就应当在定义 partitioned 时,在第三行输入的最后加上 persist()。这和之前的例子中需要对 userData 调用 persist() 的原因是一样的:如果不调用 persist() 的话,后续的 RDD 操作会对 partitioned 的整个谱系重新求值,这会导致对 pairs 一遍又一遍地进行哈希分区操作。
4.5.2 从分区中获益的操作
Spark 的许多操作都引入了将数据根据键跨节点进行混洗的过程。所有这些操作都会 从数据分区中获益。就 Spark 1.0 而言,能够从数据分区中获益的操作有 cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup()。
对于像 reduceByKey() 这样只作用于单个 RDD 的操作,运行在未分区的 RDD 上的时候会 导致每个键的所有对应值都在每台机器上进行本地计算,只需要把本地最终归约出的结 果值从各工作节点传回主节点,所以原本的网络开销就不算大。而对于诸如 cogroup() 和 join() 这样的二元操作,预先进行数据分区会导致其中至少一个 RDD(使用已知分区器的那个 RDD)不发生数据混洗。如果两个 RDD 使用同样的分区方式,并且它们还缓存在 同样的机器上(比如一个 RDD 是通过 mapValues() 从另一个 RDD 中创建出来的,这两个 RDD 就会拥有相同的键和分区方式),或者其中一个 RDD 还没有被计算出来,那么跨节 点的数据混洗就不会发生了。
4.5.4 自定义分区方式
虽然 Spark 提供的 HashPartitioner 与 RangePartitioner 已经能够满足大多数用例,但 Spark 还是允许你通过提供一个自定义的 Partitioner 对象来控制 RDD 的分区方式。这可 以让你利用领域知识进一步减少通信开销。
要实现自定义的分区器,你需要继承org.apache.spark.Partitioner 类并实现下面三个方法。
- numPartitions: Int:返回创建出来的分区数。
- getPartition(key: Any): Int:返回给定键的分区编号(0 到 numPartitions-1)。
- equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。