Flink Table API 和 SQL

Apache Flink 具有两个关系型API:Table API 和SQL,用于统一流和批处理。
Table API 是用于 Scala 和 Java 语言的查询API,允许以非常直观的方式组合关系运算符的查询,例如 select,filter 和 join。Flink SQL 的支持是基于实现了SQL标准的 Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。

Table API 和 SQL 还没有完全支持并且正在积极开发中。

要使用 Table API 和SQL,需要将以下依赖引入项目:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table_2.11</artifactId>
  <version>1.6.1</version>
</dependency>

Table API 和SQL

批处理和流式传输的 Table API 和SQL程序都遵循相同的模式。以下代码示例显示了常见的程序结构:

// 批处理使用 ExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// 创建 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注册 Table
tableEnv.registerTable("table1", ...)

// Table API query
val tapiResult = tableEnv.scan("table1").select(...)

// SQL query
val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// Sink query result
tapiResult.writeToSink(...)

// execute
env.execute()

TableEnvironment

TableEnvironment 是 Table API 和SQL集成的核心概念,它负责:

  • 在内部目录中注册表
  • 注册外部目录
  • 执行SQL查询
  • 注册用户定义的函数
  • DataStream 或 DataSet 转换为 Table
  • 持有 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

Table 总是与特定的 TableEnvironment 绑定。不能在同一查询中组合不同 TableEnvironments 的表(例如,union 或 join)。创建 TableEnvironment:

// STREAMING QUERY
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)

// BATCH QUERY
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)

注册 Table

TableEnvironment 维护一个按名称注册的表的目录。有两种类型的表,输入表(input table)和输出表(output table)。输入表可以在 Table API 和SQL查询中引用,并提供输入数据。输出表可用于将 Table API 或SQL查询的结果发送到外部系统。

输入表的注册源:

  • Table API 或SQL查询的结果表
  • 访问外部数据的 TableSource,例如文件,数据库或消息系统
  • DataStream 或 DataSet。

输出表的注册源:TableSink

代码示例:

val tableEnv = TableEnvironment.getTableEnvironment(env)

// from Table API or SQL
val projTable: Table = tableEnv.scan("X").select(...)
tableEnv.registerTable("projectedTable", projTable)

// from TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
tableEnv.registerTableSource("CsvTable", csvSource)

// from TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)

// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)

tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

注册外部目录

外部目录(external catalog)可以提供有关外部数据库和表的信息(如名称,schema,统计信息以及访问信息)。可以通过实现 ExternalCatalog 接口创建外部目录,并在 TableEnvironment 中注册:

// 获取一个 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 创建一个外部目录
val catalog: ExternalCatalog = new InMemoryExternalCatalog

// 注册外部目录
tableEnv.registerExternalCatalog("InMemCatalog", catalog)

查询

Table API

Table API 是一个 Scala 和 Java 的语言集成查询API,是基于 Table类。Table类代表了一个流或者批表,并提供方法来使用关系型操作。这些方法返回一个新的 Table 对象,这个新的 Table 对象代表着输入的 Table 应用关系型操作后的结果。下面的例子展示了一个简单的 Table API 聚合查询:

// 获取一个 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注册一个名叫 Orders 的表 ...

// 扫描注册的 Orders 表
val orders = tableEnv.scan("Orders")

