这篇要有一点map reduce基础哦
用到RDD的key value
Key & Value
和Hadoop一样……
比如我们有给一个Friend数据集,里面有有一个column是age,一个column是name
那么key就是age,value就是该年龄下name的总数
RDD可以hold住 Key/Value pairs:
//
totalsByAge = rdd.map(age => (age, 1))
(33, 1)
(33, 1)
(12, 1)
(23, 1)
这些k/v对,是一个2个元素的tuple。
看到这里,其实你意识到了,scala/spark也没啥新鲜的。只是把k/v对映射成一个tuple了
bingo,现在我们就有了一个k/v组成的RDD了,撒花。
常用的RDD操作api
reduceByKey()
加和相同key的value。
rdd.reduceByKey((x, y) = >x + y) //此处的x、y并非k/v对。其实是value而已。
groupByKey()
将相同key的RDD group起来
sortByKey()
用key来排序RDD
keys(), values()
创建一个只要keys或values的RDD
还有一些join(), rightOuterJoin(), leftOuterJoin(), cogroup()
等等,可以看看文档。
RDD是不是有点像个NoSQL 的数据集?
如果不需要key的话
mapValues() & flatMapValues()
这两个api是很好的选择,性能比较好。
开始战斗!
我们的数据集长这样
id, name, age, numFriends
0, jean, 33, 2
1, hugh, 55, 221
2, will, 33, 385
我们要做的,是找出各个年龄的friendNum的平均值。
第一步 Mapping
- 定义方法解析数据,读入文件
// 解析数据的方法,是不是和Hadoop的很像!
def parseLine(line: String) = {
val fields = line.split(",")
val age = fields(2).toInt
val numFriends = fields(3).toInt
// (age, numFriends)
}
val lines = sc.textFile("path/to/the/file.csv")
val rdd = lines.map(parseLine) // 将parseLine传入作为mapping的方法
- Map阶段
我们看一个比较吓人的代码
val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x_2 + y._2))
亲娘嘞这是啥??!!
先不急哈,一步一步来。
我们先看rdd.mapValues(x => (x, 1))
,它return了啥?
(33, 385)=> (33, (385, 1))
(33, 2)=> (33, (2, 1))
(55, 221)=> (33, (55, 221))
这个方法把刚才那个parseLine的tuple们转化成了如上的模式。mapValues
只是用了tuple的value。是不是好神奇?
然后我们来看.reduceByKey((x, y) => (x._1 + y._1, x_2 + y._2))
熟悉jQuery的小伙伴一定不陌生,这个方法的参数就是接着上一个方法的返回的rdd。
但是既然是reduceByKey,那么Key就不参与计算了,直接传入的(x, y)其实是(385, 1)
以及(2, 1)
。
弄个清楚点的
(33, (385, 1)) => (385, 1) // 只取这个内嵌的tuple, 这个作为参数 x
|
V
(33, (2, 1)) => (2, 1) // 按照RDD顺序,下一个也是取内嵌的tuple,这个作为y
|
V
这一轮的返回值作为x传入,然后y取下一个的内嵌的tuple,递归着跑。(实在不明白,留言咱们讨论……)
这样就成了我们可以计算的RDD了!
- Reduce阶段
这里就好简答啦。
val averagesByAge = totalsByAge.mapValues(x=> x._1 / x._2)
(33, (387, 2)) => (33, 193.5)
...
- 获取结果
此时虽然得到了结果,但是还无法显示,因为结果是个RDD,所有要多一步来显示它。
val results = averagesByAge.collect()
result.sorted.foreach(println)
是不是有种超神的感觉呢,嘿嘿嘿
运行起来走一个
如果不熟悉环境搭建的,请看我Spark实战 - 1 - 配置和运行 超级详细了
数据集:FakeFriend.csv 提取码:z3db
完整代码:
package com.ephraim.spark
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._
object FriendsByAge {
def parseLine(line: String) = {
val fields = line.split(",")
val age = fields(2).toInt
val numFriends = fields(3).toInt
(age, numFriends)
}
/** Our main function where the action happens */
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val sc = new SparkContext("local[*]", "FriendsByAge")
val lines = sc.textFile("./fakefriends/fakefriends.csv")
val rdd = lines.map(parseLine)
val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))
val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)
val results = averagesByAge.collect()
results.sorted.foreach(println)
}
}
(18,343)
(19,213)
(20,165)
(21,350)
(22,206)
(23,246)
(24,233)
(25,197)
(26,242)
(27,228)
(28,209)
(29,215)
(30,235)
(31,267)
(32,207)
(33,325)
(34,245)
(35,211)
(36,246)
(37,249)
(38,193)
(39,169)
(40,250)
(41,268)
(42,303)
(43,230)
(44,282)
(45,309)
(46,223)
(47,233)
(48,281)
(49,184)
(50,254)
(51,302)
(52,340)
(53,222)
(54,278)
(55,295)
(56,306)
(57,258)
(58,116)
(59,220)
(60,202)
(61,256)
(62,220)
(63,384)
(64,281)
(65,298)
(66,276)
(67,214)
(68,269)
(69,235)
跟我一样优秀的你,一定能得到一样的结果呢!
欢迎大家点赞,评论,打赏哈!