[译]Spark SQL Start(2.4.3)

原文地址: https://spark.apache.org/docs/latest/sql-programming-guide.html


Spark SQL是用于处理结构化数据的spark模块。与基本的Spark RDD API不同,Spark SQL提供的接口为Spark提供了更多的数据结构和计算信息。在内部,Spark SQL使用这些额外的信息来执行额外的优化。有几种方法可以与Spark SQL交互,包括SQL和DataSet API。当计算结果时,会使用相同的执行引擎,而与你用来实现计算的API/语言无关。这种统一意味着开发人员可以很容易地在不同的API之间来回切换,基于这些Spark提供了非常自然的转换表达式。

本页中的所有的示例代码使用spark分布式中的示例数据, 可以运行在spark-shell, pyspark-shell 或者sparkR shell中.


Spark SQL的一个用途是执行SQL查询。Spark SQL还可以用于从现有的Hive服务中读取数据。有关如何配置此功能的详细信息,请参阅Hive Tables 部分。从其他编程语言运行SQL时,结果将作为数据集/数据帧(Dataset/DataFrame)返回。你还可以使用命令行或通过jdbc/odbc与SQL接口交互。

Datasets and DataFrames

数据集是数据的分布式集合。DataSet是Spark 1.6中添加的一个新接口,它提供了RDD的优点(强类型、使用强大lambda函数的能力)以及Spark SQL优化的执行引擎的优化。数据集可以从jvm对象构造,然后使用函数转换(mapflatmapfilter,等等)进行操作。数据集的API在Scala和Java中是可用的, 但是Python不支持数据集API。但是,由于python的动态特性,数据集API的许多好处已经可用(你可以自然地按名称访问行的字段'row.columnname`)。R的情况类似。

数据帧是一个由命名列组成的数据集。在概念上,它相当于关系数据库中的一个表或R/Python中的一个数据帧,但在底层做了更多优化。数据帧可以从一系列源构建,例如:结构化数据文件、Hive中的表、外部数据库或现有RDD。数据帧 API在Scala、Java、Python和R中都有效. 在Scala和Java中,数据帧可以表示为Row的数据集。在Scala API中,DataFrame表示为Dataset[Row], 而在Java API中,用 Dataset<Row>来表示DataFrame

Get Start

Starting Point: SparkSession

Spark Sql的入口是SparkSession, 创建SparkSession只需要调用SparkSession.builder():

SparkSession spark = SparkSession
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")

Spark 2.0中的SparkSession为Hive提供了内置支持,包括使用HiveQL编写查询、访问 Hive UDF, 以及从Hive表读取数据的功能。要使用这些功能,你不需要安装Hive。

Creating DataFrames(创建数据帧)

应用使用SparkSession, 可以从现有RDD,Hive表, Spark数据源中创建数据帧.
举个例子, 下面的代码从Json文件中创建数据帧:

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Untyped Dataset Operations (aka DataFrame Operations)


就像上面提到的, 在Spark2.0中, Scala和Java API的数据帧只是Row类型的数据集. 这些操作也叫"非类型化转换", 与强类型的Scala/Java数据集的"类型化转换"相反.


// Print the schema in a tree format
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+


除了简单的列引用和表达式外,数据集还具有丰富的函数库,包括字符串操作、日期处理、常见数学运算等。完整列表在DataFrame Function Reference

Running SQL Queries Programmatically


// Register the DataFrame as a SQL temporary view

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Global Temporary View

Spark SQL中的临时视图是spark会话范围的,并且如果创建它的会话终止,它将消失。如果你希望拥有一个在所有会话之间共享的临时视图,并在Spark应用程序终止之前保持活动状态,那么你可以创建一个全局临时视图。全局临时视图绑定到系统保留的数据库global_temp,我们必须使用限定名称来引用它,例如select * from global_temp.view1

// Register the DataFrame as a global temporary view

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Creating Datasets(创建数据集)


public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;

  public void setName(String name) {
    this.name = name;

  public int getAge() {
    return age;

  public void setAge(int age) {
    this.age = age;

// Create an instance of a Bean class
Person person = new Person();

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
    (MapFunction<Integer, Integer>) value -> value + 1,
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Interoperating with RDDs(与RDD的交互)

Spark SQL支持两种方法将现有RDD转换为数据集。第一种方法使用反射来推断包含特定对象类型的RDD的模式。这种基于反射的方法可以得到更简洁的代码,并且在编写Spark应用程序时,当你已经知道模式时,它可以很好地工作。


Inferring the Schema Using Reflection(使用反射推断模式)

Spark SQL支持自动将JavaBeans的RDD转换为数据帧。使用反射获得的BeanInfo定义了表的模式。目前,Spark SQL不支持包含Map字段的JavaBeans。不过,支持嵌套的javaBeans和ListArray字段。您可以通过创建一个类来创建JavaBean,该类实现了Serializable接口并且所有字段都有getter和setter。

// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    return person;

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

Programmatically Specifying the Schema(编程指定模式)

When JavaBean classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a Dataset<Row> can be created programmatically with three steps.

  • 1.从原始RDD创建基于Row的RDD
  • 2.使用StructType创建匹配RDD中Row数据的模式
  • 3.使用createDataFrame方法将模式应用到基于Row的RDD


// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  String[] attributes = record.split(",");
  return RowFactory.create(attributes[0], attributes[1].trim());

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+


内置的数据帧函数提供常见的聚合,如count()countDistinct()avg()max()min()等。虽然这些函数是为数据帧设计的,但Spark SQL在Scala和Java中的某些函数还具有类型安全的版本,可用于强类型数据集。此外,用户不仅限于预定义的聚合函数,还可以创建自己的聚合函数。

Untyped User-Defined Aggregate Functions(非类型化的用户自定义聚合函数)


public static class MyAverage extends UserDefinedAggregateFunction {

  private StructType inputSchema;
  private StructType bufferSchema;

  public MyAverage() {
    List<StructField> inputFields = new ArrayList<>();
    inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
    inputSchema = DataTypes.createStructType(inputFields);

    List<StructField> bufferFields = new ArrayList<>();
    bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
    bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
    bufferSchema = DataTypes.createStructType(bufferFields);
  // Data types of input arguments of this aggregate function
  public StructType inputSchema() {
    return inputSchema;
  // Data types of values in the aggregation buffer
  public StructType bufferSchema() {
    return bufferSchema;
  // The data type of the returned value
  public DataType dataType() {
    return DataTypes.DoubleType;
  // Whether this function always returns the same output on the identical input
  public boolean deterministic() {
    return true;
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, 0L);
    buffer.update(1, 0L);
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
      long updatedSum = buffer.getLong(0) + input.getLong(0);
      long updatedCount = buffer.getLong(1) + 1;
      buffer.update(0, updatedSum);
      buffer.update(1, updatedCount);
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
    buffer1.update(0, mergedSum);
    buffer1.update(1, mergedCount);
  // Calculates the final result
  public Double evaluate(Row buffer) {
    return ((double) buffer.getLong(0)) / buffer.getLong(1);

// Register the function to access it
spark.udf().register("myAverage", new MyAverage());

Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+

Type-Safe User-Defined Aggregate Functions(类型化用户自定义聚合函数)

User-defined aggregations for strongly typed Datasets revolve around the Aggregator abstract class. For example, a type-safe user-defined average can look like:

public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Constructors, getters, setters...


public static class Average implements Serializable  {
  private long sum;
  private long count;

  // Constructors, getters, setters...


public static class MyAverage extends Aggregator<Employee, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    return buffer;
  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    return b1;
  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  // Specifies the Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  // Specifies the Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();

Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351