Although DataFrames allow Spark to handle structured data more effectively, they lack compile-time type-safety checks. To address this, starting with Spark 1.6 the DataFrame API was extended into a new API called Dataset. This section introduces Datasets.
1. What Is a Dataset
A Dataset is a distributed collection of data. It is a programming interface (API) introduced in Spark 1.6 that combines the advantages of RDDs (strong typing and the ability to use powerful lambda expressions) with the efficiency of Spark SQL's optimized execution engine. A DataFrame can therefore be regarded as a special kind of Dataset, namely Dataset[Row].
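To make the type-safety difference concrete, here is a minimal sketch (assuming a spark-shell session, with a hypothetical case class User introduced only for illustration): field access on a Dataset is checked at compile time, while DataFrame column names are only checked at run time.
scala> case class User(name: String, age: Int)   // hypothetical example class
scala> val ds = Seq(User("Tom", 20)).toDS        // Dataset[User]: fields are checked by the compiler
scala> val df = ds.toDF                          // DataFrame, i.e. Dataset[Row]: columns are only checked at run time
scala> ds.filter(_.age > 18)                     // misspelling "age" here would be a compile-time error
scala> df.filter($"age" > 18)                    // misspelling "age" here would only fail when the query runs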
2. The Relationship Between Dataset and DataFrame
A Dataset combines the advantages of RDDs and DataFrames and further extends and upgrades the functionality of both. The relationship among Dataset, RDD, and DataFrame is shown in the figure below:
3. Creating a Dataset
3.1 Creating a Dataset from a Sequence
(1) Define a case class
scala> case class MyData(a:Int, b:String)
defined class MyData
(2) Generate the sequence data
scala> val myseq = Seq(MyData(1,"Tom"), MyData(2, "Mary"))
myseq: Seq[MyData] = List(MyData(1,Tom), MyData(2,Mary))
(3) Convert the sequence to a Dataset
scala> val ds = myseq.toDS
ds: org.apache.spark.sql.Dataset[MyData] = [a: int, b: string]
(4) View the result
scala> ds.show
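Because ds is typed as Dataset[MyData], its fields can be used directly in lambda expressions. A small sketch of a typed filter and projection over the fields defined above:
scala> ds.filter(_.a > 1).show        // typed filter on field a
scala> ds.map(_.b).show               // project field b into a Dataset[String]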
3.2 Creating a Dataset from JSON Data
(1) Define a case class
scala> case class Person(name:String, gender:String)
defined class Person
(2) Read the JSON data into a DataFrame
scala> val df = spark.read.json(sc.parallelize("""{"gender":"Male","name":"Tom"}"""::Nil))
df: org.apache.spark.sql.DataFrame = [gender: string, name: string]
(3) Convert the DataFrame to a Dataset
scala> val ds = df.as[Person]
ds: org.apache.spark.sql.Dataset[Person] = [gender: string, name: string]
(4) View the result
scala> ds.show
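After the conversion with as[Person], queries can be written directly against the Person fields. For example, a sketch of a typed filter and projection:
scala> ds.filter(_.gender == "Male").show   // typed filter on the gender field
scala> ds.map(_.name).show                  // project the name field into a Dataset[String]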
3.3 Creating a Dataset from HDFS Data
(1) Read the data from HDFS
# hdfs dfs -cat /input/data.txt
I love Beijing
I love China
Beijing is the capital of China
scala> val linesDF = spark.read.text("hdfs://master:9000/input/data.txt")
linesDF: org.apache.spark.sql.DataFrame = [value: string]
(2) Convert the DataFrame to a Dataset
scala> val linesDS = linesDF.as[String]
linesDS: org.apache.spark.sql.Dataset[String] = [value: string]
(3) Display the words that are longer than 3 characters
scala> val result = linesDS.flatMap(_.split(" ")).filter(_.length > 3)
result: org.apache.spark.sql.Dataset[String] = [value: string]
scala> result.show
(4) Run WordCount
scala> val result = linesDS.flatMap(_.split(" ")).map((_,1)).groupByKey(x=>x._1).count
result: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
scala> result.orderBy($"value").show
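The same word count can also be expressed with the untyped DataFrame API, grouping on the value column produced by flatMap (shown above); a sketch:
scala> val result2 = linesDS.flatMap(_.split(" ")).groupBy("value").count
scala> result2.orderBy($"value").show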
4. Dataset Operations Example
Data: employee information in emp.json and department information in dept.csv.
(1) Create a Dataset and query the employees whose salary is greater than 3000
scala> case class Emp(empno:Long, ename:String, job:String,
hiredate:String, mgr:String, sal:Long, comm:String, deptno:Long)
defined class Emp
scala> val empDF = spark.read.json("file:///root/input/emp.json")
empDF: org.apache.spark.sql.DataFrame = [comm: string, deptno: bigint ... 6 more fields]
scala> val empDS = empDF.as[Emp]
empDS: org.apache.spark.sql.Dataset[Emp] = [comm: string, deptno: bigint ... 6 more fields]
scala> empDS.filter(_.sal > 3000).show
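A typed projection can be combined with the filter to return only specific fields; for example, a sketch listing just the name and salary of those employees:
scala> empDS.filter(_.sal > 3000).map(e => (e.ename, e.sal)).show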
(2) Query the employees in department 10
scala> empDS.filter(_.deptno == 10).show
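The same queries can also be written in SQL by registering the Dataset as a temporary view (the view name emp below is an arbitrary choice); a sketch:
scala> empDS.createOrReplaceTempView("emp")
scala> spark.sql("select * from emp where deptno = 10").show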
(3) Multi-table query: query the department information of each employee
Create the department Dataset:
scala> case class Dept(deptno:Int, dname:String, loc:String)
defined class Dept
scala> val deptRDD = sc.textFile("file:///root/input/dept.csv").map(_.split(","))
deptRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[29] at map at <console>:24
scala> val deptDS = deptRDD.map(x=>Dept(x(0).toInt,x(1),x(2))).toDS
deptDS: org.apache.spark.sql.Dataset[Dept] = [deptno: int, dname: string ... 1 more field]
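Alternatively, the same department Dataset could be built with the CSV data source instead of going through an RDD; a sketch, assuming the same file layout with no header row:
scala> val deptDS2 = spark.read.csv("file:///root/input/dept.csv").toDF("deptno","dname","loc").selectExpr("cast(deptno as int) deptno","dname","loc").as[Dept]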
Create the employee Dataset:
scala> case class Emp(empno:Long, ename:String, job:String,
hiredate:String, mgr:String, sal:Long, comm:String, deptno:Long)
defined class Emp
scala> val empRDD = sc.textFile("file:///root/input/emp.csv").map(_.split(","))
empRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[33] at map at <console>:24
scala> val empDS = empRDD.map(x=>Emp(x(0).toInt,x(1),x(2),x(3),
x(4),x(5).toInt,x(6),x(7).toInt)).toDS
empDS: org.apache.spark.sql.Dataset[Emp] = [empno: bigint, ename: string ... 6 more fields]
Run the multi-table query, using join for an equi-join:
scala> val result = deptDS.join(empDS,"deptno")
result: org.apache.spark.sql.DataFrame = [deptno: int, dname: string ... 8 more fields]
scala> result.show
An alternative approach: use joinWith for the equi-join:
scala> val result = deptDS.joinWith(empDS, deptDS("deptno")===empDS("deptno"))
result: org.apache.spark.sql.Dataset[(Dept, Emp)] = [_1: struct<deptno: int, dname: string ... 1 more field>, _2: struct<empno: bigint, ename: string ... 6 more fields>]
scala> result.show
Note: the second argument to joinWith is a join condition, in which equality is written as "===". The difference between joinWith and join is the schema of the resulting Dataset: join flattens the columns of both sides into a single DataFrame, whereas joinWith returns a Dataset of (Dept, Emp) pairs in which each side is kept as a complete object.
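Because the joinWith result is typed as Dataset[(Dept, Emp)], each side can still be accessed as a complete object in subsequent typed operations; for example, a sketch pairing each employee name with its department name:
scala> result.map(t => (t._2.ename, t._1.dname)).show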
View the execution plan:
scala> result.explain