【2019-06-23】RDD编程

image.png

(1)弹性分布数据集

RDD以分区的形式分布在集群中多个机器上,每个分区代表了数据集的一个子集。分区定义了spark中数据的并行单位。Spark 框架并行处理多个分区,一个 分区内的数据对象则是顺序处理。创建 RDD 最简单的方法是在本地对象集合上调用 SparkContext 的 parallelize 方法。

(2)RDD基础

#在python中使用textFile创建一个字符串的RDD
'''
测试文本abc.txt
Jimmy   A   1
Jimmy   B   8
Sam C   10
Tom A   7
'''
lines=sc.textFile("/user/test/abc.txt")
pythonlines=lines.filter(lambda line:"Jimmy" in line)
 pythonlines.first()
#u'Jimmy\tA\t1'

spark或shell 会话都按照如下方式工作:
(i)从外部数据创建出输入RDD
(ii)使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD
(iii)告诉spark对需要被重用的中间结果RDD执行persist()操作。
(iv)使用行动操作来触发一次并行计算,spark会对计算进行优化后再执行。

(3)创建RDD

#python 中的parallelize()方法
lines=sc.parallelize(["pandas","i like pands"])

(4)RDD操作
RDD 操作两种类型:转化操作和行动操作

# filter 转化操作
lines=sc.textFile("/user/test/abc.txt")
flines=lines.filter(lambda x:"Sam" in x)

# union() 转化操作
glines=lines.filter(lambda x:"Tom" in x)
linesunion=flines.union(glines)

# 行动操作,计数
print linesunion.count()
# 2

(5)向spark传递函数

# 在python中传递函数 
word=lines.filter(lambda s:"Tom" in s)
def containsTom(s):
  return "Tom" in s
word=lines.filter(containsTom)

在scala中,可以把定义的内联函数、方法的引用或静态方法传递给spark。

#scala 中传递函数
import org.apache.spark.rdd.RDD
class SearchFunction(val query:String){
      def isMatch(s:String):Boolean={
      s.contains(query)
      }
      def getMatchedNoReference(rdd:RDD[String]):RDD[String]={
      val query_ = this.query
     rdd.flatMap(x => x.split(query_))
    }
}

(4)常见的转化操作和行动操作
基本RDD
--针对各个元素的转化操作
最常用的转化操作是map()和filter()。转化操作map()接收一个函数,把这个函数用于RDD的每个元素,将函数的返回结果作为结果RDD中的对应元素的值。

map

# python 中计算RDD中各个值的平方
nums=sc.parallelize([1,2,3,5])
squared=nums.map(lambda x:x*x).collect()
for num in squared:
      print "%i" %(num)
'''
1
4
9
25
'''
// scala中计算RDD中各个值的平方
val input=sc.parallelize(List(1,2,3,4))
val result=input.map(x=>x*x)
println(result.collect().mkString(","))
// 1,4,9,16  

flatMap

#python 中的flatMap()将行数据切分为单词
lines = sc.parallelize(["hello world","hi"])
words=lines.flatMap(lambda line:line.split(" "))
words.first()
# hello

//scala 中的flatMap()中将行数据切分为单词
val lines=sc.parallelize(List("hello world","hi"))
val words=lines.flatMap(line=>line.split(" "))
words.first()
//res1: String = hello

伪集合操作
最简单的集合操作是union(other),返回一个包含两个RDD中所有元素的RDD。RDD.distinct()转化操作可以生成一个包含不同元素的新RDD,distinct开销很大,需要将所有数据通过网络进行混洗(shuffle)。
spark 提供了intersection(other)方法,只返回两个RDD中都有的元素。intersection会在运行时去掉所有重复的元素(单个RDD内重复的元素也会一起移除),intersection()和union()的概念类似,但是intersection性能差很多,它需要通过网络混洗数据来发现共有的元素。
计算笛卡尔积 cartesian(other)转化操作会返回所有可能的(a,b)对。
单个RDD操作
map() 将函数应用于RDD中的每个元素,将返回值构成新的RDD,rdd.map(x=>x+1)
flatMap()将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD,通常用来切分单词。rdd.flatMap(x=>x.to(3))
filter() 返回一个由通过传给filter()的函数的元素组成吃的RDD。rdd.filter(x=>x!=1)
distinct()去重 rdd.distinct()
sample(withReplacement,fraction,[seed]) 采样,是否替换。 rdd.sample(false,0.5)

两个RDD操作
union 生成一个包含两个RDD中所有元素的RDD。rdd.union(other)
intersection() 求两个RDD共同的元素的RDD。rdd.intersection(other)
subtract() 移除一个RDD中的内容。rdd.subtract()
cartesian() 与另一个RDD的笛卡尔积。 rdd.cartesian(other)

行动操作
基本的RDD最长见的行动操作reduce()。接收一个函数作为参数,这个函数要操作两个相同元素类型的RDD数据并返回一个同样类型的新元素。

#python 中的reduce()
sum=rdd.reduce(lambda x,y:x+y)


//scala中的reduce()
val sum=rdd.reduce((x,y)=>x+y)

fold和reduce类似,接收一个与reduce()接收的函数签名相同的函数,再加上一个初始值来作为每个分区第一次调用时的结果。
aggregate()函数把从返回值类型必须与所操作的RDD类型相同的限制中解放出来。

#python 中的aggregate()
nums=sc.parallelize([1,2,3,4])
seq0p=(lambda x,y:(x[0]+y,x[1]+1))
comb0p=(lambda x,y:(x[0]+y[0],x[1]+y[1]))
nums.aggregate((0,0),seq0p,comb0p)
//(10, 4)



//scala中的aggregate()
val result=input.aggregate((0,0))(
(acc,value)=>(acc._1+value,acc._2+1),
(acc1,acc2)=>(acc1._1+acc2._1,acc1._2+acc2._2)
)

基本的RDD行动操作
collect() 返回RDD中的所有元素。rdd.collect()
count() RDD中的元素个数。 rdd.count()
countByValue() 各元素在RDD中出现的次数。rdd.countByValue()
take(num) 从RDD中返回num个元素。rdd.take(num)
top(num) 从RDD中按照提供的顺序返回最前面的num个元素。rdd.top()
takeOrdered(num) 从RDD中按照提供的顺序返回最前面的num个元素。rdd.takeOrdered(num)
takeSample(withReplacement,num,[seed]) 从RDD中返回一些任意元素。
reduce(func) 并行整合RDD中所有数据。rdd.reduce((x,y)=>x+y)
fold(zero)(func) 和reduce一样,但是需要初始值。rdd.fold(0)((x,y)=>x+y)
aggregate(zerovalue)(seq0p,comb0p)和reduce相似,但是通常返回不同类型的函数。rdd.aggregate((0,0))((x,y)=>(x._1+y,x._2+1),(x,y)=>(x._1+y._1,x._2+y._2)
foreach(func) 对RDD中的每个元素使用给定的函数。rdd.foreach(func)

(5)持久化(缓存)
SparkRDD是惰性求值的。有时候希望能多次使用同一个RDD,可以给RDD选择不同的持久化级别。在Scala和Java中,默认情况下,persist()会把数据以序列化的形式缓存在JVM的堆空间中,在python中,会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列话后的对象存储在JVM的堆空间中。

持久化级别
//scala中使用persist()
import org.apache.spark.storage.StorageLevel
val result=input.map(x=>x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,542评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,822评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,912评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,449评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,500评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,370评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,193评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,074评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,505评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,722评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,841评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,569评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,168评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,783评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,918评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,962评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,781评论 2 354

推荐阅读更多精彩内容