Spark06 RDD 行动算子

行动操作

行动操作会把最终计算的结果返回到驱动程序,或写入外部的存储系统中。由于行动操作需要产生实际的输出,他们会强制执行那些必须用到的RDD的转换操作。

非行动操作的其他操作都是惰性求值的。这意味着在被调用行动操作之前Spark不会开始计算。

惰性求值意味着当我们对 RDD 调用转换操作 (例如 map() )时,操作不会立即执行。相反,Spark 会在内部记录下所要求的执行的操作的相关信息。我们不应该把 RDD 看做存放特定数据的数据集,而最好把每个 RDD 看做通过转化操作构建出来、记录如何计算的指令集。把数据读取到 RDD 仍然是惰性的。所以当调用 textFile() 时,数据不会读取进来, 而是在必要时才会读取。

常见的非行动算子有 map flatMap filter distinct sample union intersection subtract cartesian 等 用法详情见上一篇 Spar05 RDD 转换算子

collect 算子

返回 RDD 中所有元素

Python

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd.collect()
['a', 'b', 'c']

count 算子

计算 RDD 中的元素个数

Python

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd.count()
3

countByValue 算子

计算各个元素在 RDD 中出现的次数

Python

>>> rdd = sc.parallelize(['a','a','b','c'])
>>> rdd.countByValue()
defaultdict(<class 'int'>, {'a': 2, 'b': 1, 'c': 1})

take(num) 算子

从 RDD 中返回 num个元素

Python

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd.take(2)
['a', 'b']

top(num)

从RDD中返回最前面的 num 个元素

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd.top(2)
['c', 'b']

takeOrdered(num,key=ordering)

从 RDD 中按照顺序返回前num个元素

Python

>>> rdd = sc.parallelize([1,3,5,2,9,7])
>>> rdd.takeOrdered(3,key=lambda x:-x)
[9, 7, 5]

对复杂类型操作

Python

>>> rdd = sc.parallelize([('a',1),('b',3),('c',5),('d',2),('e',9),('f',7)])
>>> rdd.takeOrdered(3,key=lambda x:x[1])
[('a', 1), ('d', 2), ('b', 3)]

takeSample(withReplacement,num,[sed])

从 RDD 中返回任意一些元素
withReplacement: true表示有放回的采样,false表示无放回采样
num: 表示返回的采样数据的个数
sed: 表示用于指定的随机数生成器种子

Python

>>> rdd = sc.parallelize([1,3,5,2,9,7])
>>> rdd.takeSample(False,3)
[3, 9, 1]
>>> rdd.takeSample(False,3)
[2, 3, 5]
>>> rdd.takeSample(False,3)
[7, 3, 9]
>>> rdd.takeSample(False,3)
[7, 9, 1]

reduce(func)

并行整合 RDD 中所有的元素 例如 就和

Python

>>> rdd = sc.parallelize([1,3,5,2,9,7])
>>> rdd.reduce(lambda x,y:x+y)
27

fold(zero,func)

与 reduct 类型 需要提供一个初始值 zero是0值 也称为初始值

Python

>>> rdd = sc.parallelize([1,3,5,2,9,7])
>>> rdd.fold(0,lambda x,y:x+y)
27

与reduce 返回结果一致 假如zero不是0 而是1 结果是多少呢

>>> rdd.fold(1,lambda x,y:x+y)
32

结果并不是28而是32
这是为什么呢?
因为 RDD 是分区存放的 计算 fold 每个分区计算 让后将各个分区计算的结果在计算。所以每个分区计算时都有 zero 值
我们来看 rdd 的分区情况

>>> rdd.glom().collect()
[[1], [3, 5], [2], [9, 7]]

所以计算逻辑为:
先计算各个分区 第一列为 zero
1 + 1 = 2
1 + 3 + 5 = 9
1 + 2 = 3
1 + 9 + 7 = 17
各个分区求和:
1 + 2 + 9 + 3 + 17 = 32 (第一个1位zero)

aggregate(zeroValue, seqOp, combOp)

zeroValue:初始值
seqOp:每个分区操作
combOp:分区间操作

例如求 1,2,3,4,5,6,7,8,9,10求和
Python

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.aggregate(0,lambda acc,value:acc+value,lambda acc1,acc2:acc1+acc2)
55

seqOp:的第一个参数 acc 是上一次计算结果 value 是当前值
combOp:参数 acc1和acc2 都是各个分区计算结果

在 例如求 1,2,3,4,5,6,7,8,9,10 的平均值

Python

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> res = rdd.aggregate((0,0),lambda acc,value:(acc[0]+value,acc[1]+1),lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))
>>> res[0]/res[1]
5.5

foreach(func)

对每个元素使用给定的函数 func

Python

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.foreach(lambda x:print(str(x)+'\n'))
1

2

5

6

3

4

7

8

9

10

持久化(缓存)

如前所说,Spark RDD 是惰性求值的,而有时我们期望能多次使用同一个 RDD。如果简单的对 RDD 做调用操作,Spark 每次会重新计算 RDD 以及它的所有依赖。这在迭代算法中消耗格外大,因为迭代算法长长会多次使用同一组数据。例如

Python

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd1 = rdd.map(lambda x : x * x)
>>> rdd1.count()
10
>>> rdd1.collect()
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

在调用 rdd1.count 和 rdd1.collect() 都会计算 map。为了避免多次计算同一个 RDD ,可以让Spark 对数据进行持久化。Spark 会将 RDD 的节点分别保存到他们求出的分区上。如果有一个节点数据丢失,Spark 会计算丢失分区的数据。持久化方法persist

Python

>>> import pyspark.StorageLevel
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd1 = rdd.map(lambda x : x * x)
>>> rdd1.persist(StorageLevel.MEMORY_ONLY)

下表是各个缓存级别及说明:

级别 使用空间 CPU时间 是否内存中 是否磁盘上 备注
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK 中等 部分 部分 如果数据在内存放不下,则溢写到磁盘
MEMORY_AND_DISK_SER 部分 部分 如果数据在内存放不下,则溢写到磁盘。在内存中存放序列化的数据
DISK_ONLY

如果有必要在存储级别的末尾加上 '_2' 来把持久化数据存为两份

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354