Spark Chapter 4 RDD 常用算子

# 1 RDD 常用操作

*transformations*, which create a new dataset from an existing one

RDDA ---transformations-->RDDB

RDDB = RDDA.map(...)

【面试重点】lazy——transformations 不会立刻计算内容,只会记录关系,只有遇到action才会进行计算。

操作包含:map/filter/gtoup by/distinct

*actions*, which return a value to the driver program after running a computation on the dataset.

操作包含:count,reduce、collect/^

【特点】

* transformation are lazy,nothing actually happens until an action is called;

* action triggers the computation;

* action returns values to driver or writes data to external storage;

# 2 Transformation算子

## 1map

map(func):将func函数作用到每个数据集上面,生成一个新的分布式数据集返回

```

def my_map2():

    data = ['cat','dog','tiger','lion']

    rdda = sc.parallelize(data)

    rddb = rdda.map(lambda x:(x,1))

    print(rddb.collection())

```

## 2filter

选出所有func返回值为true的值的元素,生成一个新的分布式数据集返回

```

def my_map():

    data = [1,2,3,4,5]

    rdd1 = sc.parallelize(data)

    rdd2 = rdd1.map(lambda x:x*2)

    rdd3 = rdd2.filter(lambda x:x>5)

    print(rdd3.collect())


<==>

# 链式编程

 print(sc.parallelize(data).map(lambda x:x*2).filter(lambda x:x>5).collect())

```

## 3flatMap

输入的item能够被map到0or多个items输出,返回值是一个Sequence

```

def my_flatMap():

  data = ['hello spark','hello world','hello world']

  rdd = sc.parallelize(data)

  print(rdd.flatMap(lambda line:line.split(" ")).collection())

```

 不是很懂这个区别

## 4groupByKey

把相同的Key的数据分发到一起

```

def my_groupBy():

  data = ['hello spark','hello world','hello world']

  rdd = sc.parallelize(data)

  rdd1 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))

  rdd2 = rdd1.groupByKey()

  print(rdd2.collect())#直接输出会输出spark的参数,无法直接输出值

  print(rdd2.map(lambda x:{x[0]:list(x[1])}).collect()) #应该使用这种方式实现数据输出


```

## 5reduceByKey

把相同的Key的数据分发到一起,并进行相应的计算

```

def my_groupBy():

  data = ['hello spark','hello world','hello world']

  rdd = sc.parallelize(data)

  rdd1 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))

  rdd2 = rdd1.reduceByKey(lambda a,b:a+b)

  print(rdd2.collect())

```

【面试考点:基础版本word count】

## 6) sortByKey()

默认升序列

需求1 :把wc的结果按照从大到小排列

```

def my_groupBy():

  data = ['hello spark','hello world','hello world']

  rdd = sc.parallelize(data)

  rdd1 = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))

  rdd2 = rdd1.reduceByKey(lambda a,b:a+b)

  rdd3 = rdd2.map(lambda x:(x[1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0]))

  print(rdd3.collect())

```

## 7) Union

```

def my_union():

  a = sc.parallize([1,2,3])

  b = sc.parallize([2,3,4])

  a.union(b).collect()

```

## 8) distinct

```

def my_distinct():

  a = sc.parallize([1,2,3])

  b = sc.parallize([2,3,4])

  a.union(b).distinct().collect()

```

## 9) join

inner join

outer join:left/right/full

```

def my_join():

  a = sc.parallize([("A","a1"),("C","c1"),("D","d1")])

  b = sc.parallize([("B","b1"),("C","c2"),("C","c3")])

  a.join(b).collect() #这个表示内连接

  #结果[("C",("c1","c2")),("C",("c1","c3"))]


  a.leftOuterJoin(b).collect() #左外链接

  #结果:[("A",("a1",None)),("C",("c1","c2")),("C",("c1","c3")),("D",("d1",None))]


  a.RightOuterJoin(b).collect() #右外链接

  #结果:[("B",(None,"b1")),("C",("c1","c2")),("C",("c1","c3"))]


  a.fullOuterJoin(b).collection() #全连接

  #结果:[("A",("a1",None)),("B",(None,"b1")),("C",("c1","c2")),("C",("c1","c3")),("D",("d1",None))]

```

