进一步理解DataFrame, Dataset, RDD

Dataset=RDD+schema

Dataset几乎就是一个RDD,除了它还包括一个schema,这个schema很多时候也是自动推导出来的,最简单的schema是包含一个名为value的列,它的类型可以是String,Int...

如下代码创建一个Dataset:

scala> import spark.implicits._
import spark.implicits._

scala> val ds = Seq(("bluejoe", 100), ("alex", 200)).toDS
ds: org.apache.spark.sql.Dataset[(String, Int)] = [_1: string, _2: int]

scala> ds.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(_1,StringType,true), StructField(_2,IntegerType,false))

scala> ds.collect
res1: Array[(String, Int)] = Array((bluejoe,100), (alex,200))

这个Dataset就包含了2行记录,每个记录是一个Tuple2,如:(bluejoe,100)

可以针对这个Dataset做SQL查询:

scala> ds.select("_1").collect
res4: Array[org.apache.spark.sql.Row] = Array([bluejoe], [alex])

scala> ds.show
+-------+---+
|     _1| _2|
+-------+---+
|bluejoe|100|
|   alex|200|
+-------+---+
scala> ds.select("_1").show
+-------+
|     _1|
+-------+
|bluejoe|
|   alex|
+-------+
scala> ds.select(ds("_1")).show
+-------+
|     _1|
+-------+
|bluejoe|
|   alex|
+-------+
scala> ds.select($"_1").show
+-------+
|     _1|
+-------+
|bluejoe|
|   alex|
+-------+

ds.select("_1")与ds.select(ds("_1")),以及ds.select($"_1")等价
$"_1"神奇吗?一点都不神奇,$()其实是一个函数:

  implicit class StringToColumn(val sc: StringContext) {
    def $(args: Any*): ColumnName = {
      new ColumnName(sc.s(args: _*))
    }
  }

SQL列还可以进行运算操作:

scala> ds.select(ds("_2")+10).show
+---------+
|(_2 + 10)|
+---------+
|      110|
|      210|
+---------+
scala> ds.select($"_2"+10).show
+---------+
|(_2 + 10)|
+---------+
|      110|
|      210|
+---------+

+、-等运算符其实也被ColumnName定义了,这里不再赘述。

也可以使用map()对Dataset进行变形:

scala> ds.map(x=>(x._1.toUpperCase, x._2+10)).show
+-------+---+
|     _1| _2|
+-------+---+
|BLUEJOE|110|
|   ALEX|210|
+-------+---+

可以看出,map()函数会生成新的schema:

scala> ds.map(x=>(x._1.toUpperCase, x._2+10, true)).show
+-------+---+----+
|     _1| _2|  _3|
+-------+---+----+
|BLUEJOE|110|true|
|   ALEX|210|true|
+-------+---+----+

除了将一个Tuple转换成另外一个Tuple,还可以转成一个JavaBean:

scala> case class Person(name:String,age:Int){};
defined class Person
scala> val ds2=ds.map(x=>Person(x._1.toUpperCase, x._2+10))
ds2: org.apache.spark.sql.Dataset[Person] = [name: string, age: int]

scala> ds2.show
+-------+---+
|   name|age|
+-------+---+
|BLUEJOE|110|
|   ALEX|210|
+-------+---+

注意这个新的Dataset的每一行变成了一个Person对象:

scala> ds2.collect
res36: Array[Person] = Array(Person(BLUEJOE,110), Person(ALEX,210))

注意,不是任何对象都可以放到Dataset中:

scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> ds.map(x=>Row(x._1.toUpperCase, x._2+10)).show
<console>:32: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
       ds.map(x=>Row(x._1.toUpperCase, x._2+10)).show

DataFrame是Dataset[Row]的别名
A DataFrame is a Dataset organized into named columns.

Dataset可以转成DataFrame:

scala> val df=ds.toDF
df: org.apache.spark.sql.DataFrame = [_1: string, _2: int]

scala> df.collect
res33: Array[org.apache.spark.sql.Row] = Array([bluejoe,100], [alex,200])

注意看到DataFrame的每行确实是一个Row,观察源代码:

 def toDF(): DataFrame = new Dataset[Row](sparkSession, queryExecution, RowEncoder(schema))

实际上,toDF()使用一个RowEncoder来实现Tuple到Row的转码。

也可以使用as()函数来转换成DataFrame:

scala> ds.as[Row](RowEncoder(ds.schema)).collect
res55: Array[org.apache.spark.sql.Row] = Array([bluejoe,100], [alex,200])

DataFrame的map()函数具有一些陷阱,因为它实际上还是一个Dataset,所以它的每一行还是可以转换成任意对象(甚至是非Row对象!!):

scala> df.map(x=>(x(0).asInstanceOf[String].toLowerCase, x(1).asInstanceOf[Int]-10)).collect
res43: Array[(String, Int)] = Array((bluejoe,90), (alex,190))

看到没?这个map()之后的对象并不再是DataFrame了!!如果坚持要转变成DataFrame,就必须用到别扭的toDF():

scala> df.map(x=>(x(0).asInstanceOf[String].toLowerCase, x(1).asInstanceOf[Int]-10)).toDF.collect
res44: Array[org.apache.spark.sql.Row] = Array([bluejoe,90], [alex,190])

或者指定Encoder:

scala> df.map{x:Row=>Row(x(0).asInstanceOf[String].toLowerCase, x(1).asInstanceOf[Int]-10)}(RowEncoder(ds.schema)).collect
res52: Array[org.apache.spark.sql.Row] = Array([bluejoe,90], [alex,190])

别扭吗?真的很别扭!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容