2022-06-22-Flink-50(二. SQL手册)

1. DDL:CREATE

建表语句
CREATE TABLE [catalog_name.][db_name.]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)

<column_definition>:
  column_name column_type [COMMENT column_comment]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
表中的列

1. 物理列

物理列是数据库中所说的常规列.其定义了物理介质中存储的数据字段名称,类型,顺序.其他类型的列可以在物理列之间声明,但不会影响最终的物理列的读取

CREATE TABLE MyTable (
  MyField1 INT,
  MyField2 STRING,
  MyField3 BOOLEAN
) WITH (
  ...
)

2. 元数据列

元数据是SQL标准的扩展,允许访问数据本身具有的一些元数据。元数据列由 METADATA 关键字标识

CREATE TABLE MyTable (
  MyField1 INT,
  MyField2 STRING,
  MyField3 BOOLEAN
  -- 读取 kafka 本身自带的时间戳
  `et` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka'
  ...
);

如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样的话,FROM xxx 子句是可以被省略的

CREATE TABLE MyTable (
  MyField1 INT,
  MyField2 STRING,
  MyField3 BOOLEAN
  -- 读取 kafka 本身自带的时间戳
 `timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
  'connector' = 'kafka'
  ...
);

如果自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致的话,程序运行时会自动 cast 强转。但是这要求两种数据类型是可以强转的

关于FlinkSQL的每种Connector都提供了哪些metadata字段,可以参考如下:
Overview | Apache Flink

默认情况下,Flink SQL planner 认为 metadata 列是可以 读取 也可以写入 的。但是有些外部存储系统的元数据信息是只能用于读取,不能写入的。那么在往一个表写入的场景下,我们就可以使用 VIRTUAL 关键字来标识某个元数据列不写入到外部存储中(不持久化)。

CREATE TABLE MyTable (
  -- sink 时会写入
  `timestamp` BIGINT METADATA,
  -- sink 时不写入
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `name` STRING,
) WITH (
  'connector' = 'kafka'
  ...
);

3. 计算列
计算列其实就是在写建表的DDL时,可以拿已经有的一些列经过一些自定义的运算生成的新列,这些列本身时没有以物理形式存储到数据源的,计算列可以包含其他列,常量或者函数,但是不能写进一个子查询进去;如果只是简单的四则运算的话直接写在 DML 中就可以,但是计算列一般是用于定义时间属性的(因为在 SQL 任务中时间属性只能在 DDL 中定义,不能在 DML 语句中定义)。和虚拟 metadata 列是类似的,计算列也是只能读不能写的

Watermark

具体 SQL 语法标准是

WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

rowtime_column_name:表的事件时间属性字段。该列必须是 TIMESTAMP(3)、TIMESTAMP_LTZ(3) 类,这个时间可以是一个计算列
watermark_strategy_expression:定义 Watermark 的生成策略。Watermark 的一般都是由 rowtime_column_name 列减掉一段固定时间间隔。SQL 中 Watermark 的生产策略是:当前 Watermark 大于上次发出的 Watermark 时发出当前 Watermark
Watermark 的发出频率:Watermark 发出一般是间隔一定时间的,Watermark 的发出间隔时间可以由 pipeline.auto-watermark-interval 进行配置

Flink SQL 提供了几种 WATERMARK 生产策略:

  1. 有界无序:设置方式为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit
  2. 严格升序:设置方式为 WATERMARK FOR rowtime_column AS rowtime_column
  3. 递增:设置方式为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
Create Table With子句

Flink SQL 已经提供了一系列的内置 Connector,具体可见 Overview | Apache Flink

Create Table Like 子句

CREATE Statements | Apache Flink

REATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
    -- Add watermark definition
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- Overwrite the startup-mode
    'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;

上面这个语句的效果就等同于:

CREATE TABLE Orders_with_watermark (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    'connector' = 'kafka',
    'scan.startup.mode' = 'latest-offset'
);

2. DML:With

应用场景(支持 Batch\Streaming):With 语句和离线 Hive SQL With 语句一样的

3. DML:SELECT & WHERE 子句

应用场景(支持 Batch\Streaming):With 语句和离线 Hive SQL With 语句一样的

Table table = tableEnv.sqlQuery("SELECT a,b FROM (VALUES(1,1),(2,2)) as t (a,b)"); //自定义 Source 的数据
Table table = tableEnv.sqlQuery("SELECT a,UDF(b) FROM kafka_table"); //使用 UDF 做字段标准化处理

4. DML:SELECT DISTINCT 子句

对于实时任务,计算时的状态可能会无限增长

5. DML: 窗口聚合

  1. 滚动窗口(TUMBLE)
  2. 滑动窗口(HOP)
  3. Session 窗口(SESSION)
  4. 累计窗口(CUMULATE)
Window TVF 支持 Grouping Sets、Rollup、Cube

6. DML:Group 聚合

窗口聚合和 Group by 聚合的差异在于:
  1. 本质区别:窗口聚合是具有时间语义的,其本质是想实现窗口结束输出结果之后,后续有迟到的数据也不会对原有的结果发生更改了,即输出结果值是定值(不考虑 allowLateness)。而 Group by 聚合是没有时间语义的,不管数据迟到多长时间,只要数据来了,就把上一次的输出的结果数据撤回,然后把计算好的新的结果数据发出
  2. 运行层面:窗口聚合是和 时间 绑定的,窗口聚合其中窗口的计算结果触发都是由时间(Watermark)推动的。Group by 聚合完全由数据推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同
Group 聚合支持 Grouping sets、Rollup、Cube

7. DML:Over 聚合

SELECT order_id, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW //按时间区间聚合
  //ROWS BETWEEN 5 PRECEDING AND CURRENT ROW //按行聚合
  ) AS one_hour_prod_amount_sum
FROM Orders
SELECT
  agg_func(agg_col) OVER (
    [PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...

range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为 按照行数聚合,第二种为 按照时间区间聚合

当然,如果你在一个SELECT 中有多个聚合窗口的聚合方式,FlinkSQL提供一种简化方法

//over 窗口
Table table6 =  tableEnv.sqlQuery("select user_name, COUNT(1) OVER w as cnt from clickTable WINDOW w as (PARTITION BY user_name order by et rows between unbounded preceding and current row) ");

8. DML:Joins(重要)

Join | Apache Flink

  1. Regular Join:流与流的 Join,包括 Inner Equal Join、Outer Equal Join
    Join 的流程是左流新来一条数据之后,会和右流中符合条件的所有数据做 Join,然后输出。流的上游是无限的数据,所以要做到关联的话,Flink 会将两条流的所有数据都存储在 State 中,所以 Flink 任务的 State 会无限增大,因此你需要为 State 配置合适的 TTL,以防止 State 过大
  2. Interval Join:流与流的 Join,两条流一段时间区间内的 Join
  3. Temporal Join:流与流的 Join,包括事件时间,处理时间的 Temporal Join,类似于离线中的快照 Join
  4. Lookup Join:流与外部维表的 Join
  5. Array Expansion:表字段的列转行,类似于 Hive 的 explode 数据炸开的列转行
  6. Table Function:自定义函数的表字段的列转行,支持 Inner Join 和 Left Outer Join

9. DML:集合操作

  1. UNION ALL:将集合合并,不做去重
  2. Intersect:交集并且去重
  3. Intersect ALL:交集不做去重
  4. Except:差集并且去重
  5. Except ALL:差集不做去重

10. DML:Order By、Limit 子句(开发不用)

11. DML:TopN 子句

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]

12. DML:Window TopN

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name) -- windowing TVF
WHERE rownum <= N [AND conditions]

13. DML:Deduplication

应用场景:Deduplication 其实就是去重,也即上文介绍到的 TopN 中 row_number = 1 的场景,比如上游数据发重了,或者计算 DAU 明细数据等场景,都可以使用 Deduplication 语法去做去重

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1
CREATE TABLE Orders (
  order_time  STRING,
  user        STRING,
  product     STRING,
  num         BIGINT,
  proctime AS PROCTIME()
) WITH (...);

-- remove duplicate rows on order_id and keep the first occurrence row,
-- because there shouldn't be two orders with the same order_id.
SELECT order_id, user, product, num
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime ASC) AS row_num
  FROM Orders)
WHERE row_num = 1

14. EXPLAIN 子句

EXPLAIN PLAN FOR <query_statement_or_insert_statement>

15. USE、 SHOW、 LOAD、UNLOAD 子句

USE 语句 | Apache Flink

18. SQL Hints

Hints | Apache Flink

参考...
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342