Spark 例子

Spark 例子

最常用的转换操作有两个: mapfilter ,map(func)是将func应用到所有元素,得到一个新的RDD。filter是将func返回为true的元素过滤出来,组成一个新的RDD。一些比较常用的转换如下:

  • map(func) 返回一个新的分布式数据集,将数据源的每一个元素传递给函数 func 映射组成。
  • filter(func) 返回一个新的数据集,从数据源中选中一些元素通过函数 func 返回 true。
  • flatMap(func) 类似于 map,但是每个输入项能被映射成多个输出项(所以 func 必须返回一个 Seq,而不是单个 item)。
  • union(otherDataset) 两个RDD求并集
  • intersection(otherDataset) 两个RDD求交集
  • groupByKey() 作用于(K,V)的数据集,依据K对值进行归并,返回一个(K, Iterable)
  • reduceByKey(func) 作用于(K,V)的数据集,依据K对值使用func进行归约,返回一个(K,V)数据集
  • sortByKey([asending]) 返回一个依据K进行排序的数据集

最常用的动作就是reduce,将数据集归约为一个结果。一些比较常用的动作如下:

  • reduce(func) 按照func函数对数据集进行归约,func接受两个参数,返回一个结果,须满足结合律和交换律,以便于分布式计算。
  • count() 返回数据集的元素个数
  • first() 返回第一个元素
  • take(n) 以数组形式返回集合的前n个元素
  • saveAsTextFile(path) 将数据集保存为文本文件

读写文件

val lines = sc.textFile("file:///path_to_local/file")  
val lines = sc.textFile("hdfs:///path_to_hdfs/file")  
rdd.saveAsTextFile("hdfs://")

如果是parquet格式文件,可以用下面的办法,得到一个DataFrame,同样可以识别本地及hdfs文件,也可以识别目录及正则

val parquetFile = sqlContext.read.parquet("people.parquet")  
df.write.save("temp.parquet")  

JSON格式文件

val df = sqlContext.read.json("path to json file")  
val df = sqlContext.read.format("json").load("path to file")  
df.write.format("json").save("path to save")

统计字符数

val lines = sc.textFile("data.txt")     //读文件,得到以行字符串为单位的RDD  
val lineLengths = lines.map(s => s.length)    //转换,将字符串元素映射为其长度   
val totalLength = lineLengths.reduce((a, b) => a + b)   //动作,将所有元素加起来 

import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  
import org.apache.spark.SparkContext._  
  
object SparkWordCount {  
 def FILE_NAME:String = "word_count_results_";  
   
 def main(args:Array[String]) {  
 if (args.length < 1) {  
 println("Usage:SparkWordCount FileName");  
 System.exit(1);  
 }  
 val conf = new SparkConf().setAppName("Spark Exercise: Spark Version Word Count Program");  
 val sc = new SparkContext(conf);  
 val textFile = sc.textFile(args(0));  
 val wordCounts = textFile.flatMap(line => line.split(" ")).map(  
                                        word => (word, 1)).reduceByKey((a, b) => a + b)  
  
                                          
 wordCounts.saveAsTextFile(FILE_NAME+System.currentTimeMillis());  
 println("Word Count program running results are successfully saved.");  
 }  
}  

./spark-submit \  
--class com.ibm.spark.exercise.basic.SparkWordCount \  
--master spark://hadoop036166:7077 \  
--num-executors 3 \  
--driver-memory 6g --executor-memory 2g \  
--executor-cores 2 \  
/home/fams/sparkexercise.jar \  
hdfs://hadoop036166:9000/user/fams/*.txt  

求平均值

