0301 Getting Started

转载请注明出处,谢谢合作~

该篇中的示例暂时只有 Scala 版本~

上手 Spark SQL

入口:SparkSession

Spark 应用程序的编程入口是 SparkSession 类,可以通过 SparkSession.builder() 创建一个基础的 SparkSession

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

Spark 2.0 中的 SparkSession 内置了对 Hive 的支持,包括使用 HiveSQL 编写查询语句,使用 Hive UDF,以及从 Hive 表中读取数据。这些功能需要首先安装好 Hive。

创建 DataFrame

应用程序可以使用 SparkSession 通过一个现有的 RDD(existing RDD),通过 Hive 表,或者通过 Spark 数据源(Spark data sources)创建 DataFrame。

下面的示例通过一个 JSON 文件创建 DataFrame:

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

类型无关的 DataFrame 算子

DataFrame 针对操作结构化的数据提供了特定的算子(Scala, Java, PythonR)。

上文提到,对于 Spark 2.0 中的 Scala 和 Java API, DataFrame 只是 Row 类型的 Dataset。相较于强类型相关的 Dataset,这些算子是类型无关的。

这里我们展示一些使用 Dataset 进行 结构化数据处理的基础样例:

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

此类操作 Dataset 的算子的完整列表详见 API Documentation

除了简单的列引用和表达式,Dataset 还有一个强大的函数库,包括操作字符串,日期计算,常见的数据计算等等。完整的函数列表参见 DataFrame Function Reference

编程中使用 SQL 查询

SparkSessionsql 方法可以让应用程序通过编程使用 SQL 查询,返回值一个 DataFrame

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

全局临时视图

Spark SQL 中的临时视图在当前 SparkSession 存在范围内有效,一旦 SparkSession 结束,临时视图就消失了。如果需要在不同的应用程序之间共享临时视图,即使 SparkSession 结束依旧存在,可以使用全局临时视图。全局临时视图与一个系统保留数据库 global_temp 绑定,必须使用全限定名称来使用,比如 SELECT * FROM global_temp.view1

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

创建 Dataset

Dataset 跟 RDD 类似,但是不像 RDD 那样使用 Java 或者 Kryo 序列化器,在计算以及网络传输过程中 Dataset 使用一个特定的 Encoder 来序列化对象。尽管 encoder 和标准的序列化器都是用来将一个对象转换为字节,encoder 采用的是动态代码生成的,并且采用了一种特殊的格式,Spark 可以对这种格式进行像过滤、排序和哈希运行而不用将其反序列化为对象。

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

与 RDD 交互

Spark SQL 支持两种不同的方式把一个现有的 RDD 转换为 Dataset。第一种方式是通过反射推断一个定义了类型的 RDD 的表结构。这种基于反射的方式可以使代码简洁,在已知表结构的场景下工作良好。

第二种方式是通过编程的方式构建一个表结构对象,并把它赋予一个现有的 RDD。尽管这种方式相对复杂,但是能够在无法得知运行时类型的情况下创建 Dataset。

通过反射推断表结构

Spark SQL 的 Scala 接口自动支持将一个样本类类型的 RDD 转换为一个 DataFrame。样本类定义了表结构,通过反射获取类中的字段名称并将其应用为列名。样本类可以嵌套,还可以包含像 SeqArray 这样的复杂类型。该 RDD 会被隐式转换为一个 DataFrame,之后可以注册成一张表,该表可以通过 SQL 进行查询。

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」

编程指定表结构

如果样本类无法被实现创建(例如,一行数据以字符串的格式编码,或者是需要被解析的文本类型的数据集,以及对于不同用户来说需要抽取的字段不同),可以分三步编程创建一个 DataFrame

  1. 通过一个 RDD 创建一个 Row 类型的 RDD;
  2. 创建一个 StructType 类型的表结构对象,需要与第 1 步 Row 中的数据相对应;
  3. 通过 SparkSession 提供的 createDataFrame 方法将第 1 步生成的 RDD 和第 2 步生成的表结构结合起来。

例如:

import org.apache.spark.sql.Row

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

完整的示例代码位于 Spark 安装包的「examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala」。

标量函数

标量函数通过一行数据只返回一个单值,而不是像聚合函数那样接收多行数据返回一个单值。Spark SQL 支持许多内置标量函数(Built-in Scalar Functions),同事也支持自定义标量函数(User Defined Scalar Functions)。

聚合函数

聚合函数接收多行数据返回一个单值。内置的聚合函数(Built-in Aggregation Functions)提供了常见的聚合函数,比如 count(), countDistinct(), avg(), max(), min() 等等。用户不用受预定义聚合函数的限制,可以定义自己的聚合函数,详情参见 User Defined Aggregate Functions

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,185评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,652评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,524评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,339评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,387评论 6 391
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,287评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,130评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,985评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,420评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,617评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,779评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,477评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,088评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,716评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,857评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,876评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,700评论 2 354