spark算子深度解析

“以道佐人主者,不以兵强天下。
其事好远。
师之所处,荆棘生焉;大军之后,必有凶年。
善有果而已,不敢以取强。
果而勿矜,果而勿伐,果而勿骄。
果而不得已,果而勿强。
物壮则老,是谓不道,不道早已。”[1]

spark内置了非常多有用的算子(方法),通过对这些算子的组合就可以完成业务需要的功能,spark的编程归根结底就是对spark算子的使用,因此非常有必要对这些内置算子进行详细的归纳。
spark算子在大的方向上可以分为两类:

名称 说明
Transformation 变换、转换算子:不触发提交作业,只是完成作业中间过程处理;Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。Transformation参数类型为value或者key-value的形式。
Action 行动算子:触发SparkContext提交job作业。Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。

上表中提到Transformation的参数类型分为两类:value和key-value形式,对其归纳如下:

value 类型

细类型 算子
输入分区与输出分区一对一型 map
flatMap
mapPartitions
glom
输入分区与输出分区多对一型 union
cartesain
输入分区与输出分区多对多型 groupBy
输出分区为输入分区子集型 filter
distinct
substract
sample
takeSample
Cache型 cache
persist

key-value类型

细类型 算子
输入分区与输出分区一对一 mapValues
对单个RDD或两个RDD聚集 单个RDD聚集: combineByKey reduceByKey partitionBy
两个RDD聚集: Cogroup
连接 join
leftOutJoin和 rightOutJoin

Action算子

细类型 算子
无输出 foreach
HDFS saveAsTextFile
saveAsObjectFile
Scala集合和数据类型 collect
collectAsMap
reduceByKeyLocally
lookup
count
top
reduce
fold
aggregate

接下来,以pyspark为编译环境对上述算子(不限于)进行详细解析。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera2
      /_/

Using Python version 2.7.5 (default, Aug  4 2017 00:39:18)
SparkSession available as 'spark'.

map

map(f, preservesPartitioning=False)

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD,RDD之间的元素是一对一关系。

>>> x = sc.parallelize([1,2,3,4])
>>> y = x.map(lambda x:(x**3))
>>> y.collect()
[1, 8, 27, 64]

filter

filter(f)

对RDD元素进行过滤,返回一个新的数据集,由经过func函数后返回值为true的原元素组成。

>>> y = x.filter(lambda x:x>2)
>>> y.collect()
[3, 4]   

flatMap

flatMap(f, preservesPartitioning=False)

类似于map,但是每一个输入元素会被映射为0到多个输入元素,RDD之间的元素是一对多关系。

>>> y = x.flatMap(lambda x:(x,x*100,x**2))
>>> y.collect()
[1, 100, 1, 2, 200, 4, 3, 300, 9, 4, 400, 16]

glom

glom()

返回一个RDD,它将每个分区中的所有元素合并到一个列表中。

>>> a = sc.parallelize([1,2,3,4],2) //第二个参数2,表示数据集切分的份数(slices)。Spark将会在集群上为每一份数据起一个任务。
>>> y = a.glom()
>>> y.collect()
[[1, 2], [3, 4]]  

mapPartitions

mapPartitions(f, preservesPartitioning=False)

Return a new RDD by applying a function to each partition of this RDD.

>>> xx = sc.parallelize([1,2,3,4], 2)
>>> def f(iter):
...     yield sum(iter)
... 
>>> yy = xx.mapPartitions(f)
>>> print 'xx原来分区信息:{0}'.format(xx.glom().collect())
xx原来分区信息:[[1, 2], [3, 4]]                                                 
>>> print 'xx经过f计算后的结果:{}'.format(yy.glom().collect())
xx经过f计算后的结果:[[3], [7]] 

关于yield的使用,请参考我的另一篇笔记《Python易筋经-yield when data is bigger》

mapPartitionsWithIndex

mapPartitionsWithIndex(f, preservesPartitioning=False)

Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
和mapPartitions类似,也是针对每个分区处理,但是func函数需要两个入参,第一个表示partition分区索引,第二个入参表示每个分区的迭代器。

