SparkSQL基本使用

  • 往Hadoop集群上上传测试数据,hdfs dfs -cat /person/employee.txt
    employee.txt
1,zhangxx,20,manager
2,wangxin,25,employee
3,wangergou,78,xixi
4,wawo,35,gogo
5,liwei,28,programmer
6,hanmeimei,29,UI

1.读取数据,将每一行的数据使用列分隔符分割
val lineRDD = sc.textFile("hdfs://hdp-sk-01:9000/employee.txt", 1).map(_.split(" ,"))

2.定义case class(相当于表的schema)
case class Employee(id:Int, name:String, age:Int,job:String)

3.导入隐式转换
import sqlContext.implicits._

4.将lineRDD转换成personRDD
val personRDD = lineRDD.map(x => Employee(x(0).toInt,x(1),x(2).toInt,x(3).toString))

5.将personRDD转换成DataFrame
val personDF = personRDD.toDF

6.对personDF进行处理

SQL风格语法

personDF.registerTempTable("t_person")
sqlContext.sql("select * from t_person order by age desc limit 2").show
sqlContext.sql("desc t_person").show
val result = sqlContext.sql("select * from t_person order by age desc")

7.保存结果

result.save("hdfs://hdp-sk-01:9000/sql/res1") //默认parquet文件形式
result.save("hdfs://hdp-sk-01:9000/sql/res2", "json") //指定json文件形式

JSON文件格式覆写HDFS上的JSON文件

import org.apache.spark.sql.SaveMode._
result.save("hdfs://hdp-sk-01:9000/sql/res2", "json" , Overwrite)

8.重新加载以前的处理结果(可选)

sqlContext.load("hdfs://hdp-sk-01:9000/sql/res1")//默认加载parquet文件形式
sqlContext.load("hdfs://hdp-sk-01:9000/sql/res2", "json")//json的存储文件以json形式加载

res2之前上传的时json文件,如果使用
sqlContext.load("hdfs://hdp-sk-01:9000/sql/res2")默认加载parquet文件形式,会报下面的错误。

TaskSetManager: Lost task 1.0 in stage 4.0 (TID 12, 192.168.92.112): java.io.IOException: Could not read footer: java.lang.RuntimeException: hdfs://hdp-sk-01:9000/sparksql/res2/part-r-00000-86cf6674-e647-4b71-bb88-dbe34fc36d0e is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [105, 34, 125, 10]

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容