快乐大数据第9课 Spark程序设计

#启动Spark JobHistoryServer第一步:修改yarn-site.xmlyarn.log.server.urlhttp://192.168.183.102:19888/jobhistory/logs Yarn JobHistoryServer访问地址接着在node02上重启启动Jobhistory./mr-jobhistory-daemon.sh stop historyserver./mr-jobhistory-daemon.sh start historyserver第二步:在hadfs上 hdfs fs -mkdir /spark_logshdfs fs -mkdir -p /system/spark第三步:修改spark安装包conf目录下的spark-defaults.confspark.yarn.historyServer.address=192.168.183.101:18080spark.history.ui.port=18080spark.eventLog.enabled=truespark.eventLog.dir=hdfs:///spark_logsspark.history.fs.logDirectory=hdfs:///spark_logsspark.yarn.archive=hdfs:///system/spark/spark-libs.jar第四步:在spark的安装目录下 jar cv0f spark-libs.jar -C jars/ .hadoop fs -put spark-libs.jar /system/spark第五步:在spark的安装目录下 node01中 启动Spark History Serversbin/start-history-server.sh#提交spark任务到Yarn在spark的安装目录下bin/spark-submit \--class bigdata.spark.WordCount \--master yarn \--deploy-mode cluster \--driver-cores 1 \--driver-memory 1g \--num-executors 1 \--executor-cores 1 \--executor-memory 3g \./bigdatasparkpro1-1.0-SNAPSHOT.jar /wordcount/input /wordcount/out5 #spark演示#方法,函数,匿名函数,伴生对象#map的演示启动spark shellval rdd = sc.parallelize(List("hello world","hello spark","hello theJ","hello scala"),2)rdd.partitions.size #返回是两个分区scala> val mapRdd = rdd.map(_.split(" "))mapRdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[1] at map at:26scala> mapRdd.partitions.sizeres1: Int = 2 #仍然返回是两个分区scala> mapRdd.collectcollect collectAsyncscala> mapRdd.collectres2: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, theJ), Array(hello, scala)) #返回是数组类型scala> val rdd2 = rdd.flatMap(_.split(" ")) #把数据展平rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at:26scala> rdd2.partitions.sizeres3: Int = 2scala> rdd2.collectres4: Array[String] = Array(hello, world, hello, spark, hello, theJ, hello, scala) #展平后显示的数据集合是1个整体的数组scala> rdd2.distinct.collectres5: Array[String] = Array(scala, hello, world, spark, theJ) #展平去重后显示的数据集#演示重分区scala> val rdd = sc.parallelize(0 to 1000,3)rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at:24scala> rdd.partitionpartitioner partitionsscala> rdd.partitions.sizeres6: Int = 3scala> val rdd1 = rdd.coalesce(2)rdd1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[7] at coalesce at:26scala> rdd1.partitionpartitioner partitionsscala> rdd1.partitions.sizeres7: Int = 2scala> val rdd1 = rdd.coalesce(5,true)rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at coalesce at:26

scala> rdd1.partitions.size

res10: Int = 5

#union合并两个RDD,不去重

#mapPatitions,对分区进行操作,也需要返回迭代对象

与map方法类似,map是对rdd中的每一个元素进行操作,而mapPartitions(foreachPartition)则是对rdd中的每个分区的迭代器进行操作。如果在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。

SparkSql或DataFrame默认会对程序进行mapPartition的优化。

Demo

实现将每个数字变成原来的2倍的功能

比如:输入2,结果(2,4)

使用map

val a = sc.parallelize(1 to 9, 3)

def mapDoubleFunc(a : Int) : (Int,Int) = {

    (a,a*2)

}

val mapResult = a.map(mapDoubleFunc)

println(mapResult.collect().mkString)

1

2

3

4

5

6

7

结果

(1,2)(2,4)(3,6)(4,8)(5,10)(6,12)(7,14)(8,16)(9,18)

1

2

使用mapPartitions

val a = sc.parallelize(1 to 9, 3)

  def doubleFunc(iter: Iterator[Int]) : Iterator[(Int,Int)] = {

    var res = List[(Int,Int)]()

    while (iter.hasNext)

    {

      val cur = iter.next;

      res .::= (cur,cur*2)

    }

    res.iterator

  }

val result = a.mapPartitions(doubleFunc)

println(result.collect().mkString)

1

2

3

4

5

6

7

8

9

10

11

12

结果

(3,6)(2,4)(1,2)(6,12)(5,10)(4,8)(9,18)(8,16)(7,14)

# zip拉链操作

首先来看一下基本的api。

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

1

自身的RDD的值的类型为T类型,另一个RDD的值的类型为U类型。zip操作将这两个值连接在一起。构成一个元祖值。RDD的值的类型为元祖。

都是第i个值和第i个值进行连接。

zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常

val a = sc.parallelize(1 to 100, 3)

val b = sc.parallelize(101 to 200, 3)

a.zip(b).collect

//可以看到每个值都是对应的。

res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104),

(5,105), (6,106), (7,107), (8,108), (9,109), (10,110), (11,111), (12,112),

(13,113), (14,114), (15,115), (16,116), (17,117), (18,118), (19,119),

(20,120), (21,121), (22,122), (23,123), (24,124), (25,125), (26,126),

(27,127), (28,128), (29,129), (30,130), (31,131), (32,132), (33,133)...

1

2

3

4

5

6

7

8

9

val a = sc.parallelize(1 to 100, 3)

val b = sc.parallelize(101 to 200, 3)

val c = sc.parallelize(201 to 300, 3)

//同样也可以多次进行zip操作,则返回的元祖值包含有多个值。

a.zip(b).zip(c).map((x) => (x._1._1, x._1._2, x._2 )).collect

res12: Array[(Int, Int, Int)] = Array((1,101,201), (2,102,202),

(3,103,203), (4,104,204), (5,105,205), (6,106,206), (7,107,207),

(8,108,208), (9,109,209), (10,110,210), (11,111,211), (12,112,212),

(13,113,213), (14,114,214), (15,115,215), (16,116,216), (17,117,217),

(18,118,218), (19,119,219), (20,120,220), (21,121,221), (22,122,222)...

#Key-value类型的处理

mapValues

? 针对[K,V]中的V值进行map操作

? groupByKdy

? 将RDD[K,V]中每个K对应的V值,合并到一个集合Iterable[V]中

? reduceByKey

? 将RDD[K,V]中每个K对应的V值根据传入的映射函数计算

? join

? 返回两个RDD根据K可以关联上的结果,join只能用于两个RDD

之间的关联,如果要多个RDD关联,需要关联多次

#RDD Action

collect

? 将一个RDD转换成数组,常用于调试

? saveAsTextFile

? 用于将RDD以文本文件的格式存储到文件系统中

? take

? 根据传入参数返回RDD的指定个数元素

? count

? 返回RDD中元素数量

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容