流式概念
Flink 的 Table API 和 SQL 是流批统一的 API。 这意味着 Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。 因为传统的关系代数以及 SQL 最开始都是为了批式处理而设计的, 关系型查询在流式场景下不如在批式场景下容易懂。
1、状态管理
一个表程序(Table program)可以配置一个 state backend 和多个不同的 checkpoint 选项 以处理对不同状态大小和容错需求。这可以对正在运行的 Table API & SQL 管道(pipeline)生成 savepoint,并在这之后用其恢复应用程序的状态。
状态使用
- 由于 Table API & SQL 程序是声明式的,管道内的状态会在哪以及如何被使用并不明确。 Planner 会确认是否需要状态来得到正确的计算结果, 管道会被现有优化规则集优化成尽可能少地使用状态。
- 形如 SELECT ... FROM ... WHERE 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。
- 然而诸如 join、 聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。flink进行 join 操作
提供了优化窗口和时段Join聚合 以利用 watermarks概念来让保持较小的状态规模。
2、动态表
- Flink 如何在++无界数据集++上实现与++数据库引擎在有界数据上的处理++具有相同的语义
1) DataStream 上的关系查询
关系代数 / SQL | 流处理 |
---|---|
关系(或表)是有界(多)元组集合。 | 流是一个无限元组序列。 |
对批数据(例如关系数据库中的表)执行的查询可以访问完整的输入数据。 | 流式查询在启动时不能访问所有数据,必须“等待”数据流入。 |
批处理查询在产生固定大小的结果后终止。 | 流查询不断地根据接收到的记录更新其结果,并且始终不会结束。 |
如何使用++关系查询++和++sql++处理流计算呢?-- 2)3)4)
2)动态表 & 连续查询(Continuous Query)
动态表 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。
3)在流上定义表
- 连续查询:在动态表上计算一个连续查询,并生成一个新的动态表。连续查询从不终止,并根据其输入表上的更新更新其结果表。在任何时候,连续查询的结果在语义上与以批处理模式在输入表快照上执行的相同查询的结果相同。
4)表到流的转换
动态表可以像普通数据库表一样通过 INSERT、UPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 修改,或者介于两者之间的其他表。
3、时间属性
背景:
确定性概念“如果一个操作在重复相同的输入值时能保证计算出相同的结果,那么该操作就是确定性的”。流计算任务中的动态函数,会造成不确定性。
时间属性:
Flink 可以基于几种不同的 时间 概念来处理数据。
- 处理时间 指的是执行具体操作时的机器时间(大家熟知的绝对时间, 例如 Java的 System.currentTimeMillis()) )
- 事件时间 指的是数据本身携带的时间。这个时间是在事件产生时的时间。
- 摄入时间 指的是数据进入 Flink 的时间;在系统内部,会把它当做事件时间来处理
1)处理时间
共有三种方法可以定义处理时间
- 在创建表的 DDL 中定义:处理时间属性可以在创建表的 DDL 中用计算列的方式定义,用 PROCTIME() 就可以定义处理时间,函数 PROCTIME() 的返回类型是 TIMESTAMP_LTZ 。
user_action_time AS PROCTIME() -- 声明一个额外的列作为处理时间属性
- 在 DataStream 到 Table 转换时定义
处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它只能定义在 schema 定义的最后。 - 使用 TableSource 定义:在实现了 DefinedProctimeAttribute 的 TableSource 中定义
2)事件时间
事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(watermarks)。 同样也有3中定义方式
- 在 DDL 中定义:WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段
-- 声明 user_action_time 是事件时间属性,并且用 延迟 5 秒的策略来生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
在 DataStream 到 Table 转换时定义:事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义(可以是在schema的结尾追加一个新字段,也可以替换一个已经存在的字段)
使用 TableSource 定义:在实现了 DefinedRowTimeAttributes 的 TableSource 中定义。
4、时态表
时态表(Temporal Table)是一张随时间变化的表。
版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。
普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。
声明版本表
在 Flink 中,定义了主键约束和事件时间属性的表就是版本表
-- 定义一张版本表
CREATE TABLE product_changelog (
product_id STRING,
product_name STRING,
product_price DECIMAL(10, 4),
update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
PRIMARY KEY(product_id) NOT ENFORCED, -- (1) 定义主键约束
WATERMARK FOR update_time AS update_time -- (2) 通过 watermark 定义事件时间
) WITH (
'connector' = 'kafka',
'topic' = 'products',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'value.format' = 'debezium-json'
);
声明版本视图
Flink 也支持定义版本视图只要一个视图包含主键和事件时间便是一个版本视图。
CREATE VIEW versioned_rates AS
SELECT currency, rate, currency_time -- (1) `currency_time` 保留了事件时间
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY currency -- (2) `currency` 是去重 query 的 unique key,可以作为主键
ORDER BY currency_time DESC) AS rowNum
FROM RatesHistory ) -- RatesHistory是一张版本表
WHERE rowNum = 1;
声明普通表
普通表的声明和 Flink 建表 DDL 一致
-- 用 DDL 定义一张 HBase 表,然后我们可以在 SQL 中将其当作一张时态表使用
-- 'currency' 列是 HBase 表中的 rowKey
CREATE TABLE LatestRates (
currency STRING,
fam1 ROW<rate DOUBLE>
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'rates',
'zookeeper.quorum' = 'localhost:2181'
);