# 3 Action算子

## 1) 常用算子

collect、count、take、reduce、saveAsTextFile、foreach

## 2) 示例

```

def my_action():

  data = [1,2,3,4,5,6,7]

  rdd = sc.parallize(data)

  rdd.collect() #显示

  rdd.count() #计数

  rdd.take(3) #取前三个[1,2,3]

  rdd.max()/rdd.min()/rdd.sum() #集合计算


  rdd.reduce(lambda a,b:a+b)

  rdd.foreach(lambda x:print(x))


  rdd.saveAsTextFile() #文件存储

```

# 4 Spark RDD实战

## 1)词评统计案例,多角度迭代

1)input:1个文件,多个文件(文件夹),不同后缀名

2)开发步骤分析:

文本内容的每一行转成一个个的单词:flatMap

单词==>(单词,1):map

把所有单词相同的计数相加,得到最终的结果:reduceByKey

### 版本1 写到控制台

```

from pyshark import SparkConf,SparkContext

if __name__ = "__main__":

    if len(sys.srgv)!=2:

        print("Usage:wordcount",file = sys.stderr)

    conf = SparkConf()

    sc = SparkContext(conf = conf)

    def printResult():

        counts = sc.textFile(sys.argv[1])\

            .flatMap(lambda line:line.split("\t"))\

            .map(lambda x:(x,1))\

            .reduceByKey(lambda a,b:a+b)

        output = counts.collect()

        for(word,count) in output:

            print("%s:%i"%(word,count))

    printResult()


    sc.stop()

```

tips: 在脚本的参数中添加文本地址的参数 传入相关脚本参数

在命令行提交代码:

```

./spark -submit --master local[2] --name spark0402 /home/hadoop/script/spark0402.py file:///home/hadoop/data/hello.txt

```

支持文件匹配模式

```

1 file:///home/hadoop/data/hello.txt

2 file:///home/hadoop/data/

3 file:///home/hadoop/data/*.txt

```

tips:

复制文件

cp ../hello.txt 1

cp ../hello.txt 2

cp ../hello.txt 1.txt

### 版本2 保存到文件

```

if len(sys.srgv)!=3:

    print("Usage:wordcount ",file = sys.stderr)

    sys.exit(-1)


def saveFile():

    sc.textFile(sys.argv[1]) \

        .flatMap(lambda line: line.split("\t")) \

        .map(lambda x: (x, 1)) \

        .reduceByKey(lambda a, b: a + b).\

        .saveAsTextFile(sys.argv[2])


```

tips: 查看分区的内容

```

more part-0000*

```

### 作业:降序排列

## 2Top N 案例实战

1)input:1/n文件文件夹后缀名

2)求某个维度的topn

3)开发步骤分析

文本内容的每一行根据需求提出你所需要的字段

单词==>(单词,1):map

把所有单词相同的计数相加,得到最终的结果:reduceByKey

取最多出现次数的降序:sortByKey

```

counts = sc.textFile(sys.argv[1])\

        .flatMap(lambda line:line.split("\t"))\

         .map(lambda x:(x[5],1))\

         .reduceByKey(lambda a,b:a+b)\

         .map(lambda x:(x[1],x[0]))\

         .sortByKey()\

         .map(lambda x:(x[1],x[0])).take(5)

for(word,count) in output:

  print("%s:%i"%(word,count))         

```

思考:如果统计数据会发生数据倾斜

## 3)平均数案例实战

开发步骤分析:

1)取出年龄map

2)计算年龄总和reduce

3)计算记录综述count

4)求平均数

```

ageData = sc.textFile(sys.argv[1])\

        .flatMap(lambda x:x.split("\t")[1])\

totalAge = ageData.map(age:int(age)).reduce(a,b:a+b)

counts = ageData.count()       

avgAge = totalAge/counts

print(ageAge) 

```

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,002评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,777评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,341评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,085评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,110评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,868评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,528评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,422评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,938评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,067评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,199评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,877评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,540评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,079评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,192评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,514评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,190评论 2 357

推荐阅读更多精彩内容