Apache Flink - SQL概览

原文来自“金竹”原载于云栖社区

SQL 简述

SQL 是 Structured Query Language 的缩写,最初是由美国计算机科学家Donald D. Chamberlin和 Raymond F. Boyce 在 20 世纪 70 年代早期从 Early History of SQL 中了解关系模型后在 IBM 开发的。该版本最初称为[SEQUEL: A Structured English Query Language](结构化英语查询语言),旨在操纵和检索存储在 IBM 原始准关系数据库管理系统 System R 中的数据。SEQUEL 后来改为 SQL,因为“SEQUEL”是英国 Hawker Siddeley 飞机公司的商标。我们看看这款用于特技飞行的英国皇家空军豪客 Siddeley Hawk T.1A (Looks great):

image.png

第一款 SQL 数据库

在 20 世纪 70 年代后期,Oracle 公司(当时叫 Relational Software,Inc.)开发了基于 SQL 的 RDBMS,并希望将其出售给美国海军,Central Intelligence 代理商和其他美国政府机构。 1979 年 6 月,Oracle 公司为 VAX 计算机推出了第一个商业化的 SQL 实现,即 Oracle V2。

ANSI-SQL 标准的采用

直到 1986 年,ANSI 和 ISO 标准组正式采用了标准的"数据库语言 SQL"语言定义。该标准的新版本发布于 1989,1992,1996,1999,2003,2006,2008,2011,以及最近的 2016。Apache Flink SQL 核心算子的语义设计也参考了19922011等 ANSI-SQL 标准。

SQL 操作及扩展

SQL 是专为查询包含在关系数据库中的数据而设计的,是一种基于 SET 操作的声明性编程语言,而不是像 C 语言一样的命令式编程语言。但是,各大关系数据库厂商在遵循 ANSI-SQL 标准的同时又对标准 SQL 进行扩展,由基于 SET(无重复元素)的操作扩展到基于 BAG(有重复元素)的操作,并且添加了过程编程语言功能,如:Oracle 的 PL/SQL, DB2 的 SQL PL,MySQL - SQL/PSM 以及 SQL Server 的 T-SQL 等等。
随着时间的推移 ANSI-SQL 规范不断完善,所涉及的功能不断丰富,比如在 ANSI-2011 中又增加了 Temporal Table 的标准定义,Temporal Table 的标准在结构化关系数据存储上添加了时间维度信息,这使得关系数据库中不仅可以对当前数据进行查询操作,根据时间版本信息也可以对历史数据进行操作。这些不断丰富的功能极大增强了 SQL 的应用领域。

大数据计算领域对 SQL 的应用

离线计算(批计算)

提及大数据计算领域不得不说 MapReduce 计算模型,MapReduce 最早是由 Google 公司研究提出的一种面向大规模数据处理的并行计算模型和方法,并发于 2004 年发表了论文Simplified Data Processing on Large Clusters
论文发表之后 Apache 开源社区参考 Google MapReduce,基于 Java 设计开发了一个称为 Hadoop 的开源 MapReduce 并行计算框架。很快得到了全球学术界和工业界的普遍关注,并得到推广和普及应用。
但利用 Hadoop 进行 MapReduce 的开发,需要开发人员精通 Java 语言,并了解 MapReduce 的运行原理,这样在一定程度上提高了 MapReduce 的开发门槛,所以在开源社区又不断涌现了一些为了简化 MapReduce 开发的开源框架,其中 Hive 就是典型的代表。HSQL 可以让用户以类 SQL 的方式描述 MapReduce 计算,比如原本需要几十行,甚至上百行才能完成的 wordCount,用户一条 SQL 语句就能完成了,这样极大的降低了 MapReduce 的开发门槛,进而也成功的将 SQL 应用到了大数据计算领域当中来。

实时计算(流计算)

SQL 不仅仅被成功的应用到了离线计算,SQL 的易用性也吸引了流计算产品,目前最热的 Spark,Flink 也纷纷支持了 SQL,尤其是 Flink 支持的更加彻底,集成了 Calcite,完全遵循 ANSI-SQL 标准。Apache Flink 在 low-level API 上面用 DataSet 支持批计算,用 DataStream 支持流计算,但在 High-Level API 上面利用 SQL 将流与批进行了统一,使得用户编写一次 SQL 既可以在流计算中使用,又可以在批计算中使用,为既有流计算业务,又有批计算业务的用户节省了大量开发成本。

SQL 高性能与简洁性

性能

SQL 经过传统数据库领域几十年的不断打磨,查询优化器已经能够极大的优化 SQL 的查询性能,Apache Flink 应用 Calcite 进行查询优化,复用了大量数据库查询优化规则,在性能上不断追求极致,能够让用户关心但不用担心性能问题。如下图(Alibaba 对 Apache Flink 进行架构优化后的组件栈)

相对于 DataStream 而言,SQL 会经过 Optimization 模块透明的为用户进行查询优化,用户专心编写自己的业务逻辑,不用担心性能,却能得到最优的查询性能!

简洁

就简洁性而言,SQL 与 DataSet 和 DataStream 相比具有很大的优越性,我们先用一个 WordCount 示例来直观的查看用户的代码量:

  • DataStream/DataSetAPI
... //省略初始化代码
// 核心逻辑
text.flatMap(new WordCount.Tokenizer()).keyBy(new int[]{0}).sum(1);

// flatmap 代码定义
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public Tokenizer() {
        }

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            String[] var4 = tokens;
            int var5 = tokens.length;

            for(int var6 = 0; var6 < var5; ++var6) {
                String token = var4[var6];
                if (token.length() > 0) {
                    out.collect(new Tuple2(token, 1));
                }
            }

        }
    }
  • SQL
//省略初始化代码 
SELECT word, COUNT(word) FROM tab GROUP BY word;

