"He who assists the ruler of men by means of the Tao does not use the force of arms to subdue the world,
for such deeds are apt to rebound.
Where armies camp, thorns and brambles grow; after a great war, lean years are sure to follow.
The good general achieves his result and stops; he dares not seize dominance by force.
He achieves his result but does not glory in it; achieves his result but does not boast of it; achieves his result but is not proud of it.
He achieves his result only because he has no other choice; he achieves his result but does not force it.
Whatever reaches the peak of its strength then grows old; this is called contrary to the Tao, and what is contrary to the Tao comes to an early end." [1]
Spark ships with a large number of useful operators (methods), and combining them is how the functionality a business needs gets built. Spark programming ultimately boils down to using these operators, so it is well worth cataloguing the built-in ones in detail.
At a high level, Spark operators fall into two categories:
Name | Description |
---|---|
Transformation | Transformation operators: they do not trigger job submission; they only describe the intermediate processing steps of a job. Transformations are lazily evaluated: the operation that turns one RDD into another is not executed immediately, but only when an Action is eventually invoked. A Transformation's input is either value-type or key-value-type data. |
Action | Action operators: they trigger the SparkContext to submit a job (Job) and output data out of the Spark system. |
As noted in the table above, Transformation operators take either value-type or key-value-type input; they break down as follows:
Value type
Subcategory | Operators |
---|---|
Input-to-output partitions one-to-one | map flatMap mapPartitions glom |
Input-to-output partitions many-to-one | union cartesian |
Input-to-output partitions many-to-many | groupBy |
Output partitions are a subset of the input partitions | filter distinct subtract sample takeSample |
Cache | cache persist |
Key-value type
Subcategory | Operators |
---|---|
Input-to-output partitions one-to-one | mapValues |
Aggregation over one RDD or two RDDs | single RDD: combineByKey reduceByKey partitionBy; two RDDs: cogroup |
Join | join leftOuterJoin rightOuterJoin |
Action operators
Subcategory | Operators |
---|---|
No output | foreach |
HDFS | saveAsTextFile saveAsObjectFile |
Scala collections and data types | collect collectAsMap reduceByKeyLocally lookup count top reduce fold aggregate |
Next, the operators above (among others) are examined in detail in a pyspark interactive session.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.0.cloudera2
/_/
Using Python version 2.7.5 (default, Aug 4 2017 00:39:18)
SparkSession available as 'spark'.
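Before walking through individual operators, here is a minimal sketch of the lazy evaluation described in the table above, run in this same shell (where sc is available, as in all of the examples that follow): the map call only records the lineage of the new RDD, and nothing is computed until an action such as count() or collect() is invoked.
>>> rdd = sc.parallelize(range(5))
>>> mapped = rdd.map(lambda x: x * 10)   # Transformation: nothing runs yet, only the lineage is recorded
>>> mapped.count()                       # Action: triggers a job and returns a result to the driver
5
>>> mapped.collect()                     # another Action on the same RDD triggers another job
[0, 10, 20, 30, 40]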
map
map(f, preservesPartitioning=False)
map applies the given function to every element of the RDD to produce a new RDD; the elements of the input and output RDDs are in a one-to-one relationship.
>>> x = sc.parallelize([1,2,3,4])
>>> y = x.map(lambda x:(x**3))
>>> y.collect()
[1, 8, 27, 64]
filter
filter(f)
Filters the elements of the RDD and returns a new dataset made up of the original elements for which f returns true.
>>> y = x.filter(lambda x:x>2)
>>> y.collect()
[3, 4]
flatMap
flatMap(f, preservesPartitioning=False)
Similar to map, but each input element is mapped to zero or more output elements, so the elements of the input and output RDDs are in a one-to-many relationship.
>>> y = x.flatMap(lambda x:(x,x*100,x**2))
>>> y.collect()
[1, 100, 1, 2, 200, 4, 3, 300, 9, 4, 400, 16]
glom
glom()
Return an RDD created by coalescing all elements within each partition into a list.
>>> a = sc.parallelize([1,2,3,4],2)  # the second argument, 2, is the number of slices (partitions) the dataset is split into; Spark runs one task per slice on the cluster
>>> y = a.glom()
>>> y.collect()
[[1, 2], [3, 4]]
mapPartitions
mapPartitions(f, preservesPartitioning=False)
Return a new RDD by applying a function to each partition of this RDD.
>>> xx = sc.parallelize([1,2,3,4], 2)
>>> def f(iter):
... yield sum(iter)
...
>>> yy = xx.mapPartitions(f)
>>> print 'original partitions of xx: {0}'.format(xx.glom().collect())
original partitions of xx: [[1, 2], [3, 4]]
>>> print 'result of applying f to xx: {}'.format(yy.glom().collect())
result of applying f to xx: [[3], [7]]
For more on how yield works, see my other note 《Python易筋经-yield when data is bigger》.
mapPartitionsWithIndex
mapPartitionsWithIndex(f, preservesPartitioning=False)
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
Similar to mapPartitions, it also processes one partition at a time, but f takes two arguments: the first is the partition index, and the second is an iterator over the elements of that partition.
>>> x = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(splitIndex, iterator): yield (splitIndex, sum(iterator))
...
>>> y = x.mapPartitionsWithIndex(f)
>>> print 'original partitions of x: {0}'.format(x.glom().collect())
original partitions of x: [[1, 2], [3, 4]]
>>> print 'result of applying f to x: {}'.format(y.glom().collect())
result of applying f to x: [[(0, 3)], [(1, 7)]]
getNumPartitions
getNumPartitions()
Returns the number of partitions in RDD
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> print 'number of partitions: {}'.format(rdd.getNumPartitions())
number of partitions: 2
distinct
distinct(numPartitions=None)
Return a new RDD containing the distinct elements in this RDD.
>>> res = sorted(sc.parallelize([1, 1, 1, 2, 3, 2, 3]).distinct().collect())
>>> print 'after distinct: {}'.format(res)
after distinct: [1, 2, 3]
sample
sample(withReplacement, fraction, seed=None)
Return a sampled subset of this RDD.
Parameters:
withReplacement – can elements be sampled multiple times (replaced when sampled out)
fraction – expected size of the sample as a fraction of this RDD’s size;
  without replacement: probability that each element is chosen, fraction must be [0, 1];
  with replacement: expected number of times each element is chosen, fraction must be >= 0
seed – seed for the random number generator
Samples the original RDD. withReplacement indicates whether elements are sampled with replacement, fraction is the expected sample size as a fraction of the original RDD (the returned size is approximate, not guaranteed to be exactly fraction * count), and seed is the seed for the random number generator. With the same fraction and seed, the same sample is returned every time.
>>> rdd = sc.parallelize(range(100), 4)
>>> y = rdd.sample(False, 0.1, 81)
>>> y.collect()
[4, 27, 40, 42, 43, 60, 76, 80, 86, 97]
>>> y.count()
10
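As noted above, the same fraction and seed give back the same sample; a quick check of that claim in the same shell, reusing the rdd defined above:
>>> rdd.sample(False, 0.1, 81).collect() == rdd.sample(False, 0.1, 81).collect()
True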
union (set union)
union(other)
Return the union of this RDD and another one.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd1 = sc.parallelize([5, 3, 4, 6])
>>> print rdd.union(rdd1).collect()
[1, 1, 2, 3, 5, 3, 4, 6]
intersection (set intersection)
intersection(other)
Return the intersection of this RDD and another one; the output will not contain any duplicate elements, even if the inputs did. Note that this method performs a shuffle internally.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd1 = sc.parallelize([5, 3, 4, 6])
>>> print rdd.intersection(rdd1).collect()
[3]
sortByKey
sortByKey(ascending=True, numPartitions=None, keyfunc=func)
Sorts this RDD, which is assumed to consist of (key, value) pairs.
>>> tmp = [('a', 1), ('f', 2), ('d', 3), ('c', 4), ('b', 5)]
>>> rdd = sc.parallelize(tmp, 2)
>>> print rdd.glom().collect()
[[('a', 1), ('f', 2)], [('d', 3), ('c', 4), ('b', 5)]]
>>> sort1 = rdd.sortByKey(True,1).glom().collect()
>>> sort2 = rdd.sortByKey(True,3).glom().collect()
>>> print sort1
[[('a', 1), ('b', 5), ('c', 4), ('d', 3), ('f', 2)]]
>>> print sort2
[[('a', 1), ('b', 5)], [('c', 4), ('d', 3)], [('f', 2)]]
sortBy
sortBy(keyfunc, ascending=True, numPartitions=None)
Similar to sortByKey, but keyfunc specifies what to sort by, for example by indexing into each element.
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
cartesian
cartesian(other)
Return the Cartesian product of the two RDDs.
>>> rdd = sc.parallelize([1, 2])
>>> rdd_1 = sc.parallelize([3,4])
>>> rdd.cartesian(rdd_1).collect()
[(1, 3), (1, 4), (2, 3), (2, 4)]
groupBy
groupBy(f, numPartitions=None, partitionFunc=<function portable_hash>)
Group the elements of the RDD according to the result of f.
>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
>>> result = rdd.groupBy(lambda x: x % 2).collect()
>>> sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]
groupByKey
groupByKey(numPartitions=None, partitionFunc=<function portable_hash>)
groupByKey([numTasks]) groups the data: called on a dataset of (K, V) pairs, it returns a dataset of (K, Seq[V]) pairs.
Note: if the grouping is done in order to perform an aggregation over each key (such as a sum or an average), reduceByKey or aggregateByKey will give much better performance.
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a', 2), ('b', 1)]
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]
pipe
pipe(command, env=None, checkCode=False)
Return an RDD created by piping the elements to a forked external process.
Parameter: checkCode – whether to check the return value of the shell command.
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
[u'1', u'2', u'', u'3']
>>> rdd = sc.parallelize(['spark2.3.0', 'kafka', 'hbase'])
>>> rdd2 = rdd.pipe('grep -i "ar"')
>>> print 'data after piping: {}'.format(rdd2.collect())
data after piping: [u'spark2.3.0']
foreach
foreach(f)
Applies a function to all elements of this RDD. The function runs on the executors, so any printed output appears in the executors' output rather than being returned to the driver.
>>> def f(x): print(x)
...
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
max, min, sum, count
>>> x = sc.parallelize(range(10))
>>> x.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> print 'max: {}'.format(x.max())
max: 9
>>> print 'min: {}'.format(x.min())
min: 0
>>> print 'sum: {}'.format(x.sum())
sum: 45
>>> print 'count: {}'.format(x.count())
count: 10
mean, variance, sampleVariance, stdev, sampleStdev
>>> x.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> print 'mean: {}'.format(x.mean())
mean: 4.5
>>> print 'variance: {}'.format(x.variance())
variance: 8.25
>>> print 'sample variance: {}'.format(x.sampleVariance())
sample variance: 9.16666666667
>>> print 'population standard deviation: {}'.format(x.stdev())
population standard deviation: 2.87228132327
>>> print 'sample standard deviation: {}'.format(x.sampleStdev())
sample standard deviation: 3.0276503541
countByKey, countByValue
countByKey()
Count the number of elements for each key, and return the result to the master as a dictionary.
countByValue()
Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)]
>>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
[(1, 2), (2, 3)]
first, top, take, takeOrdered
first()
Return the first element in this RDD.
top(num, key=None)
Get the top num elements from the RDD, in descending order or as determined by the optional key function.
take(num)
Take the first num elements of the RDD.
takeOrdered(num, key=None)
Get the num smallest elements from the RDD, in ascending order or as determined by the optional key function.
>>> rdd = sc.parallelize([10, 4, 2, 12, 3])
>>> rdd.first()
10
>>> rdd.top(1)
[12]
>>> rdd.top(2)
[12, 10]
>>> rdd.top(2,key=str)
[4, 3]
>>> rdd.take(3)
[10, 4, 2]
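takeOrdered is not demonstrated above; a minimal sketch with the same rdd ([10, 4, 2, 12, 3]):
>>> rdd.takeOrdered(3)
[2, 3, 4]
>>> rdd.takeOrdered(3, key=lambda x: -x)   # descending order via the key function
[12, 10, 4]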
subtract
subtract(other, numPartitions=None)
Return each value in self that is not contained in other.
>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]
cache
cache()
Persist this RDD with the default storage level (MEMORY_ONLY).
persist
persist()
Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY).
cache only supports the default storage level MEMORY_ONLY, while persist can be given other storage levels as the situation requires.
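For example (a minimal sketch in the same shell), persist accepts an explicit storage level from pyspark.StorageLevel; the exact repr printed by getStorageLevel may vary by Spark version:
>>> from pyspark import StorageLevel
>>> rdd = sc.parallelize(range(10))
>>> rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK)   # keep in memory, spill to disk if it does not fit
>>> rdd.getStorageLevel()
StorageLevel(True, True, False, False, 1)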
reduce
reduce(f)
Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
10
reduceByKey
reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)
Merge the values for each key using an associative and commutative reduce function.
This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified. Default partitioner is hash-partition.
A two-argument function is applied to the RDD; as with reduce, the running result is repeatedly combined with the next element. Note: with reduceByKey the function operates only on the values of pairs that share the same key.
>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)]
>>> num = sc.parallelize([1, 4, 2, 3, 4, 4, 2, 4])
>>> pairs = num.map(lambda x: (x, 1))
>>> pairs.collect()
[(1, 1), (4, 1), (2, 1), (3, 1), (4, 1), (4, 1), (2, 1), (4, 1)]
>>> a = pairs.reduceByKey(lambda x, y: x+y+1)
>>> a.collect()
[(2, 3), (4, 7), (1, 1), (3, 1)]
>>> b = pairs.reduceByKey(lambda x, y: x+y+2)
>>> b.collect()
[(2, 4), (4, 10), (1, 1), (3, 1)]
---
[1] Laozi, Tao Te Ching (Dao De Jing), Chapter 30; Laozi's birthplace, Luyi, China. ↩