Flink SQL Query 语法(三)- Join

主要引用官方文档 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/joins/

操作符

Join

Flink SQL 支持对动态表进行 Join 操作。有几种不同类型的 Join 可以支持多种语义。

默认情况下,Join 的顺序不会优化。表按 FROM 子句中指定的顺序联接。用户可以调整 Join 查询的性能,通过首先列出更新频率最低的表,最后列出更新频率最高的表。

常规 Join

常规 Join 是最通用的联接类型,其中任何新记录或对联接任一侧的更改都是可见的,并影响整个联接结果。例如,如果左侧有一个新记录,它将与右侧所有以前和将来的记录 Join。

SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id

对于流式查询,常规 Join 的语法是最灵活的,并且允许输入表任何类型的更新(插入、更新、删除)。但是,此操作具有重要的操作含义:要求将连接输入的两边永远保持在 Flink 状态(State)中。因此,计算查询结果所需的状态可能会无限增长,这取决于所有输入表和中间联接结果的不同输入行的数量。可以为查询配置提供适当的状态生存时间(TTL),以防止状态大小过大。请注意,这可能会影响查询结果的正确性。

对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于聚合类型和不同分组键的数量。请提供具有有效保留时间间隔的查询配置,以防止状态大小过大。

INNER JOIN

内连接语义,目前仅支持 equi-join,即 join 的联合条件至少拥有一个相等谓词。不支持任何 cross join 和 theta join。

SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id

OUTER JOIN

外连接语义,Flink 支持 LEFT OUTER JOIN、RIGHT OUTER JOIN 和 FULL OUTER JOIN。目前仅支持 equi-join。

SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id

SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id

Interval Joins(时间区间链接)

Interval join 是常规 join 的子集,目前仅支持 equi-join,可以使用流的方式进行处理。

Interval join 需要至少一个相等谓词和一个限制了双方时间的 join 条件,比如,以下谓词是合法的 interval join 条件:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

下面的示例,所有在收到后四小时内发货的 order 会与他们相关的 shipment 进行 join。

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

对于流式查询,与常规 Join 相比,Interval join 只支持带有时间属性的 Append-only 表。由于时间属性是准单调递增的,Flink 可以在不影响结果正确性的情况下从其状态中删除旧值。

Temporal Joins

Temporal Joins 允许和版本表(Version/Temporal 表)进行 Join。这意味着可以在某个时间点检索值。左侧表可以是任意表,每一行与版本表(右侧表)对应行的版本进行 Join,语法如下:

SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1

Event Time Temporal Join

例如,假设我们有一个订单表,每个订单都有不同货币的价格。为了正确地将此表规范化为单一货币(如美元),每个订单都需要从下订单时起使用正确的货币换算率。

-- Create a table of orders. This is a standard
-- append-only dynamic table.
CREATE TABLE orders (
    order_id    STRING,
    price       DECIMAL(32,2),
    currency    STRING,
    order_time  TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (/* ... */);

-- Define a versioned table of currency rates. 
-- This could be from a change-data-capture
-- such as Debezium, a compacted Kafka topic, or any other
-- way of defining a versioned table. 
CREATE TABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32, 2),
    update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL
    WATERMARK FOR update_time AS update_time
) WITH (
   'connector' = 'upsert-kafka',
   /* ... */
);

SELECT 
     order_id,
     price,
     currency,
     conversion_rate,
     order_time,
FROM orders
LEFT JOIN currency_rates FOR SYSTEM TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency

order_id price currency conversion_rate  order_time
====== ==== ======  ============  ========
o_001    11.11  EUR        1.14                    12:00:00
o_002    12.51  EUR        1.10                    12:0600

注意:事件时间 Temporal Join 由左右两侧的水印触发;请确保连接的两侧已正确设置水印。

注意:事件时间 Temporal Join 需要 Join 条件的等价条件中包含主键。

Processing Time Temporal Join

使用 processing time 属性,Join 将始终返回给定键的最新值。

下面的处理时间 Temporal Join 示例显示了表 LatestRates Join Append-only 表 orders。LatestRates 是维度表(例如 HBase 表)。在时间 10:15、10:30、10:52,LatestRates 表的内容如下:

10:15> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:30> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        114
Yen           1

10:52> SELECT * FROM LatestRates;

currency   rate
======== ======
US Dollar   102
Euro        116     <==== changed from 114 to 116
Yen           1

在 10:15 和 10:30 时,LastestRates 的内容不变。欧元汇率在 10:52 从 114 变为 116。

Orders 是一个 Append-only 表,含有金额和货币的订单信息。例如,在 10:15 有一个2欧元的订单。

SELECT * FROM Orders;

amount currency
====== =========
     2 Euro             <== arrived at time 10:15
     1 US Dollar        <== arrived at time 10:30
     2 Euro             <== arrived at time 10:52

根据这些表,计算所有订单,转换为相同货币。

amount currency     rate   amount*rate
====== ========= ======= ============
     2 Euro          114          228    <== arrived at time 10:15
     1 US Dollar     102          102    <== arrived at time 10:30
     2 Euro          116          232    <== arrived at time 10:52

SQL 查询如下:

SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

查询使用处理时间时,左侧的每条记录都将与右侧表的当前版本相关联,新的 Order 记录总是与最新版本的 LatestRates 连接在一起。

查询结果对于处理时间来说是不确定的。处理时间 Temporal join 最常用于通过连接外部表(维度表)补充信息。

Lookup Join

Lookup Join 通常用于通过连接外部表(维度表)补充信息,要求一个表具有处理时间属性,另一个表使 Lookup Source Connector(查看 Connector 特性)。

Lookup join 使用上面的处理时间 Temporal Join 语法,右侧的表支持 Lookup Source Connector。

-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
  'table-name' = 'customers'
);

-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;

在上面的示例中,Orders 表使用 MySQL 的 Customers 表的数据连接维度信息。在 Customer 的行更新时,联接结果不会更新。Lookup join 连接还需要一个强制的相等连接谓词(o.customer_id = c.id)。

Array Expansion(数组展开)

为给定数组中的每个元素返回新行

SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)

Table Function

连接表和 Table Function 的结果,左(外)表中的每一行将会与调用 Table Function 所产生的所有结果中相关行进行 join。用户自定义表函数( User-defined table functions,UDTFs ) 在执行前必须先注册。

INNER JOIN

若表函数返回了空结果,左表(outer)的行将会被删除。

SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res)

LEFT OUTER JOIN

若表函数返回了空结果,将会保留相对应的外部行并用空值填充结果。

SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
  ON TRUE
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容