我们直观的体会到相同的统计功能使用 SQL 的简洁性。

Flink SQL Job 的组成

我们做任何数据计算都离不开读取原始数据,计算逻辑和写入计算结果数据三部分,当然基于 Apache Flink SQL 编写的计算 Job 也离不开这三个部分,如下所示:

image.png

如上所示,一个完整的 Apache Flink SQL Job 由如下三部分:

  • Source Operator - Soruce operator 是对外部数据源的抽象, 目前 Apache Flink 内置了很多常用的数据源实现,比如上图提到的 Kafka。
  • Query Operators - 查询算子主要完成如图的 Query Logic,目前支持了 Union,Join,Projection,Difference, Intersection 以及 window 等大多数传统数据库支持的操作。
  • Sink Operator - Sink operator 是对外结果表的抽象,目前 Apache Flink 也内置了很多常用的结果表的抽象,比如上图提到的 Kafka。

Flink SQL 核心算子

目前 Flink SQL 支持 Union,Join,Projection,Difference, Intersection 以及 Window 等大多数传统数据库支持的操作,接下来为大家分别进行简单直观的介绍。

环境

为了很好的体验和理解 Apache Flink SQL 算子我们需要先准备一下测试环境,我们选择IDEA,以 ITCase 测试方式来进行体验。IDEA 安装这里不占篇幅介绍了,相信大家能轻松搞定!我们进行功能体验有两种方式,具体如下:

源码方式

对于开源爱好者可能更喜欢源代码方式理解和体验 Apache Flink SQL 功能,那么我们需要下载源代码并导入到 IDEA 中:

  • 下载源码:
// 下载源代码
git clone https://github.com/apache/flink.git study
// 进入源码目录
cd study
// 拉取稳定版release-1.6
git fetch origin release-1.6:release-1.6
//切换到稳定版
git checkout release-1.6
//将依赖安装到本地mvn仓库,耐心等待需要一段时间
mvn clean install -DskipTests
  • 导入到 IDEA
    将 Flink 源码导入到 IDEA 过程这里不再占用篇幅,导入后确保在 IDEA 中可以运行 org.apache.flink.table.runtime.stream.sql.SqlITCase 并测试全部通过,即证明体验环境已经完成。如下图所示:
image.png

如上图运行测试后显示测试通过,我们就可以继续下面的 Apache Flink SQL 功能体验了。

依赖 Flink 包方式

我们还有一种更简单直接的方式,就是新建一个 mvn 项目,并在 pom 中添加如下依赖:

<properties>
    <table.version>1.6-SNAPSHOT</table.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${table.version}</version>
    </dependency>

    <dependency>
      <groupId>JUnit</groupId>
      <artifactId>JUnit</artifactId>
      <version>4.12</version>
    </dependency>

  </dependencies>

完成环境准备后,我们开始准备测试数据和写一个简单的测试类。

示例数据及测试类

测试数据

  • customer_tab 表 - 客户表保存客户 id,客户姓名和客户描述信息。字段及测试数据如下:
c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei
  • order_tab 表 - 订单表保存客户购买的订单信息,包括订单 id,订单时间和订单描述信息。 字段节测试数据如下:
o_id c_id o_time o_desc
o_oo1 c_002 2018-11-05 10:01:01 iphone
o_002 c_001 2018-11-05 10:01:55 ipad
o_003 c_001 2018-11-05 10:03:44 flink book
  • Item_tab
    商品表, 携带商品 id,商品类型,出售时间,价格等信息,具体如下:
itemID itemType onSellTime price
ITEM001 Electronic 2017-11-11 10:01:00 20
ITEM002 Electronic 2017-11-11 10:02:00 50
ITEM003 Electronic 2017-11-11 10:03:00 30
ITEM004 Electronic 2017-11-11 10:03:00 60
ITEM005 Electronic 2017-11-11 10:05:00 40
ITEM006 Electronic 2017-11-11 10:06:00 20
ITEM007 Electronic 2017-11-11 10:07:00 70
ITEM008 Clothes 2017-11-11 10:08:00 20
  • PageAccess_tab
    页面访问表,包含用户 ID,访问时间,用户所在地域信息,具体数据如下:
region userId accessTime
ShangHai U0010 2017-11-11 10:01:00
BeiJing U1001 2017-11-11 10:01:00
BeiJing U2032 2017-11-11 10:10:00
BeiJing U1100 2017-11-11 10:11:00
ShangHai U0011 2017-11-11 12:10:00
  • PageAccessCount_tab
    页面访问表,访问量,访问时间,用户所在地域信息,具体数据如下:
region userCount accessTime
ShangHai 100 2017.11.11 10:01:00
BeiJing 86 2017.11.11 10:01:00
BeiJing 210 2017.11.11 10:06:00
BeiJing 33 2017.11.11 10:10:00
ShangHai 129 2017.11.11 12:10:00
  • PageAccessSession_tab
    页面访问表,访问量,访问时间,用户所在地域信息,具体数据如下:
region userId accessTime
ShangHai U0011 2017-11-11 10:01:00
ShangHai U0012 2017-11-11 10:02:00
ShangHai U0013 2017-11-11 10:03:00
ShangHai U0015 2017-11-11 10:05:00
ShangHai U0011 2017-11-11 10:10:00
BeiJing U0110 2017-11-11 10:10:00
ShangHai U2010 2017-11-11 10:11:00
ShangHai U0410 2017-11-11 12:16:00

测试类

我们创建一个SqlOverviewITCase.scala 用于接下来介绍 Flink SQL 算子的功能体验。代码如下:

import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

class SqlOverviewITCase {
  val _tempFolder = new TemporaryFolder

  @Rule
  def tempFolder: TemporaryFolder = _tempFolder

