Spark实战 - 2 - RDD实战:Average Friends by Age

这篇要有一点map reduce基础哦
用到RDD的key value

Key & Value

和Hadoop一样……

比如我们有给一个Friend数据集,里面有有一个column是age,一个column是name
那么key就是age,value就是该年龄下name的总数

RDD可以hold住 Key/Value pairs:

// 
totalsByAge = rdd.map(age => (age, 1))

(33, 1)
(33, 1)
(12, 1)
(23, 1)

这些k/v对,是一个2个元素的tuple。

看到这里,其实你意识到了,scala/spark也没啥新鲜的。只是把k/v对映射成一个tuple了

bingo,现在我们就有了一个k/v组成的RDD了,撒花。

常用的RDD操作api

reduceByKey() 加和相同key的value。

rdd.reduceByKey((x, y) = >x + y) //此处的x、y并非k/v对。其实是value而已。

groupByKey() 将相同key的RDD group起来
sortByKey() 用key来排序RDD
keys(), values() 创建一个只要keys或values的RDD
还有一些join(), rightOuterJoin(), leftOuterJoin(), cogroup() 等等,可以看看文档。

RDD是不是有点像个NoSQL 的数据集?

如果不需要key的话

mapValues() & flatMapValues()这两个api是很好的选择,性能比较好。

开始战斗!

我们的数据集长这样

id, name, age, numFriends
0, jean, 33, 2
1, hugh, 55, 221
2, will, 33, 385

我们要做的,是找出各个年龄的friendNum的平均值。

第一步 Mapping

  1. 定义方法解析数据,读入文件
// 解析数据的方法,是不是和Hadoop的很像!
def parseLine(line: String)  = {
  val fields = line.split(",")
  val age = fields(2).toInt
  val numFriends = fields(3).toInt
  // (age, numFriends)
}

val lines = sc.textFile("path/to/the/file.csv")
val rdd = lines.map(parseLine) // 将parseLine传入作为mapping的方法
  1. Map阶段
    我们看一个比较吓人的代码
val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey((x, y) => (x._1 + y._1, x_2 + y._2))

亲娘嘞这是啥??!!

先不急哈,一步一步来。
我们先看rdd.mapValues(x => (x, 1)),它return了啥?

(33, 385)=> (33, (385, 1))
(33, 2)=> (33, (2, 1))
(55, 221)=> (33, (55, 221))

这个方法把刚才那个parseLine的tuple们转化成了如上的模式。mapValues只是用了tuple的value。是不是好神奇?

然后我们来看.reduceByKey((x, y) => (x._1 + y._1, x_2 + y._2))
熟悉jQuery的小伙伴一定不陌生,这个方法的参数就是接着上一个方法的返回的rdd。
但是既然是reduceByKey,那么Key就不参与计算了,直接传入的(x, y)其实是(385, 1)以及(2, 1)
弄个清楚点的

(33, (385, 1)) => (385, 1) // 只取这个内嵌的tuple, 这个作为参数 x
|
V
(33, (2, 1)) => (2, 1) // 按照RDD顺序,下一个也是取内嵌的tuple,这个作为y
|
V
这一轮的返回值作为x传入,然后y取下一个的内嵌的tuple,递归着跑。(实在不明白,留言咱们讨论……)

这样就成了我们可以计算的RDD了!

  1. Reduce阶段
    这里就好简答啦。
    val averagesByAge = totalsByAge.mapValues(x=> x._1 / x._2)
(33, (387, 2)) => (33, 193.5)
...
  1. 获取结果
    此时虽然得到了结果,但是还无法显示,因为结果是个RDD,所有要多一步来显示它。
val results = averagesByAge.collect()
result.sorted.foreach(println)

是不是有种超神的感觉呢,嘿嘿嘿

运行起来走一个

如果不熟悉环境搭建的,请看我Spark实战 - 1 - 配置和运行 超级详细了
数据集:FakeFriend.csv 提取码:z3db

完整代码:

package com.ephraim.spark

import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.log4j._

object FriendsByAge {
  
  def parseLine(line: String) = {
      val fields = line.split(",")

      val age = fields(2).toInt
      val numFriends = fields(3).toInt

      (age, numFriends)
  }
  
  /** Our main function where the action happens */
  def main(args: Array[String]) {
   
    Logger.getLogger("org").setLevel(Level.ERROR)
        
    val sc = new SparkContext("local[*]", "FriendsByAge")
  
    val lines = sc.textFile("./fakefriends/fakefriends.csv")

    val rdd = lines.map(parseLine)

    val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))

    val averagesByAge = totalsByAge.mapValues(x => x._1 / x._2)

    val results = averagesByAge.collect()

    results.sorted.foreach(println)
  }
    
}
(18,343)
(19,213)
(20,165)
(21,350)
(22,206)
(23,246)
(24,233)
(25,197)
(26,242)
(27,228)
(28,209)
(29,215)
(30,235)
(31,267)
(32,207)
(33,325)
(34,245)
(35,211)
(36,246)
(37,249)
(38,193)
(39,169)
(40,250)
(41,268)
(42,303)
(43,230)
(44,282)
(45,309)
(46,223)
(47,233)
(48,281)
(49,184)
(50,254)
(51,302)
(52,340)
(53,222)
(54,278)
(55,295)
(56,306)
(57,258)
(58,116)
(59,220)
(60,202)
(61,256)
(62,220)
(63,384)
(64,281)
(65,298)
(66,276)
(67,214)
(68,269)
(69,235)

跟我一样优秀的你,一定能得到一样的结果呢!

欢迎大家点赞,评论,打赏哈!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,324评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,356评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,328评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,147评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,160评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,115评论 1 296
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,025评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,867评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,307评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,528评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,688评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,409评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,001评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,657评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,811评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,685评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,573评论 2 353