Spark08 RDD KV数据

键值对 RDD 是 Spark 中许多操作所需要的常见数据类型。Spark 为包含简直对类型的 RDD 提供了一些专有的操作。这些 RDD 被称为 pair RDD。

创建 Pair RDD

在 Spark 中有很多种方法创建 Pair RDD。很多存储键值对的数据格式会在读取值时直接返回其键值对数据组成的 pair RDD。普通的 RDD 可通过 map 转换为 pair RDD。

构建键值对 RDD 的方法在不同语言中会有所不同。在 Python 中,需要返回一个二元组组成的 RDD

>>> rdd=sc.parallelize(['zhang 1','li 2'])
>>> pairs = rdd.map(lambda x : (x.split(' ')[0],x.split(' ')[1]))
>>> pairs.collect()
[('zhang', '1'), ('li', '2')]

Pair RDD 的转换操作

以键值对集合[(1,2),(3,4),(3,6)] 为例

函数名 目的 实例 结果
                                 
reduceByKey                                                                       
合并具有相同键的值 rdd.reduceByKey(lambda x,y : x + y) [('3', '46'), ('1', '2')]
groupByKey 对具有相同键值进行分组 rdd.groupByKey() [(1,[2]),(3,[4,6])]
combineByKey 使用不同的返回类型合并具有相同的键的值
mapValues(func) 对 pari RDD 的每个值应用一个函数而不改变键 rdd.mapValues(lambda x : x + 1) [(1, 3), (3, 5), (3, 7)]
flatMapValues 对 Pair RDD 中的每个值应用一个返回迭代器的函数,然后对每个元素都生成一个对应原键的键值对记录。 rdd.flatMapValues(lambda x : list(range(int(x)))) [('1', 0), ('1', 1), ('3', 0), ('3', 1), ('3', 2), ('3', 3), ('3', 0), ('3', 1), ('3', 2), ('3', 3), ('3', 4), ('3', 5)]
keys 返回一个仅包含键的 RDD rdd.keys() ['1', '3', '3']
values 返回一个仅包含值的 RDD rdd.values() ['2', '4', '6']
sortByKey 返回一个根据键排序的 RDD rdd.sortByKey() [('1', '2'), ('3', '4'), ('3', '6')]

针对两个 pair RDD 的转化操作

rdd = [(1,2),(3,4),(3,6)]
other = [(3,9)]

函数名 目的 实例 结果
                                 
subtractByKey                                                                        
删除 rdd 中键与 other 相同的键相同的元素 rdd.subtractByKey(other) [(1, 2)]
join 对两个 RDD 进行内连接 rdd.join(other) [(3, (4, 9)), (3, (6, 9))]
rightOuterJoin 对两个 RDD 进行右连接 rdd.rightOuterJoin(other) [(3, (4, 9)), (3, (6, 9))]
leftOuterJoin 对两个 RDD 进行左连接 rdd.leftOuterJoin(other) [(1, (2, None)), (3, (4, 9)), (3, (6, 9))]
rdd.cogroup 将两个 RDD 相同的键分组到一起 rdd.cogroup(other) [1,([2],[]),(3,([4,6],[9]))]

行动操作

例如:rdd = [(1,2),(3,4),(3,6)]

函数名 目的 实例 结果
countByKey 对每个键进对应的元素分别计数 rdd.countByKey() [(1, 1), (3, 2)]
collectAsMap 将结果以映射表的形式返回, rdd.collectAsMap() {1:2,3:6}
lookup 返回给定键的所有值 rdd.lookup(3) [4,6]

其他操作

键值对 RDD 也是 RDD 支持 RDD 所有算子 键值对 RDD 都支持
举例
Pyhton

>>> rdd = sc.parallelize([(1,2),(3,4),(3,6)])
# 筛选第二个元素大于3的元素
>>> res_rdd = rdd.filter(lambda kv: kv[1] > 3)
>>> res_rdd.collect()
[(3, 4), (3, 6)]

上述图标只是简单说明 键值对 RDD 的一些操作 下面通过一些实例来进一步说明其中一些操作。有些操作比较简单易懂,所以举例不会涵盖所有操作。

求平均值

给定一下数据求平均值,会设计 mapValues 和 reduceByKey

key value
a 1
b 0
b 3
c 2
d 6
d 9
d 7

通过 mapValues 转换为如下结果

key value
a (1,1)
b (0,1)
b (3,1)
c (2,1)
d (6,1)
d (9,1)
d (7,1)

value 为元组 元组第一个值位 key 对应的值 元组第二个值位 次数。要求平均值,需要就出所有值的和,和次数之和
通过 reduceByKey 求出 值之和,和次数之和
如下

key value
a (1,1)
b (3,2)
c (2,1)
d (22,3)

有了次数之和,和 value 之和 求平均值 value 之和除以次数

完整代码如下
Python

>>> rdd = sc.parallelize([('a',1),('b',0),('b',3),('c',2),('d',6),('d',7),('d',9)])
>>> map_rdd = rdd.mapValues(lambda x : (x,1))
>>> result = map_add.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])
>>> result.collect()
[('b', (3, 2)), ('c', (2, 1)), ('a', (1, 1)), ('d', (22, 3))]
# 计算平均值
>>> result = result.mapValues(lambda x : x[0] / x[1])
[('b', 1.5), ('c', 2.0), ('a', 1.0), ('d', 7.333333333333333)]

combineByKey 算子说明

combineByKey 算子的参数较多 在表格中没有举例用法
combineByKey 有三个参数
createCombiner: 初始化方法,如果是新元素 combineByKey 都会调用改方法来初始化,注意:这一过程会在每个分区中第一次出现各个键时发生。而不是在整个 RDD 中第一次出现一个键时发生
mergeValue: 分区内合并方法
mergeCombiners:分区间合并方法
利用 combineByKey 求平均值
Python

 rdd = sc.parallelize([('a',1),('b',0),('b',3),('c',2),('d',6),('d',7),('d',9)])
>>> res_rdd = rdd.combineByKey((lambda x:(x,1)),(lambda x,y:(x[0]+y,x[1]+1)),(lambda x,y:(x[0]+y[0],x[1]+y[1])))
>>> res_rdd.collect()
[('b', (3, 2)), ('c', (2, 1)), ('a', (1, 1)), ('d', (22, 3))]
# 计算平均值
>>> res_rdd = res_rdd.mapValues(lambda x : x[0] / x[1])
[('b', 1.5), ('c', 2.0), ('a', 1.0), ('d', 7.333333333333333)]

combineByKey 处理数据流程图
原数据为:

key value
a 1
b 0
b 3
c 2
d 6
d 9
d 7

假设分为3个区
分区一:

a 1
b 0
b 3

分区二:

c 2
d 6
d 9

分区三:

d 7

分区中合并间 后:
这里调用 (lambda x,y:(x[0]+y,x[1]+1)) 值相加 次数+1
如果遇到新元素会调用 (lambda x:(x,1)), 转换为 值-次数的 kv结构
分区一:

a (1,1)
b (3,2)

分区二:

c (2,1)
d (15,2)

分区三:

d (7,1)

分区见合并
调用方法: (lambda x,y:(x[0]+y[0],x[1]+y[1]))
所有值相加,次数相加

a (1,1)
b (3,2)
c (2,1)
d (22,3)

sortByKey 算子

sortByKey 两个参数
ascending: 是否结果为升序 默认True。True 为升序 False 为降序
keyfunc: 自定义排序规则
例:
Python

sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x)):
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353