  def getStateBackend: StateBackend = {
    new MemoryStateBackend()
  }

  // 客户表数据
  val customer_data = new mutable.MutableList[(String, String, String)]
  customer_data.+=(("c_001", "Kevin", "from JinLin"))
  customer_data.+=(("c_002", "Sunny", "from JinLin"))
  customer_data.+=(("c_003", "JinCheng", "from HeBei"))


  // 订单表数据
  val order_data = new mutable.MutableList[(String, String, String, String)]
  order_data.+=(("o_001", "c_002", "2018-11-05 10:01:01", "iphone"))
  order_data.+=(("o_002", "c_001", "2018-11-05 10:01:55", "ipad"))
  order_data.+=(("o_003", "c_001", "2018-11-05 10:03:44", "flink book"))

  // 商品销售表数据
  val item_data = Seq(
    Left((1510365660000L, (1510365660000L, 20, "ITEM001", "Electronic"))),
    Right((1510365660000L)),
    Left((1510365720000L, (1510365720000L, 50, "ITEM002", "Electronic"))),
    Right((1510365720000L)),
    Left((1510365780000L, (1510365780000L, 30, "ITEM003", "Electronic"))),
    Left((1510365780000L, (1510365780000L, 60, "ITEM004", "Electronic"))),
    Right((1510365780000L)),
    Left((1510365900000L, (1510365900000L, 40, "ITEM005", "Electronic"))),
    Right((1510365900000L)),
    Left((1510365960000L, (1510365960000L, 20, "ITEM006", "Electronic"))),
    Right((1510365960000L)),
    Left((1510366020000L, (1510366020000L, 70, "ITEM007", "Electronic"))),
    Right((1510366020000L)),
    Left((1510366080000L, (1510366080000L, 20, "ITEM008", "Clothes"))),
    Right((151036608000L)))

  // 页面访问表数据
  val pageAccess_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", "U0010"))),
    Right((1510365660000L)),
    Left((1510365660000L, (1510365660000L, "BeiJing", "U1001"))),
    Right((1510365660000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", "U2032"))),
    Right((1510366200000L)),
    Left((1510366260000L, (1510366260000L, "BeiJing", "U1100"))),
    Right((1510366260000L)),
    Left((1510373400000L, (1510373400000L, "ShangHai", "U0011"))),
    Right((1510373400000L)))

  // 页面访问量表数据2
  val pageAccessCount_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", 100))),
    Right((1510365660000L)),
    Left((1510365660000L, (1510365660000L, "BeiJing", 86))),
    Right((1510365660000L)),
    Left((1510365960000L, (1510365960000L, "BeiJing", 210))),
    Right((1510366200000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", 33))),
    Right((1510366200000L)),
    Left((1510373400000L, (1510373400000L, "ShangHai", 129))),
    Right((1510373400000L)))

  // 页面访问表数据3
  val pageAccessSession_data = Seq(
    Left((1510365660000L, (1510365660000L, "ShangHai", "U0011"))),
    Right((1510365660000L)),
    Left((1510365720000L, (1510365720000L, "ShangHai", "U0012"))),
    Right((1510365720000L)),
    Left((1510365720000L, (1510365720000L, "ShangHai", "U0013"))),
    Right((1510365720000L)),
    Left((1510365900000L, (1510365900000L, "ShangHai", "U0015"))),
    Right((1510365900000L)),
    Left((1510366200000L, (1510366200000L, "ShangHai", "U0011"))),
    Right((1510366200000L)),
    Left((1510366200000L, (1510366200000L, "BeiJing", "U2010"))),
    Right((1510366200000L)),
    Left((1510366260000L, (1510366260000L, "ShangHai", "U0011"))),
    Right((1510366260000L)),
    Left((1510373760000L, (1510373760000L, "ShangHai", "U0410"))),
    Right((1510373760000L)))

  def procTimePrint(sql: String): Unit = {
    // Streaming 环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 将order_tab, customer_tab 注册到catalog
    val customer = env.fromCollection(customer_data).toTable(tEnv).as('c_id, 'c_name, 'c_desc)
    val order = env.fromCollection(order_data).toTable(tEnv).as('o_id, 'c_id, 'o_time, 'o_desc)

    tEnv.registerTable("order_tab", order)
    tEnv.registerTable("customer_tab", customer)

    val result = tEnv.sqlQuery(sql).toRetractStream[Row]
    val sink = new RetractingSink
    result.addSink(sink)
    env.execute()
  }

  def rowTimePrint(sql: String): Unit = {
    // Streaming 环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(getStateBackend)
    env.setParallelism(1)
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 将item_tab, pageAccess_tab 注册到catalog
    val item =
      env.addSource(new EventTimeSourceFunction[(Long, Int, String, String)](item_data))
      .toTable(tEnv, 'onSellTime, 'price, 'itemID, 'itemType, 'rowtime.rowtime)

    val pageAccess =
      env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccess_data))
      .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)

    val pageAccessCount =
      env.addSource(new EventTimeSourceFunction[(Long, String, Int)](pageAccessCount_data))
      .toTable(tEnv, 'accessTime, 'region, 'accessCount, 'rowtime.rowtime)

    val pageAccessSession =
      env.addSource(new EventTimeSourceFunction[(Long, String, String)](pageAccessSession_data))
      .toTable(tEnv, 'accessTime, 'region, 'userId, 'rowtime.rowtime)

    tEnv.registerTable("item_tab", item)
    tEnv.registerTable("pageAccess_tab", pageAccess)
    tEnv.registerTable("pageAccessCount_tab", pageAccessCount)
    tEnv.registerTable("pageAccessSession_tab", pageAccessSession)

    val result = tEnv.sqlQuery(sql).toRetractStream[Row]
    val sink = new RetractingSink
    result.addSink(sink)
    env.execute()

  }

  @Test
  def testSelect(): Unit = {
    val sql = "替换想要测试的SQL"
    // 非window 相关用 procTimePrint(sql)
    // Window 相关用 rowTimePrint(sql)
  }

}

// 自定义Sink
final class RetractingSink extends RichSinkFunction[(Boolean, Row)] {
  var retractedResults: ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

