- Upload the test data to the Hadoop cluster and check its contents (an upload sketch is shown below, followed by the verification command):
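A minimal upload sketch, assuming employee.txt is in the local working directory and using the same /person target directory as the cat command below:
hdfs dfs -mkdir -p /person
hdfs dfs -put employee.txt /person/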
hdfs dfs -cat /person/employee.txt
employee.txt
1,zhangxx,20,manager
2,wangxin,25,employee
3,wangergou,78,xixi
4,wawo,35,gogo
5,liwei,28,programmer
6,hanmeimei,29,UI
1. Read the data and split each line on the column delimiter (the file is comma-separated):
val lineRDD = sc.textFile("hdfs://hdp-sk-01:9000/person/employee.txt", 1).map(_.split(","))
2. Define a case class (it plays the role of the table schema):
case class Employee(id: Int, name: String, age: Int, job: String)
3. Import the implicit conversions:
import sqlContext.implicits._
4. Convert lineRDD into personRDD, an RDD of Employee objects:
val personRDD = lineRDD.map(x => Employee(x(0).toInt, x(1), x(2).toInt, x(3)))
5. Convert personRDD into a DataFrame:
val personDF = personRDD.toDF
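A quick sanity check of the result; the schema follows from the Employee case class and the rows from employee.txt:
personDF.printSchema()   // id: integer, name: string, age: integer, job: string
personDF.show()          // prints the six sample rows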
6. Query personDF
SQL-style syntax:
personDF.registerTempTable("t_person")
sqlContext.sql("select * from t_person order by age desc limit 2").show
sqlContext.sql("desc t_person").show
val result = sqlContext.sql("select * from t_person order by age desc")
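For comparison, a sketch of the same queries in the DataFrame (DSL-style) API, assuming the same Spark 1.x shell session and the implicits imported in step 3:
personDF.orderBy($"age".desc).limit(2).show()   // same rows as the first SQL statement
personDF.printSchema()                          // column info, similar to "desc t_person"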
7. Save the results
result.save("hdfs://hdp-sk-01:9000/sql/res1") // Parquet format by default
result.save("hdfs://hdp-sk-01:9000/sql/res2", "json") // JSON format specified explicitly
Overwrite the existing JSON output on HDFS, again in JSON format:
import org.apache.spark.sql.SaveMode._
result.save("hdfs://hdp-sk-01:9000/sql/res2", "json", Overwrite)
8. Reload the previously saved results (optional)
sqlContext.load("hdfs://hdp-sk-01:9000/sql/res1") // loads Parquet by default
sqlContext.load("hdfs://hdp-sk-01:9000/sql/res2", "json") // data stored as JSON is loaded as JSON
res2 was saved as a JSON file. If you load it with
sqlContext.load("hdfs://hdp-sk-01:9000/sql/res2")
which defaults to the Parquet format, the error below is raised: a Parquet file must end with the magic bytes "PAR1" ([80, 65, 82, 49]), which a JSON file does not.
TaskSetManager: Lost task 1.0 in stage 4.0 (TID 12, 192.168.92.112): java.io.IOException: Could not read footer: java.lang.RuntimeException: hdfs://hdp-sk-01:9000/sparksql/res2/part-r-00000-86cf6674-e647-4b71-bb88-dbe34fc36d0e is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [105, 34, 125, 10]