JSON综合性复杂案例

查询成绩为80分以上的学生的基本信息与成绩信息
Student.json
{"name":"Leo", "score":85}
{"name":"Marry", "score":99}
{"name":"Jack", "score":74}

/**
  * JSON数据源
  * @author Administrator
  *
  */
public class JSONDataSource {

​public static void main(String[] args) {
​​SparkConf conf = new SparkConf()​​​.setAppName("JSONDataSource");  
​​JavaSparkContext sc = new JavaSparkContext(conf);
​​SQLContext sqlContext = new SQLContext(sc);
​​// 针对json文件,创建DataFrame(针对json文件创建DataFrame)
​​DataFrame studentScoresDF = sqlContext.read().json​​​​"hdfs://spark1:9000/spark-study/students.json");  
// 针对学生成绩信息的DataFrame,注册临时表,查询分数大于80分的学生的姓名
​​// (注册临时表,针对临时表执行sql语句)
​​studentScoresDF.registerTempTable("student_scores");
​​DataFrame goodStudentScoresDF = sqlContext.sql(​​​​"select name,score from student_scores where score>=80");
// (将DataFrame转换为rdd,执行transformation操作)
​​List<String> goodStudentNames = goodStudentScoresDF.javaRDD().map(

new Function<Row, String>() {

​​​​​private static final long serialVersionUID = 1L;

​​​​​@Override
​​​​​public String call(Row row) throws Exception {
​​​​​​return row.getString(0);
​​​​​}
​​​​}).collect();

​​// 然后针对JavaRDD<String>,创建DataFrame
​​// (针对包含json串的JavaRDD,创建DataFrame)
​​List<String> studentInfoJSONs = new ArrayList<String>();
​​studentInfoJSONs.add("{\"name\":\"Leo\", \"age\":18}");  
​​studentInfoJSONs.add("{\"name\":\"Marry\", \"age\":17}");  
​​studentInfoJSONs.add("{\"name\":\"Jack\", \"age\":19}");
​​JavaRDD<String> studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs);
​​DataFrame studentInfosDF = sqlContext.read().json(studentInfoJSONsRDD);
​​// 针对学生基本信息DataFrame,注册临时表,然后查询分数大于80分的学生的基本信息
​​studentInfosDF.registerTempTable("student_infos");  
​​String sql = "select name,age from student_infos where name in (";        
for(int i = 0; i < goodStudentNames.size(); i++) {
​​​sql += "'" + goodStudentNames.get(i) + "'";
​​​if(i < goodStudentNames.size() - 1) {
​​​​sql += ",";
​​​}
​​}
​​sql += ")";

​​DataFrame goodStudentInfosDF = sqlContext.sql(sql);
​​// 然后将两份数据的DataFrame,转换为JavaPairRDD,执行join transformation
​​// (将DataFrame转换为JavaRDD,再map为JavaPairRDD,然后进行join)
​​JavaPairRDD<String, Tuple2<Integer, Integer>> goodStudentsRDD = ​​​​goodStudentScoresDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {

private static final long serialVersionUID = 1L;

​​​​​@Override
​​​​​public Tuple2<String, Integer> call(Row row) throws Exception {
​​​​​​return new Tuple2<String, Integer>(row.getString(0),
​​​​​​​​Integer.valueOf(String.valueOf(row.getLong(1))));  
​​​​​}
​​​​}).join(goodStudentInfosDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {

private static final long serialVersionUID = 1L;

​​​​​@Override
​​​​​public Tuple2<String, Integer> call(Row row) throws Exception {
​​​​​​return new Tuple2<String, Integer>(row.getString(0),
​​​​​​​​Integer.valueOf(String.valueOf(row.getLong(1))));  
​​​​​}
​​​​}));

// 然后将封装在RDD中的好学生的全部信息,转换为一个JavaRDD<Row>的格式
​​// (将JavaRDD,转换为DataFrame)
​​JavaRDD<Row> goodStudentRowsRDD = goodStudentsRDD.map(

​​​​new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {

​​​​​private static final long serialVersionUID = 1L;

​​​​​@Override
​​​​​public Row call(
​​​​​​​Tuple2<String, Tuple2<Integer, Integer>> tuple) ​​​​​​​throws Exception {
​​​​​​return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
​​​​​}
​​​​});

