Spark SQL

Chapter5 Spark SQL:结构化数据文件处理

5.1 认识Spark SQL

5.2 掌握DataFrame基础操作

5.2.1 创建DataFrame对象

1.结构化数据文件创建
from HDFS / Parquest创建: val dfUsers=sqlContext.read.load("user/root/sparkSql/...")
JSON: val dfPeople = sqlContext.read.format("json").load("...")
2.外部数据库创建
val url="jdbc:mysql://193.168.128.130/test"
val jdbcDF=sqlContext.read.format("jdbc").options(Map("url" -> url, "user" -> "root", "password" -> "root", "dbtable" -> "people")).load()
3.RDD创建
第一种方式,先定义case class,case class Person(name:String, age:Int);
再val data=sc.textFile("...").map(_.split(","));
再val people=data.map(p=>Person(p(0),p(1).trim.toInt)).toDF()
第二种方式,无法提前定义case class时,编程制定Schema将RDD转换城DF
4.Hive中的表创建
hiveContext.sql("use test");
val people=hiveContext.sql("select * from people")

5.2.2 DataFrame查看数据

函数或方法,例 movies.printSchema
printSchema,打印数据模式;
show,查看数据;
first/head/take/takeAsList,获取若干行数据;
collect/collectAsList,获取所有数据

5.2.3 DataFrame查询操作

1.将DF注册成临时表,peopleDF.registerTempTable("peopleTemTab");
用SQL语句查询,val personsRDD=sqlContext.sql("select ...").rdd;
personsRDD.collect
2.直接再DF对象上查询

1.条件查询
已有DF对象rating和user
(1) where
val userWhere=user.where("gender='F' and age=18")
(2) filter
val userFilter=user.filter("gender='F' and age=18")

2.查询指定字段的数据信息
(1) select
val userSelect=user.select(" ")
(2) selectExpr: 对制定字段特殊处理
val userSelectExpr=user.selectExpr(" ", "replace(gender) as sex", " ")
(3) col/apply
获取一个字段,返回Column类型

  1. limit
    val userlimit=user.limit(3)

4.orderBy/sort
val userOrderBy=user.orderBy(desc(" "))
val userSort=user.sort(asc(" "))
5.groupBy
val userGroupBy=user.groupBy("gender")
返回GroupedData对象
常用方法:max(), min(), mean(), sum(), count()
val userGroupByCount=user.groupBy("gender").count
6.join
val dfJoin=user.join(rating, "userId"),根据userId字段连接rating, user

5.2.4 DataFrame输出操作

import spark.sql.SaveMode
people.save("...", "json", SaveMode.Overwrite)
将DF输出为表
people.saveAsTable(" ", SaveMode.Overwrite)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容