>>> x = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(splitIndex, iterator): yield (splitIndex, sum(iterator))
... 
>>> y = x.mapPartitionsWithIndex(f)
>>> print 'x原来分区信息:{0}'.format(x.glom().collect())
x原来分区信息:[[1, 2], [3, 4]]                                                 
>>> print 'x经过f计算后的结果:{}'.format(y.glom().collect())
x经过f计算后的结果:[[(0, 3)], [(1, 7)]]

getNumsPartitions

getNumPartitions()

Returns the number of partitions in RDD

>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> print '分区有{}个'.format(rdd.getNumPartitions())
分区有2

distinct

distinct(numPartitions=None)

Return a new RDD containing the distinct elements in this RDD.

>>> res = sorted(sc.parallelize([1, 1, 1, 2, 3, 2, 3]).distinct().collect())
>>> print '去重后的结果:{}'.format(res)                                        
去重后的结果:[1, 2, 3]

sample

sample(withReplacement, fraction, seed=None)

Return a sampled subset of this RDD.
Parameters:
withReplacement – can elements be sampled multiple times (replaced when sampled out)

fraction – expected size of the sample as a fraction of this RDD’s size without replacement: probability that each element is chosen; fraction must be [0, 1] with replacement: expected number of times each element is chosen; fraction must be >= 0

seed – seed for the random number generator
对原RDD进行采样,其中withReplacement表示是否有放回的抽样,fraction表示采样大小是原RDD的百分比,seed表示随机数生成器。fraction和seed相同,则每次返回的数据相同

>>> rdd = sc.parallelize(range(100), 4)
>>> y = rdd.sample(False, 0.1, 81)
>>> y.collect()
[4, 27, 40, 42, 43, 60, 76, 80, 86, 97]                                         
>>> y.count()
10

union 并集

union(other)

Return the union of this RDD and another one.

>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd1 = sc.parallelize([5, 3, 4, 6])
>>> print rdd.union(rdd1).collect()
[1, 1, 2, 3, 5, 3, 4, 6] 

intersection 交集

intersection(other)

Note that this method performs a shuffle internally.

>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd1 = sc.parallelize([5, 3, 4, 6])
>>> print rdd.intersection(rdd1).collect()
[3]

sortByKey

sortByKey(ascending=True, numPartitions=None, keyfunc=func)

Sorts this RDD, which is assumed to consist of (key, value) pairs.

>>> tmp = [('a', 1), ('f', 2), ('d', 3), ('c', 4), ('b', 5)]
>>> rdd = sc.parallelize(tmp, 2)
>>> print rdd.glom().collect()
[[('a', 1), ('f', 2)], [('d', 3), ('c', 4), ('b', 5)]]                          
>>> sort1 = rdd.sortByKey(True,1).glom().collect()
>>> sort2 = rdd.sortByKey(True,3).glom().collect()
>>> print sort1
[[('a', 1), ('b', 5), ('c', 4), ('d', 3), ('f', 2)]]
>>> print sort2
[[('a', 1), ('b', 5)], [('c', 4), ('d', 3)], [('f', 2)]]

sortBy

sortBy(keyfunc, ascending=True, numPartitions=None)

跟sortByKey类似,用索引的方式指定根据什么sort

>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]                              
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

cartesian

cartesian(other)

返回两个rdd的笛卡尔积。

>>> rdd = sc.parallelize([1, 2])
>>> rdd_1 = sc.parallelize([3,4])
>>> rdd.cartesian(rdd_1).collect()
[(1, 3), (1, 4), (2, 3), (2, 4)]  

groupBy

groupBy(f, numPartitions=None, partitionFunc=<function portable_hash>)

根据f逻辑结果进行分组。

>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
>>> result = rdd.groupBy(lambda x: x % 2).collect()
>>> sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]

groupByKey

groupByKey(numPartitions=None, partitionFunc=<function portable_hash>)

groupByKey([numTasks])是数据分组操作,在一个由(K, V)键值对组成的数据集上调用,返回一个(K, Seq[V])对的数据集。