// 计算所有来自法国的客户的收入
val revenue = orders
  .filter('cCountry === "FRANCE")
  .groupBy('cID, 'cName')
  .select('cID, 'cName, 'revenue.sum AS 'revSum)

// 执行查询

SQL

Flink SQL 集成是基于 Apache Calcite,Apache Calcite 实现了标准的SQL。下面的例子展示了如何指定一个查询并返回结果:

// 获取一个 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 注册一个名叫 Orders 的表

// 计算所有来自法国的客户的收入
val revenue = tableEnv.sqlQuery("""
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// 执行查询

指定将其结果插入已注册表的更新查询:

// 注册一个名叫 RevenueFrance 的输出表

// 计算所有来自法国的客户的收入,并更新到 RevenueFrance 表
tableEnv.sqlUpdate("""
  |INSERT INTO RevenueFrance
  |SELECT cID, cName, SUM(revenue) AS revSum
  |FROM Orders
  |WHERE cCountry = 'FRANCE'
  |GROUP BY cID, cName
  """.stripMargin)

// 执行查询

混合使用 Table API 和SQL,Table API 和SQL查询可以很容易地合并因为它们都返回 Table 对象:

  1. Table API 查询可以基于SQL查询结果的 Table 来进行
  2. SQL查询可以基于 Table API 查询的结果来定义

输出表

要输出 Table 可以写入 TableSink。TableSink 是通用接口,支持各种文件格式(如:CSV,Apache Parquet,Apache Avro)、存储系统(如:JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息系统(如:Apache Kafka,RabbitMQ)。

批处理 Table 只能写入 BatchTableSink,而流式处理 Table 需要 AppendStreamTableSink,RetractStreamTableSink 或 UpsertStreamTableSink。有关可用接收器的详细信息,请参阅 Sources & Sinks

有两种方法可以发送表:

  • Table.writeToSink(TableSink sink) 自动匹配 schema
  • Table.insertInto(String sinkTable) 使用特定 schema

以下示例显示如何发出Table:

// 获取一个 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 使用Table API和/或SQL查询获取一个 Table
val result: Table = ...

// 创建一个 TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")

// METHOD 1:
//   将结果表写入 TableSink
result.writeToSink(sink)

// METHOD 2:
//   注册指定 schema 的 TableSink
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
//   将结果表写入 TableSink
result.insertInto("CsvSinkTable")

// 执行程序

与 DataStream 和 DataSet API 集成

Table API 和SQL查询可以很容易地进行集成并嵌入到 DataStream 和 DataSet 程序中。例如,可以查询一个外部表(来自关系型数据库的表),做一些处理(如过滤、映射、聚合或者关联元数据),然后使用 DataStream 或者 DataSet API(以及在这些API之上构建的任何库,例如CEP或 Gelly) 进行进一步处理。

同样,Table API 或者SQL查询也可以应用于 DataStream 或者 DataSet 程序的结果中。这种交互可以通过将 DataStream 或者 DataSet 转换成一个 Table 及将 Table 转换成 DataStream 或者 DataSet 来实现。

Scala 隐式转换

Scala Table API 支持 DataSet,DataStream 以及 Table 间的隐式转换。需要引入 org.apache.flink.table.api.scala._org.apache.flink.api.scala._

DataStream 或 DataSet 转换为 Table

DataStream 或 DataSet 可以在 TableEnvironment 中注册为表,表的 schema 根据注册的 DataStream 或 DataSet 的数据类型来定:

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)

// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)

也可以直接转换为表,而不需要注册:

val stream: DataStream[(Long, String)] = ...

// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)

// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

Table 转换为 DataStream 或 DataSet

Table 可以转换为 DataStream 或者 DataSet,通过这种方式,DataStream 或者 DataSet 程序就可以基于 Table API 或者SQL查询的结果来执行了。

当将一个 Table 转换为 DataStream 或 DataSet 时,需要指定生成的 DataStream 或 DataSet 的数据类型,即需要转换表的行的数据类型,通常最方便的转换类型是 Row,下面列表描述了不同选项的功能:

  1. Row:字段按位置、任意数量字段映射,支持 null 值,无类型安全访问
  2. POJO:字段按名称(POJO 字段命名为 Table 字段)、任意数量字段映射,支持 null 值,类型安全访问
  3. Case Class:字段按位置映射,不支持 null 值,类型安全访问
  4. Tuple:字段按位置映射,不得多于22(Scala)或 25(Java)个字段,不支持 null 值,类型安全访问
  5. Atomic Type:Table 必须有一个字段,不支持 null 值,类型安全访问

Table 转换 DataStream

流式查询的结果表会动态地更新,每个新的记录到达输入流时结果就会发生变化。有两种模式将 Table 转换为 DataStream:

  1. Append Mode:只适用于当动态表仅由 INSERT 修改时,之前的结果不会被更新。
  2. Retract Mode:始终都可以使用此模式,使用一个 boolean 标识来编码 INSERTDELETE 更改。
// 有两个字段的 Table(String name, Integer age)
val table: Table = ...

// 将 Table 转换为 Row 类型的 Append DataStream
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)

// 将 Table 转换为 Tuple2<String, Integer> 类型的 Append DataStream  
val dsTuple: DataStream[(String, Int)] dsTuple = 
  tableEnv.toAppendStream[(String, Int)](table)

// 将 Table 转换为 Row 类型的 Retact DataStream
//   一个 ReactDataStream 的类型X为表示为 DataStream[(Boolean, X)]
//   boolean 字段指定了更改的类型
//   True 是 INSERT, false 是 DELETE
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

Table 转换 DataSet

// 有两个字段的 Table(String name, Integer age)
val table: Table = ...

// 将 Table 转换为 Row 类型的 DataSet
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)

//  将 Table 转换为 Tuple2<String, Integer> 类型的 DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

将数据类型映射到表模式(Table schema)

DataStream 和 DataSet API 支持多种数据类型,如:Tuple、POJO、case class 及 Row 类型。

原子类型

Flink 将原生类型(Integer、Double、String...)或泛型类型视为原子类型(Atomic type)。一个原子类型的 DataStream 或 DataSet 可以转换为只有一个属性的 Table,属性的类型根据原子类型推算,并且必须指定属性的名称。

val stream: DataStream[Long] = ...

// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, 'myLong)

Tuple 和 Case Class

Flink 支持 Scala 原生的 Tuple 类型,也为 Java 提供了 Tuple 类。两种类型的 DataStream 和 DataSet 都可以被转换为 Table。通过为所有字段提供名称(基于位置的映射),可以重命名字段。如果未指定字段名,则使用默认字段名。基于名称的映射允许使用别名(as)重新排序字段。

val stream: DataStream[(Long, String)] = ...

// convert DataStream into Table with renamed default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)

// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)

// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2)

// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)

// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...

// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)

// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)

// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

POJO

Flink 支持使用 POJO 作为复合类型。当将一个 POJO 类型的 DataStream 或 DataSet 转换为 Table 而不指定字段名称时,Table 的字段名称将采用 POJO 原生的字段名称。重命名原始的 POJO 字段需要关键字AS,因为 POJO 没有固定的顺序,名称映射需要原始名称并且不能通过位置来完成。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...

// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Row

Row 类型支持任意数量的字段,并且支持 null 值。字段名称可以通过 RowTypeInfo 来指定或者将一个 Row 类型的 DataStream 或 DataSet 转换为 Table 时指定。Row 类型支持按位置和名字映射。可以通过为所有字段提供名称(基于位置)或为 映射/排序/重命名(基于名称)单独选择字段来重命名字段。

// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...

// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)

// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)

// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)

// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)

// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/common.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,240评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,328评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,182评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,121评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,135评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,093评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,013评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,854评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,295评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,513评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,678评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,398评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,989评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,636评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,801评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,657评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,558评论 2 352

推荐阅读更多精彩内容