​​// 创建一份元数据,将JavaRDD<Row>转换为DataFrame
​​List<StructField> structFields = new ArrayList<StructField>();
​​structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
​​structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));  
​​structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));  
​​StructType structType = DataTypes.createStructType(structFields);  
​​DataFrame goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType);

// 将好学生的全部信息保存到一个json文件中去
// (将DataFrame中的数据保存到外部的json文件中去)         
goodStudentsDF.write().format("json").save("hdfs://spark1:9000/spark-study/good-students");  
​}
}

查看结果:
Hadoop fs –text /spark-study/good-students/part-r*

Scala版本

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.LongType


/**
* @author Administrator
*/
object JSONDataSource {

def main(args: Array[String]): Unit = {
val conf = new SparkConf()
   .setAppName("JSONDataSource")  
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// 创建学生成绩DataFrame
val studentScoresDF = sqlContext.read.json("hdfs://spark1:9000/spark-study/students.json")

// 查询出分数大于80分的学生成绩信息,以及学生姓名
studentScoresDF.registerTempTable("student_scores")
val goodStudentScoresDF = sqlContext.sql("select name,score from student_scores where score>=80")
val goodStudentNames = goodStudentScoresDF.rdd.map { row => row(0) }.collect()  IDEa

// 创建学生基本信息DataFrame
val studentInfoJSONs = Array("{\"name\":\"Leo\", \"age\":18}",
   "{\"name\":\"Marry\", \"age\":17}",
   "{\"name\":\"Jack\", \"age\":19}")
val studentInfoJSONsRDD = sc.parallelize(studentInfoJSONs, 3);
val studentInfosDF = sqlContext.read.json(studentInfoJSONsRDD)  

// 查询分数大于80分的学生的基本信息
studentInfosDF.registerTempTable("student_infos")

var sql = "select name,age from student_infos where name in ("
for(i <- 0 until goodStudentNames.length) {
 sql += "'" + goodStudentNames(i) + "'"
 if(i < goodStudentNames.length - 1) {
   sql += ","
 }
}
sql += ")"  

val goodStudentInfosDF = sqlContext.sql(sql)

// 将分数大于80分的学生的成绩信息与基本信息进行join
val goodStudentsRDD =
   goodStudentScoresDF.rdd.map { row => (row.getAs[String]("name"), row.getAs[Long]("score")) }
       .join(goodStudentInfosDF.rdd.map { row => (row.getAs[String]("name"), row.getAs[Long]("age")) })  

// 将rdd转换为dataframe
val goodStudentRowsRDD = goodStudentsRDD.map(
   info => Row(info._1, info._2._1.toInt, info._2._2.toInt))  
       
val structType = StructType(Array(
   StructField("name", StringType, true),
   StructField("score", IntegerType, true),
   StructField("age", IntegerType, true)))  
   
val goodStudentsDF = sqlContext.createDataFrame(goodStudentRowsRDD, structType)  

// 将dataframe中的数据保存到json中
goodStudentsDF.write.format("json").save("hdfs://spark1:9000/spark-study/good-students-scala")  
 }

}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容

  • Scaladoc是什么:scala api文档,包含了scala所有的api以及使用说明,class、object...
    义焃阅读 2,577评论 0 1
  • 母亲有着一口说不清汉话的腔调 脸被高原的风和太阳侵蚀的有团高原红 一副珊瑚耳环 盘着藏式发型,鬓发成雪 拾山中珍宝...
    卓倪阅读 478评论 1 1
  • 1在生命中,从来就没有一帆风顺的,总会有磕磕碰碰的,所以自己不要埋怨生活的不满。人,一定要知道满足,正所谓知足者常...
    QT_陈生阅读 755评论 1 0
  • 【单边账中资金冻结的具体处理流程】 (1)因网点误操作导致挂失冻结: 客户卡片未丢失,但在受理撤销“延迟转账”...
    卡宝BOC阅读 426评论 0 0
  • 每一次出远门,总有些所见所闻、所思所想,难怪余秋雨能写出《行者无疆》《文化苦旅》。太原之行结束,我也整一篇“太原甜...
    悠着点打卡阅读 727评论 0 4