import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  
object AvgAgeCalculator {  
 def main(args:Array[String]) {  
 if (args.length < 1){  
 println("Usage:AvgAgeCalculator datafile")  
 System.exit(1)  
 }  
 val conf = new SparkConf().setAppName("Spark Exercise:Average Age Calculator")  
 val sc = new SparkContext(conf)  
 val dataFile = sc.textFile(args(0), 5);  
 val count = dataFile.count()  
 val ageData = dataFile.map(line => line.split(" ")(1))  
 val totalAge = ageData.map(age => Integer.parseInt(  
                                String.valueOf(age))).collect().reduce((a,b) => a+b)  
 println("Total Age:" + totalAge + ";Number of People:" + count )  
 val avgAge : Double = totalAge.toDouble / count.toDouble  
 println("Average Age is " + avgAge)  
 }  
}  

./spark-submit \  
 --class com.ibm.spark.exercise.basic.AvgAgeCalculator \  
 --master spark://hadoop036166:7077 \  
 --num-executors 3 \  
 --driver-memory 6g \  
 --executor-memory 2g \  
 --executor-cores 2 \  
 /home/fams/sparkexercise.jar \  
 hdfs://hadoop036166:9000/user/fams/inputfiles/sample_age_data.txt  

求男性/女性 最高 最低身高

object PeopleInfoCalculator {  
 def main(args:Array[String]) {  
 if (args.length < 1){  
 println("Usage:PeopleInfoCalculator datafile")  
 System.exit(1)  
 }  
 val conf = new SparkConf().setAppName("Spark Exercise:People Info(Gender & Height) Calculator")  
 val sc = new SparkContext(conf)  
 val dataFile = sc.textFile(args(0), 5);  
 val maleData = dataFile.filter(line => line.contains("M")).map(  
                              line => (line.split(" ")(1) + " " + line.split(" ")(2)))  
 val femaleData = dataFile.filter(line => line.contains("F")).map(  
                              line => (line.split(" ")(1) + " " + line.split(" ")(2)))  
  
 val maleHeightData = maleData.map(line => line.split(" ")(1).toInt)  
 val femaleHeightData = femaleData.map(line => line.split(" ")(1).toInt)  
  
 val lowestMale = maleHeightData.sortBy(x => x,true).first()  
 val lowestFemale = femaleHeightData.sortBy(x => x,true).first()  
  
 val highestMale = maleHeightData.sortBy(x => x, false).first()  
 val highestFemale = femaleHeightData.sortBy(x => x, false).first()  
 println("Number of Male Peole:" + maleData.count())  
 println("Number of Female Peole:" + femaleData.count())  
 println("Lowest Male:" + lowestMale)  
 println("Lowest Female:" + lowestFemale)  
 println("Highest Male:" + highestMale)  
 println("Highest Female:" + highestFemale)  
 }  
}  
./spark-submit \  
 --class com.ibm.spark.exercise.basic.PeopleInfoCalculator \  
 --master spark://hadoop036166:7077 \  
 --num-executors 3 \  
 --driver-memory 6g \  
 --executor-memory 3g \  
 --executor-cores 2 \  
 /home/fams/sparkexercise.jar \  
 hdfs://hadoop036166:9000/user/fams/inputfiles/sample_people_info.txt  

每行数据出现的次数最高的

=============

import org.apache.spark.SparkConf  
import org.apache.spark.SparkContext  
  
object TopKSearchKeyWords {  
 def main(args:Array[String]){  
 if (args.length < 2) {  
 println("Usage:TopKSearchKeyWords KeyWordsFile K");  
 System.exit(1)  
 }  
 val conf = new SparkConf().setAppName("Spark Exercise:Top K Searching Key Words")  
 val sc = new SparkContext(conf)  
 val srcData = sc.textFile(args(0))  
 val countedData = srcData.map(line => (line.toLowerCase(),1)).reduceByKey((a,b) => a+b)  
  
 val sortedData = countedData.map{ case (k,v) => (v,k) }.sortByKey(false)  
 val topKData = sortedData.take(args(1).toInt).map{ case (v,k) => (k,v) }  
 topKData.foreach(println)  
 }  
}  

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容