Spar05 RDD 转换算子

RDD 简介

rdd 是 spark 对数据的核心抽象,全称弹性分布式数据集( Resilient Distributed Dataset 简称 RDD )
spark 中的 RDD 是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点。RDD 可以包含 Python、Java、Scala 中任意类型的数据对象,甚至可以包含用户自定义对象。
在 spark 中对 RDD 的操作有创建 RDD、转化已有的 RDD 以及调用 RDD 操作进行求值。

创建 RDD

parallelize 方法

python 中的 parallelize() 方法

>>> sc.parallelize(['a','b','c'])
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

Scala 中的 parallelize() 方法

>>> sc.parallelize(['a','b','c'])
ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:195

textFile 方法

python textFile() 方法

sc.textFile('/path/file')

Scala textFile() 方法

sc.textFile('/path/file')

文件路劲说明
hadoop 的配置 $HADOOP_HOME/etc/hadoop/core-site.xml 中配置

 <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>

如果有上述配置textFile 默认从hdfs中读取文件
如果需要指定
指定从本地文件中读取

sc.textFile('file:///path/file')

指定从hdfs文件中读取

sc.textFile('hdfs:/path/file')

db中读取数据创建 rdd

可以从db读取数据组装成 list 在通过parallelize 方式创建

有了 RDD 必然需要多 RDD 中的数据进行操作 对 RDD 操作的函数 也称为算子

map 算子

将算子应用于每一个数据并返回一个新的RDD
python

>>> rdd = sc.parallelize([1,2,3])
>>> rdd1=rdd.map(lambda x:x+1)
>>> rdd1.collect()
[2, 3, 4]  

创建一个数字列表RDD map将每个数字加1操作
Scala

scala> val rdd = sc.parallelize(List(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd1 = rdd.map(_+1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
scala> rdd1.collect()
res0: Array[Int] = Array(2, 3, 4)   

从python和Scala的方式来看 语法上略有差异 本质操作还是一样的

flatMap 算子

flatMap 也是作用于每个元素并返回值,将新值组装一个新的 rdd 与 map 不同的是 map返回一个元素 flatMap返回一个列表 flatMap通常用来切分单词
python

>>> rdd = sc.parallelize(['张三 李四','王五 李丽'])
>>> rdd1 = rdd.flatMap(lambda line : line.split(' '))
>>> rdd1.collect()
['张三', '李四', '王五', '李丽']

假如用map做同样操作

>>> rdd = sc.parallelize(['张三 李四','王五 李丽'])
>>> rdd1 = rdd.map(lambda line : line.split(' '))
>>> rdd1.collect()
[['张三', '李四'], ['王五', '李丽']]

通过比较结果很清晰flatMap与map不同。 map返回元素作为rdd 的一个元素 即使是list 也是 rdd 的一个元素 只是这个元素是list flatMap返回list中每个元素都是rdd中的一个元素

Scala

scala> val rdd = sc.parallelize(List("张三 李四","王五 李丽"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> val rdd1 = rdd.flatMap(_.split(" "))
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at flatMap at <console>:25

scala> rdd1.collect()
res1: Array[String] = Array(张三, 李四, 王五, 李丽)

distinct 算子

去重
python

>>> rdd = sc.parallelize(['a','a','b','c'])
>>> rdd1 = rdd.distinct()
>>> rdd1.collect()
['b', 'c', 'a'] 

Scala

scala> val rdd = sc.parallelize(List("a","a","b","c"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val rdd1 = rdd.distinct()
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at distinct at <console>:25
scala> rdd1.collect()
res2: Array[String] = Array(a, b, c)

sample 算子

从原 rdd 中抽样 有三个参数

  • withReplacement: true表示有放回的采样,false表示无放回采样
  • fraction:期望样本的大小作为RDD大小的一部分
    当withReplacement=false时:选择每个元素的概率;分数一定是[0,1] ;
    当withReplacement=true时:选择每个元素的期望次数; 分数必须大于等于0。
  • seed:随机数生成器的种子

如: 元素不可以多次抽样:withReplacement=false,每个元素被抽取到的概率为0.5:fraction=0.5
python

>>> rdd = sc.parallelize(['a','a','b','c'])
>>> rdd.sample(False,0.5).collect()
['b', 'c']
>>> rdd.sample(False,0.5).collect()
['a', 'c']
>>> rdd.sample(False,0.5).collect()
['a', 'b', 'c']

如: 元素可以多次抽样:withReplacement=true,每个元素被抽取到的期望次数为2:fraction=2
python

>>> rdd = sc.parallelize(['a','a','b','c'])
>>> rdd.sample(True,2).collect()
['a', 'a', 'a', 'b', 'b', 'b', 'c', 'c', 'c']
>>> rdd.sample(True,2).collect()
['a', 'a', 'a', 'a', 'b', 'c', 'c']
>>> rdd.sample(True,2).collect()
['a', 'a', 'a', 'b', 'c', 'c', 'c']

以上算子都是对单个 rdd 操作 下面介绍两个 rdd 之间操作的算子

union 算子

生成一个包含两个rdd中所有元素的rdd
python

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd1 = sc.parallelize(['c','d','d'])
>>> rdd.union(rdd1).collect()
['a', 'b', 'c', 'c', 'd', 'd']

intersection 算子

求两个rdd的共同元素

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd1 = sc.parallelize(['c','d','d'])
>>> rdd.intersection(rdd1).collect()
['c']

这个返回结果会去重
比如:

>>> rdd = sc.parallelize(['a','b','c','c'])
>>> rdd1 = sc.parallelize(['c','d','d'])
>>> rdd.intersection(rdd1).collect()
['c']

rdd 中有两个'c' 通过intersection后仍然只有一个'c'

subtract 算子

移除一个 RDD中的内容

>>> rdd = sc.parallelize(['a','b','c','c','a'])
>>> rdd1 = sc.parallelize(['c','d','d'])
>>> rdd.subtract(rdd1).collect()
['a', 'a', 'b']

次例中表示从 rdd中移除含rdd1中元素

cartesian 算子

计算两个算子的笛卡尔积

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd1 = sc.parallelize(['1','2','3'])
>>> rdd.cartesian(rdd1).collect()
[('a', '1'), ('a', '2'), ('a', '3'), ('b', '1'), ('b', '2'), ('b', '3'), ('c', '1'), ('c', '2'), ('c', '3')]

在计算相似度时特别有用 比如rdd是用户集 rdd1上商品集 需要给用户推荐商品 需计算用户与每个商品的推荐指数 就会用到用户集和商品集的笛卡尔乘积。只是特别注意,求大规模的rdd笛卡尔积开销巨大

以上介绍 rdd 基本算子 下一篇介绍 rdd 行动算子

glom 算子

按分区进行分组
Python

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.glom().collect()
[[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]

说明将数据分为4个分区 第一个分区有数据 1,2 。 第二个分区数据 3,4。第三个分区 5,6 。 第四个分区 7,8,9,10

repartition 将 RDD 进行重新分区

Pyhon

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.glom().collect()
[[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]
>>> rdd.repartition(2).glom().collect()
[[1, 2, 5, 6, 7, 8, 9, 10], [3, 4]]

repartition 会对RDD 数据进行重新组合比较消息性能,但在某些特定场合配置foreachPartition 使用。会对新能有极大提升

conf = SparkConf().setMaster("local[*]").setAppName("My Demo")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
rdd1 = rdd.repartition(2)
def func(ite):
  for i, value in enumerate(ite):
    # 这里会在executor中执行 这里print 只是个例子 说明怎么使用ite
    print(value)

rdd1.foreachPartition(func)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354