  def invoke(v: (Boolean, Row)) {
    retractedResults.synchronized {
      val value = v._2.toString
      if (v._1) {
        retractedResults += value
      } else {
        val idx = retractedResults.indexOf(value)
        if (idx >= 0) {
          retractedResults.remove(idx)
        } else {
          throw new RuntimeException("Tried to retract a value that wasn't added first. " +
                                       "This is probably an incorrectly implemented test. " +
                                       "Try to set the parallelism of the sink to 1.")
        }
      }
    }
    retractedResults.sorted.foreach(println(_))
  }
}

// Water mark 生成器
class EventTimeSourceFunction[T](
  dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }

  override def cancel(): Unit = ???
}

Select

SELECT 用于从数据集/流中选择数据,语法遵循 ANSI-SQL 标准,语义是关系代数中的投影(Projection),对关系进行垂直分割,消去某些列, 如下图所示:

image.png

SQL 示例

customer_tab选择用户姓名,并用内置的 CONCAT 函数拼接客户信息,如下:

SELECT c_name, CONCAT(c_name, ' come ', c_desc) as desc  FROM customer_tab;

Result

c_name desc
Kevin Kevin come from JinLin
Sunny Sunny come from JinLin
Jincheng Jincheng come from HeBei

特别说明

大家看到在 SELECT 不仅可以使用普通的字段选择,还可以使用ScalarFunction,当然也包括User-Defined Function,同时还可以进行字段的alias设置。其实SELECT可以结合聚合,在 GROUPBY 部分会进行介绍,一个比较特殊的使用场景是携带 DISTINCT 关键字,示例如下:

SQL 示例

在订单表查询所有的客户 id,消除重复客户 id, 如下:

SELECT DISTINCT c_id FROM order_tab;

Result

c_id
c_001
c_002

WHERE

WHERE 用于从数据集/流中过滤数据,与 SELECT 一起使用,语法遵循 ANSI-SQL 标准,语义是关系代数的 Selection,根据某些条件对关系做水平分割,即选择符合条件的记录,如下所示:

image.png

SQL 示例

customer_tab查询客户 id 为c_001c_003的客户信息,如下:

SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id = 'c_001' OR c_id = 'c_003';

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_003 JinCheng from HeBei

特别说明

我们发现WHERE是对满足一定条件的数据进行过滤,WHERE支持=, <, >, <>, >=, <=以及ANDOR等表达式的组合,最终满足过滤条件的数据会被选择出来。并且 WHERE 可以结合IN,NOT IN联合使用,具体如下:

SQL 示例 (IN 常量)

使用 INcustomer_tab查询客户 id 为c_001c_003的客户信息,如下:

SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id IN ('c_001', 'c_003');

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_003 JinCheng from HeBei

SQL 示例 (IN 子查询)

使用 IN和 子查询 在customer_tab查询已经下过订单的客户信息,如下:

SELECT c_id, c_name, c_desc FROM customer_tab WHERE c_id IN (SELECT c_id FROM order_tab);

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin

IN/NOT IN 与关系代数

如上介绍 IN 是关系代数中的 Intersection, NOT IN 是关系代数的 Difference, 如下图示意:

  • IN(Intersection)
    image.png
  • NOT IN(Difference)
    image.png

GROUP BY

GROUP BY 是对数据进行分组的操作,比如我需要分别计算一下一个学生表里面女生和男生的人数分别是多少,如下:

image.png

SQL 示例

将 order_tab 信息按 customer_tab 分组统计订单数量,简单示例如下:

SELECT c_id, count(o_id) as o_count FROM order_tab GROUP BY c_id;

Result

c_id o_count
c_001 2
c_002 1

特别说明

在实际的业务场景中,GROUP BY 除了按业务字段进行分组外,很多时候用户也可以用时间来进行分组(相当于划分窗口),比如统计每分钟的订单数量:

SQL 示例

按时间进行分组,查询每分钟的订单数量,如下:

   SELECT SUBSTRING(o_time, 1, 16) AS o_time_min, count(o_id) AS o_count FROM order_tab GROUP BY SUBSTRING(o_time, 1, 16)

Result

o_time_min o_count
2018-11-05 10:01 2
2018-11-05 10:03 1

说明:如果我们时间字段是 timestamp 类型,建议使用内置的 DATE_FORMAT 函数。

UNION ALL

UNION ALL 将两个表合并起来,要求两个表的字段完全一致,包括字段类型、字段顺序,语义对应关系代数的 Union,只是关系代数是 Set 集合操作,会有去重复操作,UNION ALL 不进行去重,如下所示:

image.png

SQL 示例

我们简单的将customer_tab查询 2 次,将查询结果合并起来,如下:

SELECT c_id, c_name, c_desc  FROM customer_tab UNION ALL SELECT c_id, c_name, c_desc  FROM customer_tab

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei

特别说明

UNION ALL 对结果数据不进行去重,如果想对结果数据进行去重,传统数据库需要进行 UNION 操作。

UNION

UNION 将两个流给合并起来,要求两个流的字段完全一致,包括字段类型、字段顺序,并其 UNION 不同于 UNION ALL,UNION 会对结果数据去重,与关系代数的 Union 语义一致,如下:
[图片上传失败...(image-a1f92a-1592206042833)]

SQL 示例

我们简单的将customer_tab查询 2 次,将查询结果合并起来,如下:

SELECT c_id, c_name, c_desc  FROM customer_tab UNION SELECT c_id, c_name, c_desc  FROM customer_tab