注意,如果要对每个键执行聚合(比如求和或平均值),使用reduceByKey或aggregateByKey将提供更好的性能。

>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a', 2), ('b', 1)]                                                            
>>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]

pipe

pipe(command, env=None, checkCode=False

将由管道元素创建的RDD返回到一个forked外部进程。
参数:checkCode——是否检查shell命令的返回值。

>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
[u'1', u'2', u'', u'3']

>>> rdd = sc.parallelize(['spark2.3.0', 'kafka', 'hbase'])
>>> rdd2 = rdd.pipe('grep -i "ar"')
>>> print '经过pipe处理过后的数据:{}'.format(rdd2.collect())
经过pipe处理过后的数据:[u'spark2.3.0']  

foreach

foreach(f)

Applies a function to all elements of this RDD.

>>> def f(x): print(x)
... 
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

max, min, sum, count

>>> x = sc.parallelize(range(10))
>>> x.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> print '最大值:{}'.format(x.max())
最大值:9                                                                       
>>> print '最小值:{}'.format(x.min())
最小值:0
>>> print '总和:{}'.format(x.sum())
总和:45
>>> print '总个数:{}'.format(x.count())
总个数:10

mean, variance, sampleVariance, stdev, sampleStdev

>>> x.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> print '平均值:{}'.format(x.mean())
平均值:4.5                                                                     
>>> print '方差:{}'.format(x.variance())
方差:8.25
>>> print '样本方差:{}'.format(x.sampleVariance())
样本方差:9.16666666667
>>> print '总体标准偏差:{}'.format(x.stdev())
总体标准偏差:2.87228132327
>>> print '样本标准偏差:{}'.format(x.sampleStdev())
样本标准偏差:3.0276503541

countByKey, countByValue

countByKey()

Count the number of elements for each key, and return the result to the master as a dictionary.

countByValue()

Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.

>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)] 

>>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
[(1, 2), (2, 3)]

first, top, take, takeOrdered

first()
top(num, key=None)

从按降序排列的RDD中获取前N个元素,或者有可选的key函数决定顺序。

take(num)

Take the first num elements of the RDD.

akeOrdered(num, key=None)

从按升序排列的RDD中获取N个元素,或者由可选key函数指定。

>>> rdd = sc.parallelize([10, 4, 2, 12, 3])
>>> rdd.first()
10                                                                              
>>> rdd.top(1)
[12]
>>> rdd.top(2)
[12, 10]
>>> rdd.top(2,key=str)
[4, 3]
>>> rdd.take(3)
[10, 4, 2]    

subtract

subtract(other, numPartitions=None)

Return each value in self that is not contained in other.

>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]

cache

cache()

Persist this RDD with the default storage level (MEMORY_ONLY).

presist

persist()

Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY).

cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。

reduce

reduce(f)

Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.

>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
10

reduceByKey

reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)

Merge the values for each key using an associative and commutative reduce function.
This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified. Default partitioner is hash-partition.

把一个函数作用在一个RDD上,这个函数必须接收两个参数,reduce把结果继续和RDD的下一个元素做累积计算。注意:reduceByKey的函数是针对具有相同key的二元组

>>> from operator import add
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted(rdd.reduceByKey(add).collect())
[('a', 2), ('b', 1)] 

>>> num = sc.parallelize([1, 4, 2, 3, 4, 4, 2, 4]) 
>>> pairs = num.map(lambda x: (x, 1))
>>> pairs.collect()
[(1, 1), (4, 1), (2, 1), (3, 1), (4, 1), (4, 1), (2, 1), (4, 1)]                
>>> a = pairs.reduceByKey(lambda x, y: x+y+1)
>>> a.collect()
[(2, 3), (4, 7), (1, 1), (3, 1)]
>>> b = pairs.reduceByKey(lambda x, y: x+y+2)
>>> b.collect()
[(2, 4), (4, 10), (1, 1), (3, 1)

  1. 老子《道德经》第三十章,老子故里,中国鹿邑。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容