行动操作
行动操作会把最终计算的结果返回到驱动程序,或写入外部的存储系统中。由于行动操作需要产生实际的输出,他们会强制执行那些必须用到的RDD的转换操作。
非行动操作的其他操作都是惰性求值的。这意味着在被调用行动操作之前Spark不会开始计算。
惰性求值意味着当我们对 RDD 调用转换操作 (例如 map() )时,操作不会立即执行。相反,Spark 会在内部记录下所要求的执行的操作的相关信息。我们不应该把 RDD 看做存放特定数据的数据集,而最好把每个 RDD 看做通过转化操作构建出来、记录如何计算的指令集。把数据读取到 RDD 仍然是惰性的。所以当调用 textFile() 时,数据不会读取进来, 而是在必要时才会读取。
常见的非行动算子有 map flatMap filter distinct sample union intersection subtract cartesian 等 用法详情见上一篇 Spar05 RDD 转换算子
collect 算子
返回 RDD 中所有元素
Python
>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd.collect()
['a', 'b', 'c']
count 算子
计算 RDD 中的元素个数
Python
>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd.count()
3
countByValue 算子
计算各个元素在 RDD 中出现的次数
Python
>>> rdd = sc.parallelize(['a','a','b','c'])
>>> rdd.countByValue()
defaultdict(<class 'int'>, {'a': 2, 'b': 1, 'c': 1})
take(num) 算子
从 RDD 中返回 num个元素
Python
>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd.take(2)
['a', 'b']
top(num)
从RDD中返回最前面的 num 个元素
>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd.top(2)
['c', 'b']
takeOrdered(num,key=ordering)
从 RDD 中按照顺序返回前num个元素
Python
>>> rdd = sc.parallelize([1,3,5,2,9,7])
>>> rdd.takeOrdered(3,key=lambda x:-x)
[9, 7, 5]
对复杂类型操作
Python
>>> rdd = sc.parallelize([('a',1),('b',3),('c',5),('d',2),('e',9),('f',7)])
>>> rdd.takeOrdered(3,key=lambda x:x[1])
[('a', 1), ('d', 2), ('b', 3)]
takeSample(withReplacement,num,[sed])
从 RDD 中返回任意一些元素
withReplacement: true表示有放回的采样,false表示无放回采样
num: 表示返回的采样数据的个数
sed: 表示用于指定的随机数生成器种子
Python
>>> rdd = sc.parallelize([1,3,5,2,9,7])
>>> rdd.takeSample(False,3)
[3, 9, 1]
>>> rdd.takeSample(False,3)
[2, 3, 5]
>>> rdd.takeSample(False,3)
[7, 3, 9]
>>> rdd.takeSample(False,3)
[7, 9, 1]
reduce(func)
并行整合 RDD 中所有的元素 例如 就和
Python
>>> rdd = sc.parallelize([1,3,5,2,9,7])
>>> rdd.reduce(lambda x,y:x+y)
27
fold(zero,func)
与 reduct 类型 需要提供一个初始值 zero是0值 也称为初始值
Python
>>> rdd = sc.parallelize([1,3,5,2,9,7])
>>> rdd.fold(0,lambda x,y:x+y)
27
与reduce 返回结果一致 假如zero不是0 而是1 结果是多少呢
>>> rdd.fold(1,lambda x,y:x+y)
32
结果并不是28而是32
这是为什么呢?
因为 RDD 是分区存放的 计算 fold 每个分区计算 让后将各个分区计算的结果在计算。所以每个分区计算时都有 zero 值
我们来看 rdd 的分区情况
>>> rdd.glom().collect()
[[1], [3, 5], [2], [9, 7]]
所以计算逻辑为:
先计算各个分区 第一列为 zero
1 + 1 = 2
1 + 3 + 5 = 9
1 + 2 = 3
1 + 9 + 7 = 17
各个分区求和:
1 + 2 + 9 + 3 + 17 = 32 (第一个1位zero)
aggregate(zeroValue, seqOp, combOp)
zeroValue:初始值
seqOp:每个分区操作
combOp:分区间操作
例如求 1,2,3,4,5,6,7,8,9,10求和
Python
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.aggregate(0,lambda acc,value:acc+value,lambda acc1,acc2:acc1+acc2)
55
seqOp:的第一个参数 acc 是上一次计算结果 value 是当前值
combOp:参数 acc1和acc2 都是各个分区计算结果
在 例如求 1,2,3,4,5,6,7,8,9,10 的平均值
Python
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> res = rdd.aggregate((0,0),lambda acc,value:(acc[0]+value,acc[1]+1),lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))
>>> res[0]/res[1]
5.5
foreach(func)
对每个元素使用给定的函数 func
Python
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.foreach(lambda x:print(str(x)+'\n'))
1
2
5
6
3
4
7
8
9
10
持久化(缓存)
如前所说,Spark RDD 是惰性求值的,而有时我们期望能多次使用同一个 RDD。如果简单的对 RDD 做调用操作,Spark 每次会重新计算 RDD 以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法长长会多次使用同一组数据。例如
Python
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd1 = rdd.map(lambda x : x * x)
>>> rdd1.count()
10
>>> rdd1.collect()
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
在调用 rdd1.count 和 rdd1.collect() 都会计算 map。为了避免多次计算同一个 RDD ,可以让Spark 对数据进行持久化。Spark 会将 RDD 的节点分别保存到他们求出的分区上。如果有一个节点数据丢失,Spark 会计算丢失分区的数据。持久化方法persist
Python
>>> import pyspark.StorageLevel
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd1 = rdd.map(lambda x : x * x)
>>> rdd1.persist(StorageLevel.MEMORY_ONLY)
下表是各个缓存级别及说明:
级别 | 使用空间 | CPU时间 | 是否内存中 | 是否磁盘上 | 备注 |
---|---|---|---|---|---|
MEMORY_ONLY | 高 | 低 | 是 | 否 | |
MEMORY_ONLY_SER | 低 | 高 | 是 | 否 | |
MEMORY_AND_DISK | 高 | 中等 | 部分 | 部分 | 如果数据在内存放不下,则溢写到磁盘 |
MEMORY_AND_DISK_SER | 低 | 高 | 部分 | 部分 | 如果数据在内存放不下,则溢写到磁盘。在内存中存放序列化的数据 |
DISK_ONLY | 低 | 高 | 否 | 是 |
如果有必要在存储级别的末尾加上 '_2' 来把持久化数据存为两份