我们发现完全一样的表数据进行 UNION之后,数据是被去重的,UNION之后的数据并没有增加。

Result

c_id c_name c_desc
c_001 Kevin from JinLin
c_002 Sunny from JinLin
c_003 JinCheng from HeBei

特别说明

UNION 对结果数据进行去重,在实际的实现过程需要对数据进行排序操作,所以非必要去重情况请使用 UNION ALL 操作。

JOIN

JOIN 用于把来自两个表的行联合起来形成一个宽表,Apache Flink 支持的 JOIN 类型:

  • JOIN - INNER JOIN
  • LEFT JOIN - LEFT OUTER JOIN
  • RIGHT JOIN - RIGHT OUTER JOIN
  • FULL JOIN - FULL OUTER JOIN

JOIN 与关系代数的 Join 语义相同,具体如下:

image.png

SQL 示例 (JOIN)

INNER JOIN只选择满足ON条件的记录,我们查询customer_taborder_tab表,将有订单的客户和订单信息选择出来,如下:

SELECT * FROM customer_tab AS c JOIN order_tab AS o ON o.c_id = c.c_id

Result

c_id c_name c_desc o_id c_id o_time o_desc
c_001 Kevin from JinLin o_002 c_001 2018-11-05 10:01:55 ipad
c_001 Kevin from JinLin o_003 c_001 2018-11-05 10:03:44 flink book
c_002 Sunny from JinLin o_oo1 c_002 2018-11-05 10:01:01 iphone

SQL 示例 (LEFT JOIN)

LEFT JOININNER JOIN的区别是当右表没有与左边相 JOIN 的数据时候,右边对应的字段补NULL输出,语义如下:

[图片上传失败...(image-5939b9-1592206042833)]

对应的 SQL 语句如下(LEFT JOIN):

SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;
  • 细心的读者可能发现上面 T2.ColC 是添加了前缀 T2 了,这里需要说明一下,当两张表有字段名字一样的时候,我需要指定是从那个表里面投影的。

我们查询customer_taborder_tab表,将客户和订单信息选择出来如下:

SELECT * FROM customer_tab AS c LEFT JOIN order_tab AS o ON o.c_id = c.c_id

Result

c_id c_name c_desc o_id c_id o_time o_desc
c_001 Kevin from JinLin o_002 c_001 2018-11-05 10:01:55 ipad
c_001 Kevin from JinLin o_003 c_001 2018-11-05 10:03:44 flink book
c_002 Sunny from JinLin o_oo1 c_002 2018-11-05 10:01:01 iphone
c_003 JinCheng from HeBei NULL NULL NULL NULL

特别说明

RIGHT JOIN 相当于 LEFT JOIN 左右两个表交互一下位置。FULL JOIN相当于 RIGHT JOINLEFT JOIN 之后进行UNION ALL操作。

Window

在 Apache Flink 中有 2 种类型的 Window,一种是 OverWindow,即传统数据库的标准开窗,每一个元素都对应一个窗口。一种是 GroupWindow,目前在 SQL 中 GroupWindow 都是基于时间进行窗口划分的。

Over Window

Apache Flink 中对 OVER Window 的定义遵循标准 SQL 的定义语法。
按 ROWS 和 RANGE 分类是传统数据库的标准分类方法,在 Apache Flink 中还可以根据时间类型(ProcTime/EventTime)和窗口的有限和无限(Bounded/UnBounded)进行分类,共计 8 种类型。为了避免大家对过细分类造成困扰,我们按照确定当前行的不同方式将 OVER Window 分成两大类进行介绍,如下:

  • ROWS OVER Window - 每一行元素都视为新的计算行,即,每一行都是一个新的窗口。
  • RANGE OVER Window - 具有相同时间值的所有元素行视为同一计算行,即,具有相同时间值的所有行都是同一个窗口。

Bounded ROWS OVER Window

Bounded ROWS OVER Window 每一行元素都视为新的计算行,即,每一行都是一个新的窗口。

语义

我们以 3 个元素(2 PRECEDING)的窗口为例,如下图:

image.png

上图所示窗口 user 1 的 w5 和 w6, user 2 的 窗口 w2 和 w3,虽然有元素都是同一时刻到达,但是他们仍然是在不同的窗口,这一点有别于 RANGE OVER Window。

语法

Bounded ROWS OVER Window 语法如下:

SELECT 
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)] 
     ORDER BY timeCol
     ROWS 
     BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, 
... 
FROM Tab1
  • value_expression - 进行分区的字表达式;
  • timeCol - 用于元素排序的时间字段;
  • rowCount - 是定义根据当前行开始向前追溯几行元素。
SQL 示例

利用item_tab测试数据,我们统计同类商品中当前和当前商品之前 2 个商品中的最高价格。

SELECT  
    itemID,
    itemType, 
    onSellTime, 
    price,  
    MAX(price) OVER (
        PARTITION BY itemType 
        ORDER BY onSellTime 
        ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxPrice
  FROM item_tab

Result

itemID itemType onSellTime price maxPrice
ITEM001 Electronic 2017-11-11 10:01:00 20 20
ITEM002 Electronic 2017-11-11 10:02:00 50 50
ITEM003 Electronic 2017-11-11 10:03:00 30 50
ITEM004 Electronic 2017-11-11 10:03:00 60 60
ITEM005 Electronic 2017-11-11 10:05:00 40 60
ITEM006 Electronic 2017-11-11 10:06:00 20 60
ITEM007 Electronic 2017-11-11 10:07:00 70 70
ITEM008 Clothes 2017-11-11 10:08:00 20 20

Bounded RANGE OVER Window

Bounded RANGE OVER Window 具有相同时间值的所有元素行视为同一计算行,即,具有相同时间值的所有行都是同一个窗口。

语义

我们以 3 秒中数据(INTERVAL '2' SECOND)的窗口为例,如下图:

image.png

注意: 上图所示窗口 user 1 的 w6, user 2 的 窗口 w3,元素都是同一时刻到达,他们是在同一个窗口,这一点有别于 ROWS OVER Window。

语法

Bounded RANGE OVER Window 的语法如下:

SELECT 
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)] 
     ORDER BY timeCol
     RANGE 
     BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName, 
