pairRDD
Spark 为包含键值对类型的 RDD 提供了一些专有的操作。这些 RDD 被称为pair RDD1。 PairRDD 是很多程序的构成要素, 因为它们提供了并行操作各个键或跨节点重新进行数据分组的操作接口。
创建方法
- 使用map创建
pairs = lines.map(lambda x: (x.split(" ")[0], x))
pairRDD的转化操作
-
Pair RDD 可以使用所有标准 RDD 上的可用的转化操作
下面的转化操作以键值对集合{(1,2),(3,4),(5,6)}为例
-
mapValues函数
- 函数的功能类似
map{case (x, y): (x,func(y))}
- 函数的功能类似
pairRDD的聚合操作
- reduceByKey():为数据集中的每个键进行并行的归约操作,每个归约操作会将键相同的值合并起来。它会返回一个由各键和对应键归约出来的结果值组成的新的 RDD。
#这里使用reduceByKey以及MapValues来计算相同的键的值的平均值
#!/usr/bin/env python
# coding=utf-8
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf= conf)
nums = sc.parallelize([(1,2), (1,3), (1,6), (2,4), (2, 7), (2, 9)])
avg = nums.mapValues(lambda x : (x, 1)).reduceByKey(lambda x, y : (x[0] + y[0], x[1] + y[1]))
result = avg.mapValues(lambda x : float(x[0])/float(x[1]))
print (result.first())
#使用reduceByKey来进行词频统计
#!/usr/bin/env python
# coding=utf-8
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf= conf)
lines = sc.textFile("usePairRDDWordCount.py")
words = lines.flatMap(lambda x:x.split(" "))
wordsMap = words.map(lambda x:(x, 1)).reduceByKey(lambda x, y : x + y)
print(wordsMap.first())
-
foldByKey():所使用的合并函数对零值与另一
个元素进行合并,结果仍为该元素 -
conbineByKey():如果这是一个新的元素, combineByKey() 会使用一个叫作 createCombiner() 的函数来创建那个键对应的累加器的初始值。 如果这是一个在处理当前分区之前已经遇到的键, 它会使用mergeValue() 方法将该键的累加器对应的当前值与这个新的值进行合并。由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。其数据流程图如下所示:
下面是使用conbineByKey()求出每个键对应的平均值:
#!/usr/bin/env python
# coding=utf-8
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf= conf)
lines = sc.textFile("./words.map")
#wordsMap = lines.map(lambda x : (x.split(" ")[0], x.split(" ")[1]))
wordsMap = lines.map(lambda x : x.split(" ")).map(lambda x : (x[0], int(x[1])))
sumCount = wordsMap.combineByKey(
(lambda x:(x, 1)), #createCombiner()
(lambda x, y:(x[0] + y, x[1] + 1)), #mergeValue()
(lambda x, y:(x[0] + y[0], x[1] + y[1])) #mergeContainers()
)
#averg = sumCount.map(lambda xy : (xy[0], xy[1][0]/xy[1][1])).collectAsMap()
#print(averg["coffee"])
averg = sumCount.map(lambda xy : (xy[0], xy[1][0]/xy[1][1]))
print(averg.first())
print(averg.getNumPartitions())
-
关于并行度
每个 RDD 都有固定数目的分区,分区数决定了在 RDD 上执行操作
时的并行度。在执行聚合或分组操作时, 可以要求 Spark 使用给定的分区数。上面讨论的大多数操作符都能接收第二个参数, 这个参数用来指定分组结果或聚合结果的RDD 的分区数,例如:
data = [("a", 3), ("b", 4), ("a", 1)]
sc.parallelize(data).reduceByKey(lambda x, y: x + y) # 默认并行度
sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10) # 自定义并行度
-
其他
- Spark 提供了 repartition() 函数。它会把数据通过网络进行混洗,并创
建出新的分区集合。 切记,对数据进行重新分区是代价相对比较大的操作。 - 可以调用Python 中的 rdd.getNumPartitions 查看 RDD 的分区数
- Spark 提供了 repartition() 函数。它会把数据通过网络进行混洗,并创
数据分组
- groupByKey:对于一个由类型 K 的键和类型 V 的值组成的 RDD,所得到的结果 RDD 类型会是[K, Iterable[V]]。
如果程序中先使用了groupByKey() 然后再使用了 reduce() 或者fold() 的代码,很可
能可以通过使用一种根据键进行聚合的函数来更高效地实现同样的效果。例如,
rdd.reduceByKey(func)与 rdd.groupByKey().mapValues(value =>
value.reduce(func)) 等价,但是前者更为高效,因为它避免了为每个键创建存放值的列表的步骤。
-
groupBy: 可以用于未成对的数据上,也可以根据除键相同以外的条件进行分组。它可以
接收一个函数,对源 RDD 中的每个元素使用该函数,将返回结果作为键再进行分组。 -
cogroup:对多个共享同
一个键的 RDD 进行分组。两个键的类型均为 K 而值的类型分别为 V 和 W 的 RDD 进行cogroup() 时,得到的结果 RDD 类型为 [(K, (Iterable[V], Iterable[W]))]。注意后面这个方括号中的是两个迭代对象
连接
- join:只有在两个 pair RDD 中都存在的键才叫输出。
-
leftOuterJoin:源 RDD 的每一个键都有对应的记录。每个键相应的值是由一个源 RDD 中的值与一个包含第二个 RDD 的值的 Option(在 Java 中为Optional)对象组成的二元组。在 Python 中,如果一个值不存在,则使用 None 来表示;而数据存在时就用常规的值来表示, 不使用任何封装。
rightOuterJoin:只不过预期结果中的键必须出现在
第二个 RDD 中,而二元组中的可缺失的部分则来自于源 RDD 而非第二个 RDD。
排序
- sortByKey:接收一个叫作 ascending 的参数,表示我们是否想要让结果按升序排序,下面是一个例子
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf= conf)
lines = sc.textFile("usePairRDD.py")
pairs = lines.map(lambda x : (x.split(' ')[0], x))
res = pairs.sortByKey(ascending = True, numPartitions=None, keyfunc=lambda
x:str(x))
#res = pairs.sortByKey(True, None, lambda x : str(x)) #这种或者上面那种形式都可以的
print (res.first())
RDD行动操作
数据分区(难点)
Spark 程序可以通过控制RDD 分区方式来减少通信开销。 分区并不是对所有应用都有好处的——比如,如果给定RDD 只需要被扫描一次, 我们完全没有必要对其预先进行分区处理。只有当数据集多次在诸如连接这种基于键的操作中使用时, 分区才会有帮助。
Spark 中所有的键值对 RDD 都可以进行分区。系统会根据一个针对键的函数对元素进行分组。Spark 没有给出显示控制每个键具体落在哪一个工作节点上的方法(部分原因是Spark 即使在某些节点失败时依然可以工作),但 Spark 可以确保同一组的键出现在同一个节点上。
- 一个例子:内存中保存着一张很大的用户信息表——一个由 (UserID, UserInfo) 对组成的 RDD,其中 UserInfo 包含一个该用户所订阅的主题的列表。 该应用会周期性地将这张表与一个小文件进行组合,这个小文件中存着过去五分钟内发生的事件——其实就是一个由 (UserID, LinkInfo) 对组成的表,存放着过去五分钟内某网站各用户的访问情况。我们需要对用户访问其未订阅主题的页面的情况进行统计。下面首先是未分区的Scala程序例子:
// 初始化代码;从HDFS商的一个Hadoop SequenceFile中读取用户信息
// userData中的元素会根据它们被读取时的来源,即HDFS块所在的节点来分布
// Spark此时无法获知某个特定的UserID对应的记录位于哪个节点上
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()
// 周期性调用函数来处理过去五分钟产生的事件日志
// 假设这是一个包含(UserID, LinkInfo)对的SequenceFile
def processNewLogs(logFileName: String) {
val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
val joined = userData.join(events)// RDD of (UserID, (UserInfo, LinkInfo)) pairs
val offTopicVisits = joined.filter {
case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
!userInfo.topics.contains(linkInfo.topic)
}.count()
println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
对上面的代码,每次调用 processNewLogs() 时都会用到 join() 操作,而我们对数据集是如何分区的却一无所知。接操作会将两个数据集中的所有键的哈希值都求出来, 将该哈希值相同的记录通过网络传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作(见图 4-4)。因为 userData 表比每五分钟出现的访问日志表 events 要大得多,所以要浪费时间做很多额外工作:在每次调用时都对 userData 表进行哈希值计算和跨节点数据混洗,虽然这些数据从来都不会变化。其网络通信图如下所示:
更改上面的代码(Scala)
//这里创建了自定义分区
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
.partitionBy(new HashPartitioner(100)) // 构造100个分区
.persist()
//python是这样使用的: rdd.partitionBy(100)
在 processNewLogs() 中, eventsRDD 是 本 地 变量,只在该方法中使用了一次, 所以为 events 指定分区方式没有什么用处。由于在构建 userData 时调用了 partitionBy(), Spark 就知道了该 RDD 是根据键的哈希值来分区的,这样在调用 join() 时, Spark 就会利用到这一点。具体来说,当调用 userData.join(events) 时, Spark 只会对 events 进行数据混洗操作,将 events 中特定 UserID 的记录发送到 userData 的对应分区所在的那台机器上(见图 4-5)。这样,需要通过网络传输的数据就大大减少了,程序运行速度可以显著提升了。这种方式的额网络通信图如下所示。注意上面需要将partitionBy的结果持久化,不进行持久化会导致整个 RDD 谱系图重新求值。那样的话, partitionBy() 带来的好处就会被抵消,导致重复对数据进行分区以及跨节点的混洗,和没有指定分区方式时发生的情况十分相似。
从分区中可以获益的操作
cogroup()、groupWith()、 join()、 leftOuterJoin()、 rightOuterJoin()、 groupByKey()、 reduceByKey()、combineByKey() 以及 lookup()。
对于 reduceByKey() 这样只作用于单个 RDD 的操作,运行在未分区的 RDD 上的时候会导致每个键的所有对应值都在每台机器上进行本地计算,只需要把本地最终归约出的结果值从各工作节点传回主节点, 所以原本的网络开销就不算大。
对于 cogroup() 和join() 这样的二元操作,预先进行数据分区会导致其中至少一个 RDD(使用已知分区器的那个 RDD)不发生数据混洗。如果两个 RDD 使用同样的分区方式, 并且它们还缓存在同样的机器上(比如一个 RDD 是通过 mapValues() 从另一个 RDD 中创建出来的,这两个RDD 就会拥有相同的键和分区方式),或者其中一个 RDD 还没有被计算出来,那么跨节点的数据混洗就不会发生了。
影响分区方式的操作
会为生成的结果 RDD 设好分区方式的操作:cogroup()、 groupWith()、 join()、 lef tOuterJoin()、 rightOuterJoin()、 groupByKey()、reduceByKey()、combineByKey()、 partitionBy()、 sort()、 mapValues()(如果父 RDD 有分区方式的话)、flatMapValues()
对于二元操作,输出数据的分区方式取决于父 RDD 的分区方式。默认情况下,结果会采用哈希分区, 分区的数量和操作的并行度一样。不过,如果其中的一个父 RDD 已经设置过分区方式, 那么结果就会采用那种分区方式;如果两个父 RDD 都设置过分区方式,结果 RDD 会采用第一个父 RDD 的分区方式。
PageRank算法
计算步骤:算法会维护两个数据集: 一个由 (pageID, linkList) 的元素组成,包含每个页面的相邻页面的列表;另一个由 (pageID, rank) 元素组成,包含每个页面的当前排序值。计算的步骤如下:
- 将每个页面的排序值初始化为 1.0。
- 在每次迭代中, 对页面 p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p) 的贡献值。
- 将每个页面的排序值设为 0.15 + 0.85 * contributionsReceived
// 假设相邻页面列表以Spark objectFile的形式存储
val links = sc.objectFile[(String, Seq[String])]("links")
.partitionBy(new HashPartitioner(100))
.persist()
// 将每个页面的排序值初始化为1.0;由于使用mapValues,生成的RDD
// 的分区方式会和"links"的一样
var ranks = links.mapValues(v => 1.0)
// 运行10轮PageRank迭代
for(i <- 0 until 10) {
val contributions = links.join(ranks).flatMap {
case (pageId, (links, rank)) =>
links.map(dest => (dest, rank / links.size))
}
//这里的value表示的含义是每个key收到的所有的引用贡献值。
ranks = contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)
}
// 写出最终排名
ranks.saveAsTextFile("ranks")
上面程序的一些注意点:
- linksRDD 在每次迭代中都会和 ranks 发生连接操作。links 是一个静态数据集,程序一开始的时候就对它进行了分区操作,这样就不需要把它通过网络进行数据混洗了。 linksRDD 的字节数一般来说也会比 ranks 大很多,毕竟它包含每个页面的相邻页面列表(由页面 ID 组成),而不仅仅是一个 Double 值,因此这一优化相比 PageRank 的原始实现(例如普通的 MapReduce)节约了相当可观的网络通信开销。
- 出于同样的原因, 我们调用 links 的 persist() 方法,将它保留在内存中以供每次迭代使用。
- 当我们第一次创建 ranks 时,我们使用 mapValues() 而不是 map() 来保留父 RDD(links)的分区方式,这样对它进行的第一次连接操作就会开销很小。
- 在循环体中, 我们在 reduceByKey() 后使用 mapValues();因为 reduceByKey() 的结果已经是哈希分区的了, 这样一来,下一次循环中将映射操作的结果再次与 links 进行连接操作时就会更加高效。
自定义分区
假设我们要在一个网页的集合上运行前一节中的 PageRank 算法。在这里,每个页面的 ID(RDD 中的键)是页面的 URL。当我们使用简单的哈希函数进行分区时,拥有相似的 URL 的页面(比如http://www.cnn.com/WORLD
和http://www.cnn.com/US
可能会被分到完全不同的节点上。 )然而,我们知道在同一个域名下的网页更有可能相互链接。由于 PageRank 需要在每次迭代中从每个页面向它所有相邻的页面发送一条消息,因此把这些页面分组到同一个分区中会更好。 可以使用自定义的分区器来实现仅根据域名而不是整个 URL 来分区。这种方式的python代码如下所示:
import urlparse
def hash_domain(url):
return hash(urlparse.urlparse(url).netloc)
rdd.partitionBy(20, hash_domain) # 创建20个分区