#Starting the Spark JobHistoryServer
Step 1: edit yarn-site.xml and set yarn.log.server.url to http://192.168.183.102:19888/jobhistory/logs (the YARN JobHistoryServer access address). Then restart the JobHistoryServer on node02:
./mr-jobhistory-daemon.sh stop historyserver
./mr-jobhistory-daemon.sh start historyserver
Step 2: create the directories on HDFS:
hdfs dfs -mkdir /spark_logs
hdfs dfs -mkdir -p /system/spark
Step 3: edit spark-defaults.conf under the conf directory of the Spark installation:
spark.yarn.historyServer.address=192.168.183.101:18080
spark.history.ui.port=18080
spark.eventLog.enabled=true
spark.eventLog.dir=hdfs:///spark_logs
spark.history.fs.logDirectory=hdfs:///spark_logs
spark.yarn.archive=hdfs:///system/spark/spark-libs.jar
Step 4: from the Spark installation directory, package the jars and upload the archive to HDFS:
jar cv0f spark-libs.jar -C jars/ .
hadoop fs -put spark-libs.jar /system/spark
Step 5: from the Spark installation directory on node01, start the Spark History Server:
sbin/start-history-server.sh

#Submitting a Spark job to YARN
From the Spark installation directory:
bin/spark-submit \
--class bigdata.spark.WordCount \
--master yarn \
--deploy-mode cluster \
--driver-cores 1 \
--driver-memory 1g \
--num-executors 1 \
--executor-cores 1 \
--executor-memory 3g \
./bigdatasparkpro1-1.0-SNAPSHOT.jar /wordcount/input /wordcount/out5

#Spark demos
#Methods, functions, anonymous functions, companion objects
#map demo
Start the spark-shell:
scala> val rdd = sc.parallelize(List("hello world","hello spark","hello theJ","hello scala"),2)
scala> rdd.partitions.size      // returns 2 partitions
scala> val mapRdd = rdd.map(_.split(" "))
mapRdd: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[1] at map at <console>:26
scala> mapRdd.partitions.size
res1: Int = 2                   // still 2 partitions
scala> mapRdd.collect
res2: Array[Array[String]] = Array(Array(hello, world), Array(hello, spark), Array(hello, theJ), Array(hello, scala))   // each element remains an array
scala> val rdd2 = rdd.flatMap(_.split(" "))   // flatten the data
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26
scala> rdd2.partitions.size
res3: Int = 2
scala> rdd2.collect
res4: Array[String] = Array(hello, world, hello, spark, hello, theJ, hello, scala)   // after flattening, the result is one flat array
scala> rdd2.distinct.collect
res5: Array[String] = Array(scala, hello, world, spark, theJ)   // flattened and de-duplicated
#Repartitioning demo
scala> val rdd = sc.parallelize(0 to 1000,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> rdd.partitions.size
res6: Int = 3
scala> val rdd1 = rdd.coalesce(2)
rdd1: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[7] at coalesce at <console>:26
scala> rdd1.partitions.size
res7: Int = 2
scala> val rdd1 = rdd.coalesce(5,true)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[15] at coalesce at <console>:26
scala> rdd1.partitions.size
res10: Int = 5
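A brief follow-up sketch (a hypothetical continuation of the session above, not part of the original transcript): without a shuffle, coalesce can only merge partitions, never split them, and repartition(n) is shorthand for coalesce(n, shuffle = true).

val rdd = sc.parallelize(0 to 1000, 3)
rdd.coalesce(5).partitions.size        // expected 3: without shuffle, coalesce cannot increase the partition count
rdd.coalesce(5, true).partitions.size  // expected 5: shuffling allows the count to grow, as shown above
rdd.repartition(5).partitions.size     // expected 5: repartition(n) is coalesce(n, shuffle = true)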
#union merges two RDDs without removing duplicates
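A minimal sketch of union in the same spark-shell style (the data is made up for illustration): duplicates from the two RDDs are kept unless distinct is applied afterwards.

val u1 = sc.parallelize(List(1, 2, 3))
val u2 = sc.parallelize(List(3, 4, 5))
u1.union(u2).collect            // expected Array(1, 2, 3, 3, 4, 5): the duplicate 3 is kept
u1.union(u2).distinct.collect   // apply distinct explicitly to de-duplicate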
#mapPartitions operates on whole partitions and must also return an iterator
It is similar to map: map operates on each individual element of the RDD, while mapPartitions (and foreachPartition) operates on the iterator of each partition of the RDD. If the map step has to create an expensive extra object over and over (for example, writing the RDD's data to a database via JDBC: map would have to create one connection per element, whereas mapPartitions creates one connection per partition), then mapPartitions is far more efficient than map.
Spark SQL and the DataFrame API apply this kind of mapPartitions optimization to programs by default.
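A hedged sketch of the JDBC scenario mentioned above, using foreachPartition so that one connection serves an entire partition; the JDBC URL, table, and credentials are placeholders, not from the original notes.

import java.sql.DriverManager

val words = sc.parallelize(List("hello", "spark", "scala"))

// One JDBC connection per partition instead of one per element.
words.foreachPartition { iter =>
  // Placeholder connection details -- adjust for a real database.
  val conn = DriverManager.getConnection("jdbc:mysql://db-host:3306/test", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO words(word) VALUES (?)")
  try {
    iter.foreach { w =>
      stmt.setString(1, w)
      stmt.executeUpdate()
    }
  } finally {
    stmt.close()
    conn.close()
  }
}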
Demo
Implement a transformation that pairs each number with twice its value.
For example, input 2 produces (2,4).
Using map
val a = sc.parallelize(1 to 9, 3)
def mapDoubleFunc(a: Int): (Int, Int) = {
  (a, a * 2)
}
val mapResult = a.map(mapDoubleFunc)
println(mapResult.collect().mkString)
Result:
(1,2)(2,4)(3,6)(4,8)(5,10)(6,12)(7,14)(8,16)(9,18)
Using mapPartitions
val a = sc.parallelize(1 to 9, 3)
def doubleFunc(iter: Iterator[Int]): Iterator[(Int, Int)] = {
  var res = List[(Int, Int)]()
  while (iter.hasNext) {
    val cur = iter.next()
    res = (cur, cur * 2) :: res   // prepend; this reverses the order within each partition
  }
  res.iterator
}
val result = a.mapPartitions(doubleFunc)
println(result.collect().mkString)
Result:
(3,6)(2,4)(1,2)(6,12)(5,10)(4,8)(9,18)(8,16)(7,14)
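One thing to note about this result: prepending to the list reverses the order inside each partition, which is why it reads (3,6)(2,4)(1,2) rather than (1,2)(2,4)(3,6). A sketch of an equivalent, order-preserving version simply maps the partition's iterator:

val a = sc.parallelize(1 to 9, 3)

// Transform the iterator lazily; element order within each partition is preserved.
def doubleFuncOrdered(iter: Iterator[Int]): Iterator[(Int, Int)] =
  iter.map(cur => (cur, cur * 2))

println(a.mapPartitions(doubleFuncOrdered).collect().mkString)
// expected: (1,2)(2,4)(3,6)(4,8)(5,10)(6,12)(7,14)(8,16)(9,18)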
# The zip operation
First, a look at the basic API.
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
The values of this RDD have type T and the values of the other RDD have type U; zip joins the two together, so the resulting RDD's value type is the tuple (T, U).
The i-th value of one RDD is always paired with the i-th value of the other.
zip combines two RDDs into a key/value-style RDD. It assumes that both RDDs have the same number of partitions and the same number of elements; otherwise an exception is thrown (a sketch after the examples below illustrates this).
val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
a.zip(b).collect
// you can see that the values are paired position by position
res1: Array[(Int, Int)] = Array((1,101), (2,102), (3,103), (4,104),
(5,105), (6,106), (7,107), (8,108), (9,109), (10,110), (11,111), (12,112),
(13,113), (14,114), (15,115), (16,116), (17,117), (18,118), (19,119),
(20,120), (21,121), (22,122), (23,123), (24,124), (25,125), (26,126),
(27,127), (28,128), (29,129), (30,130), (31,131), (32,132), (33,133)...
val a = sc.parallelize(1 to 100, 3)
val b = sc.parallelize(101 to 200, 3)
val c = sc.parallelize(201 to 300, 3)
// zip can also be applied several times; the resulting tuples then carry more values
a.zip(b).zip(c).map((x) => (x._1._1, x._1._2, x._2 )).collect
res12: Array[(Int, Int, Int)] = Array((1,101,201), (2,102,202),
(3,103,203), (4,104,204), (5,105,205), (6,106,206), (7,107,207),
(8,108,208), (9,109,209), (10,110,210), (11,111,211), (12,112,212),
(13,113,213), (14,114,214), (15,115,215), (16,116,216), (17,117,217),
(18,118,218), (19,119,219), (20,120,220), (21,121,221), (22,122,222)...
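As noted earlier, zip assumes matching partition counts and element counts. A small sketch of the two failure modes (the exception types and messages are recalled from memory, so treat them as approximate):

import scala.util.Try

val x = sc.parallelize(1 to 10, 2)
val y = sc.parallelize(1 to 10, 3)   // different partition count
val z = sc.parallelize(1 to 9, 2)    // same partition count, different element count

// Different partition counts: the zip job fails.
println(Try(x.zip(y).collect()))     // expected a Failure (IllegalArgumentException about unequal partitions)

// Different element counts per partition: fails when the tasks actually run.
println(Try(x.zip(z).collect()))     // expected a Failure (SparkException about unequal element counts)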
#Key-value RDD operations
- mapValues: applies a map operation to the V value of each [K,V] pair
- groupByKey: gathers all V values for each key K of RDD[K,V] into one Iterable[V]
- reduceByKey: combines the V values for each key K of RDD[K,V] using the supplied function
- join: returns the records of two RDDs that can be matched on K; join only relates two RDDs, so joining more RDDs requires chaining several joins (see the sketch after this list)
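A compact spark-shell sketch of these four operators (the pair data is invented for illustration; output ordering may differ from run to run):

val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))

pairs.mapValues(_ * 10).collect     // Array((a,10), (b,20), (a,30))
pairs.groupByKey().collect          // roughly Array((a,CompactBuffer(1, 3)), (b,CompactBuffer(2)))
pairs.reduceByKey(_ + _).collect    // Array((a,4), (b,2))

val other = sc.parallelize(List(("a", "x"), ("b", "y")))
pairs.join(other).collect           // Array((a,(1,x)), (a,(3,x)), (b,(2,y)))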
#RDD Actions
- collect: turns the RDD into an array on the driver; commonly used for debugging
- saveAsTextFile: stores the RDD on a file system in text-file format
- take: returns the specified number of elements from the RDD
- count: returns the number of elements in the RDD
(A short sketch of these actions follows below.)
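A matching sketch for the actions above (the HDFS output path is a placeholder and must not already exist):

val nums = sc.parallelize(1 to 5)

nums.collect()                            // Array(1, 2, 3, 4, 5) -- brings everything to the driver
nums.take(3)                              // Array(1, 2, 3)
nums.count()                              // 5
nums.saveAsTextFile("hdfs:///tmp/nums")   // placeholder path; writes one text file per partition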