... 
FROM Tab1
  • value_expression - 进行分区的字表达式;
  • timeCol - 用于元素排序的时间字段;
  • timeInterval - 是定义根据当前行开始向前追溯指定时间的元素行;
SQL 示例

我们统计同类商品中当前和当前商品之前 2 分钟商品中的最高价格。

SELECT  
    itemID,
    itemType, 
    onSellTime, 
    price,  
    MAX(price) OVER (
        PARTITION BY itemType 
        ORDER BY rowtime 
        RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxPrice
  FROM item_tab
Result(Bounded RANGE OVER Window)
itemID itemType onSellTime price maxPrice
ITEM001 Electronic 2017-11-11 10:01:00 20 20
ITEM002 Electronic 2017-11-11 10:02:00 50 50
ITEM003 Electronic 2017-11-11 10:03:00 30 60
ITEM004 Electronic 2017-11-11 10:03:00 60 60
ITEM005 Electronic 2017-11-11 10:05:00 40 60
ITEM006 Electronic 2017-11-11 10:06:00 20 40
ITEM007 Electronic 2017-11-11 10:07:00 70 70
ITEM008 Clothes 2017-11-11 10:08:00 20 20

特别说明

OverWindow 最重要是要理解每一行数据都确定一个窗口,同时目前在 Apache Flink 中只支持按时间字段排序。并且 OverWindow 开窗与 GroupBy 方式数据分组最大的不同在于,GroupBy 数据分组统计时候,在SELECT中除了 GROUP BY 的 key,不能直接选择其他非 key 的字段,但是 OverWindow 没有这个限制,SELECT可以选择任何字段。比如一张表 table(a,b,c,d)4 个字段,如果按 d 分组求 c 的最大值,两种写完如下:

  • GROUP BY - SELECT d, MAX(c) FROM table GROUP BY d
  • OVER Window = SELECT a, b, c, d, MAX(c) OVER(PARTITION BY d, ORDER BY ProcTime())
    如上 OVER Window 虽然 PARTITION BY d,但 SELECT 中仍然可以选择 a,b,c 字段。但在 GROUPBY 中,SELECT 只能选择 d 字段。

Group Window

根据窗口数据划分的不同,目前 Apache Flink 有如下 3 种 Bounded Winodw:

  • Tumble - 滚动窗口,窗口数据有固定的大小,窗口数据无叠加;
  • Hop - 滑动窗口,窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;
  • Session - 会话窗口,窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加。

说明: Aapche Flink 还支持 UnBounded 的 Group Window,也就是全局 Window,流上所有数据都在一个窗口里面,语义非常简单,这里不做详细介绍了。

Tumble

语义

Tumble 滚动窗口有固定 size,窗口数据不重叠,具体语义如下:

image.png

语法

Tumble 滚动窗口对应的语法如下:

SELECT 
    [gk],
    [TUMBLE_START(timeCol, size)], 
    [TUMBLE_END(timeCol, size)], 
    agg1(col1), 
    ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)
  • [gk] - 决定了流是 Keyed 还是/Non-Keyed;
  • TUMBLE_START - 窗口开始时间;
  • TUMBLE_END - 窗口结束时间;
  • timeCol - 是流表中表示时间字段;
  • size - 表示窗口的大小,如 秒,分钟,小时,天。
SQL 示例

利用pageAccess_tab测试数据,我们需要按不同地域统计每 2 分钟的淘宝首页的访问量(PV)。

SELECT  
    region,
    TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS winStart,  
    TUMBLE_END(rowtime, INTERVAL '2' MINUTE) AS winEnd,  
    COUNT(region) AS pv
FROM pageAccess_tab 
GROUP BY region, TUMBLE(rowtime, INTERVAL '2' MINUTE)
Result
region winStart winEnd pv
BeiJing 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:12:00.0 2
ShangHai 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
ShangHai 2017-11-11 04:10:00.0 2017-11-11 04:12:00.0 1

Hop

Hop 滑动窗口和滚动窗口类似,窗口有固定的 size,与滚动窗口不同的是滑动窗口可以通过 slide 参数控制滑动窗口的新建频率。因此当 slide 值小于窗口 size 的值的时候多个滑动窗口会重叠。

语义

Hop 滑动窗口语义如下所示:

image.png

语法

Hop 滑动窗口对应语法如下:

SELECT 
    [gk], 
    [HOP_START(timeCol, slide, size)] ,  
    [HOP_END(timeCol, slide, size)],
    agg1(col1), 
    ... 
    aggN(colN) 
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)
  • [gk] 决定了流是 Keyed 还是/Non-Keyed;
  • HOP_START - 窗口开始时间;
  • HOP_END - 窗口结束时间;
  • timeCol - 是流表中表示时间字段;
  • slide - 是滑动步伐的大小;
  • size - 是窗口的大小,如 秒,分钟,小时,天;
SQL 示例

利用pageAccessCount_tab测试数据,我们需要每 5 分钟统计近 10 分钟的页面访问量(PV).

SELECT  
  HOP_START(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winStart,  
  HOP_END(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winEnd,  
  SUM(accessCount) AS accessCount  
FROM pageAccessCount_tab 
GROUP BY HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)
Result
winStart winEnd accessCount
2017-11-11 01:55:00.0 2017-11-11 02:05:00.0 186
2017-11-11 02:00:00.0 2017-11-11 02:10:00.0 396
2017-11-11 02:05:00.0 2017-11-11 02:15:00.0 243
2017-11-11 02:10:00.0 2017-11-11 02:20:00.0 33
2017-11-11 04:05:00.0 2017-11-11 04:15:00.0 129
2017-11-11 04:10:00.0 2017-11-11 04:20:00.0 129

Session

Seeeion 会话窗口 是没有固定大小的窗口,通过 session 的活跃度分组元素。不同于滚动窗口和滑动窗口,会话窗口不重叠,也没有固定的起止时间。一个会话窗口在一段时间内没有接收到元素时,即当出现非活跃间隙时关闭。一个会话窗口 分配器通过配置 session gap 来指定非活跃周期的时长.

语义

Session 会话窗口语义如下所示:

image.png
语法

Seeeion 会话窗口对应语法如下:

SELECT 
    [gk], 
    SESSION_START(timeCol, gap) AS winStart,  
    SESSION_END(timeCol, gap) AS winEnd,
    agg1(col1),
     ... 
    aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)
  • [gk] 决定了流是 Keyed 还是/Non-Keyed;
  • SESSION_START - 窗口开始时间;
  • SESSION_END - 窗口结束时间;
  • timeCol - 是流表中表示时间字段;
  • gap - 是窗口数据非活跃周期的时长;
SQL 示例

利用pageAccessSession_tab测试数据,我们按地域统计连续的两个访问用户之间的访问时间间隔不超过 3 分钟的的页面访问量(PV).

SELECT  
    region, 
    SESSION_START(rowtime, INTERVAL '3' MINUTE) AS winStart,  
    SESSION_END(rowtime, INTERVAL '3' MINUTE) AS winEnd, 
    COUNT(region) AS pv  
FROM pageAccessSession_tab
GROUP BY region, SESSION(rowtime, INTERVAL '3' MINUTE)
Result
region winStart winEnd pv
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:13:00.0 1
ShangHai 2017-11-11 02:01:00.0 2017-11-11 02:08:00.0 4
ShangHai 2017-11-11 02:10:00.0 2017-11-11 02:14:00.0 2
ShangHai 2017-11-11 04:16:00.0 2017-11-11 04:19:00.0 1

UDX

Apache Flink 除了提供了大部分 ANSI-SQL 的核心算子,也为用户提供了自己编写业务代码的机会,那就是 User-Defined Function,目前支持如下三种 User-Defined Function:

  • UDF - User-Defined Scalar Function
  • UDTF - User-Defined Table Function
  • UDAF - User-Defined Aggregate Funciton

UDX 都是用户自定义的函数,那么 Apache Flink 框架为啥将自定义的函数分成三类呢?是根据什么划分的呢?Apache Flink 对自定义函数进行分类的依据是根据函数语义的不同,函数的输入和输出不同来分类的,具体如下:

UDX INPUT OUTPUT INPUT:OUTPUT
UDF 单行中的N(N>=0)列 单行中的1列 1:1
UDTF 单行中的N(N>=0)列 M(M>=0)行 1:N(N>=0)
UDAF M(M>=0)行中的每行的N(N>=0)列 单行中的1列 M:1(M>=0)

UDF

  • 定义
    用户想自己编写一个字符串联接的 UDF,我们只需要实现ScalarFunction#eval()方法即可,简单实现如下:
object MyConnect extends ScalarFunction {
  @varargs
  def eval(args: String*): String = {
    val sb = new StringBuilder
    var i = 0
    while (i < args.length) {
      if (args(i) == null) {
        return null
      }
      sb.append(args(i))
      i += 1
    }
    sb.toString
  }
}
  • 使用
 val fun = MyConnect
 tEnv.registerFunction("myConnect", fun)
 val sql = "SELECT myConnect(a, b) as str FROM tab"

UDTF

  • 定义
    用户想自己编写一个字符串切分的 UDTF,我们只需要实现TableFunction#eval()方法即可,简单实现如下:

ScalarFunction#eval()

class MySplit extends TableFunction[String] {
  def eval(str: String): Unit = {
    if (str.contains("#")){
      str.split("#").foreach(collect)
    }
  }

  def eval(str: String, prefix: String): Unit = {
    if (str.contains("#")) {
      str.split("#").foreach(s => collect(prefix + s))
    }
  }
}
  • 使用
val fun = new MySplit()
tEnv.registerFunction("mySplit", fun)
val sql = "SELECT c, s FROM MyTable, LATERAL TABLE(mySplit(c)) AS T(s)"

UDAF

  • 定义
    UDAF 要实现的接口比较多,我们以一个简单的 CountAGG 为例,做简单实现如下:
/** The initial accumulator for count aggregate function */
class CountAccumulator extends JTuple1[Long] {
  f0 = 0L //count
}

/**
  * User-defined count aggregate function
  */
class MyCount
  extends AggregateFunction[JLong, CountAccumulator] {

  // process argument is optimized by Calcite.
  // For instance count(42) or count(*) will be optimized to count().
  def accumulate(acc: CountAccumulator): Unit = {
    acc.f0 += 1L
  }

  // process argument is optimized by Calcite.
  // For instance count(42) or count(*) will be optimized to count().
  def retract(acc: CountAccumulator): Unit = {
    acc.f0 -= 1L
  }

  def accumulate(acc: CountAccumulator, value: Any): Unit = {
    if (value != null) {
      acc.f0 += 1L
    }
  }

  def retract(acc: CountAccumulator, value: Any): Unit = {
    if (value != null) {
      acc.f0 -= 1L
    }
  }

  override def getValue(acc: CountAccumulator): JLong = {
    acc.f0
  }

  def merge(acc: CountAccumulator, its: JIterable[CountAccumulator]): Unit = {
    val iter = its.iterator()
    while (iter.hasNext) {
      acc.f0 += iter.next().f0
    }
  }

  override def createAccumulator(): CountAccumulator = {
    new CountAccumulator
  }

  def resetAccumulator(acc: CountAccumulator): Unit = {
    acc.f0 = 0L
  }

  override def getAccumulatorType: TypeInformation[CountAccumulator] = {
    new TupleTypeInfo(classOf[CountAccumulator], BasicTypeInfo.LONG_TYPE_INFO)
  }

  override def getResultType: TypeInformation[JLong] =
    BasicTypeInfo.LONG_TYPE_INFO
}
  • 使用
val fun = new MyCount()
tEnv.registerFunction("myCount", fun)
val sql = "SELECT myCount(c) FROM MyTable GROUP BY  a"

Source&Sink

上面我们介绍了 Apache Flink SQL 核心算子的语法及语义,这部分将选取 Bounded EventTime Tumble Window 为例为大家编写一个完整的包括 Source 和 Sink 定义的 Apache Flink SQL Job。假设有一张淘宝页面访问表(PageAccess_tab),有地域,用户 ID 和访问时间。我们需要按不同地域统计每 2 分钟的淘宝首页的访问量(PV). 具体数据如下:

region userId accessTime
ShangHai U0010 2017-11-11 10:01:00
BeiJing U1001 2017-11-11 10:01:00
BeiJing U2032 2017-11-11 10:10:00
BeiJing U1100 2017-11-11 10:11:00
ShangHai U0011 2017-11-11 12:10:00

Source 定义

自定义 Apache Flink Stream Source 需要实现StreamTableSource, StreamTableSource中通过StreamExecutionEnvironmentaddSource方法获取DataStream, 所以我们需要自定义一个 SourceFunction, 并且要支持产生 WaterMark,也就是要实现DefinedRowtimeAttributes接口。

Source Function 定义

支持接收携带 EventTime 的数据集合,Either 的数据结构,Right 表示 WaterMark 和 Left 表示数据:

class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) 
  extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }
  override def cancel(): Unit = ???
}

定义 StreamTableSource

我们自定义的 Source 要携带我们测试的数据,以及对应的 WaterMark 数据,具体如下:

class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  val fieldNames = Array("accessTime", "region", "userId")
  val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  val rowType = new RowTypeInfo(
    Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
    fieldNames)

  // 页面访问表数据 rows with timestamps and watermarks
  val data = Seq(
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
    Right(1510365660000L),
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
    Right(1510365660000L),
    Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
    Right(1510366200000L),
    Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
    Right(1510366260000L),
    Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
    Right(1510373400000L)
  )

  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
    Collections.singletonList(new RowtimeAttributeDescriptor(
      "accessTime",
      new ExistingField("accessTime"),
      PreserveWatermarks.INSTANCE))
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType)
  }

  override def getReturnType: TypeInformation[Row] = rowType

  override def getTableSchema: TableSchema = schema

}

Sink 定义

我们简单的将计算结果写入到 Apache Flink 内置支持的 CSVSink 中,定义 Sink 如下:

def getCsvTableSink: TableSink[Row] = {
    val tempFile = File.createTempFile("csv_sink_", "tem")
    // 打印sink的文件路径,方便我们查看运行结果
    println("Sink path : " + tempFile)
    if (tempFile.exists()) {
      tempFile.delete()
    }
    new CsvTableSink(tempFile.getAbsolutePath).configure(
      Array[String]("region", "winStart", "winEnd", "pv"),
      Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
  }

构建主程序

主程序包括执行环境的定义,Source/Sink 的注册以及统计查 SQL 的执行,具体如下:

def main(args: Array[String]): Unit = {
    // Streaming 环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 设置EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //方便我们查出输出数据
    env.setParallelism(1)

    val sourceTableName = "mySource"
    // 创建自定义source数据结构
    val tableSource = new MyTableSource

    val sinkTableName = "csvSink"
    // 创建CSV sink 数据结构
    val tableSink = getCsvTableSink

    // 注册source
    tEnv.registerTableSource(sourceTableName, tableSource)
    // 注册sink
    tEnv.registerTableSink(sinkTableName, tableSink)

    val sql =
      "SELECT  " +
      "  region, " +
      "  TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," +
      "  TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " +
      " FROM mySource " +
      " GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"

    tEnv.sqlQuery(sql).insertInto(sinkTableName);
    env.execute()
  }

执行并查看运行结果

执行主程序后我们会在控制台得到 Sink 的文件路径,如下:

Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem

Cat 方式查看计算结果,如下:

jinchengsunjcdeMacBook-Pro:FlinkTableApiDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1

表格化如上结果:

region winStart winEnd pv
BeiJing 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
BeiJing 2017-11-11 02:10:00.0 2017-11-11 02:12:00.0 2
ShangHai 2017-11-11 02:00:00.0 2017-11-11 02:02:00.0 1
ShangHai 2017-11-11 04:10:00.0 2017-11-11 04:12:00.0 1

上面这个端到端的完整示例也可以应用到本篇前面介绍的其他算子示例中,只是大家根据 Source 和 Sink 的 Schema 不同来进行相应的构建即可!

总结

本篇概要的向大家介绍了 SQL 的由来,Apache Flink SQL 大部分核心功能,并附带了具体的测试数据和测试程序,最后以一个 End-to-End 的示例展示了如何编写 Apache Flink SQL 的 Job 收尾。本篇着重向大家介绍 Apache Flink SQL 的使用,后续我们再继续探究每个算子的实现原理。

附录

关于点赞和评论

本系列文章难免有很多缺陷和不足,真诚希望读者对有收获的篇章给予点赞鼓励,对有不足的篇章给予反馈和建议,先行感谢大家!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容