一. Flink 1.9 新特性
总的变更:https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
发布时间:2019.8.22
1. Flink Table API/SQL
1.1 Flink SQL DDL 支持
到目前为止,Flink SQL 已经支持 DML 语句(如 SELECT,INSERT)。但是外部表(table source 和 table sink)必须通过 Java/Scala 代码或配置文件的方式注册。1.9 版本中,支持 SQL DDL 语句的方式注册和删除表(CREATE TABLE,DROP TABLE)。不过目前还没有增加流特定的语法扩展来定义时间戳抽取和 watermark 生成策略等。流式的需求也将会在下一版本中完整支持。
flink1.6
- 只支持dml(select和insert)
flink1.9
- define/alter/delete sink/source table
- define/alter/delete view
- define/replace/delete 用户定义的类型type(eg: CREATE [ OR REPLACE ] TYPE name AS fieldType)
- 可以加载外部的function来作为udf使用
1.2 新 Blink Table/SQL Planner
1.2.1 现在有两个插件化的查询处理器来执行 Table API 和 SQL:1.9 以前的 Flink 处理器和新的基于 Blink 的处理器。两个查询处理器之间的语义和功能大部分是一致的,但未完全对齐。不过,Blink 的查询处理器尚未完全集成。因此,1.9 之前的 Flink 处理器仍然是 1.9 版本的默认处理器,建议用于生产设置。
1.2.2 基于 Blink 的查询处理器还提供了更强大的流处理能力,包括一些社区期待已久的新功能(如维表 Join,TopN,去重)和聚合场景缓解数据倾斜的优化,以及内置更多常用的函数。
1.2.3 使用限制(非常细节的东西),在后面的版本会支持:
- batch job如果使用blink planner,需要使用TableEnvironment,不能用BatchTableEnvironment。
- StreamTableSink的实现类需要实现consumeDataStream()方法,而不是emitDataStream()方法。
- 不支持Table.flatAggregate。
- batch job不支持session/count window。
- Blink planner只支持新Catalog API,不再支持ExternalCatalog。
1.3 Table API / SQL 的其他改进
1.3.1 重构 Table API / SQL 的类型系统
Flink 1.9 实现了一个新的数据类型系统,以便从 Table API 中移除对 Flink TypeInformation的依赖,并提高其对 SQL 标准的遵从性,不过还在进行中,预计将在下一版本1.10完工,并且在 Flink 1.9 中,UDF 尚未移植到新的类型系统上。如果直接升级到1.9,等1.10完善之后,改动还会比较大。由于blink planner和新的数据类型系统是同时进行的,新的planner并没有支持所有的数据类型,并且以后新的planner可能对有的数据类型不再进行支持(风险点)。
1.3.2 为 Table API / SQL 的 Java 用户去除 Scala 依赖
1.3.3 Table API 的多列和多行转换
1.3.4 重构和统一 Catalog API
在此之前,通过 Table API 或 SQL 定义的表都无法持久化保存;从 Flink 1.9 起,这些表的元数据可以被持久化到 catalog 中。这意味着用户可以在 Hive Metastore Catalog 中创建 Kafka 表,并在 query 中直接引用该表。
1.3.5 Hive 集成预览(beta)
最近,社区开始为 Flink Table API 和 SQL 实现一个连接到 Hive Metastore 的外部 catalog。在 Flink 1.9 中,用户能够查询和处理存储在 Hive 中多种格式的数据。 Hive 集成还包括支持在 Flink Table API / SQL 中使用 Hive 的 UDF。
在以前,Table API / SQL 中定义的表一直是临时的。新的 catalog 连接器允许在 Metastore 中持久化存储那些使用 SQL DDL 语句创建的表。这意味着可以直接连接到 Metastore 并注册一个表,例如,Kafka topic 的表。从现在开始,只要 catalog 连接到 Metastore,就可以查询该表。
请注意 Flink 1.9 中提供的 Hive 支持目前还是实验性的,下一个版本中将稳定这些功能。
1.3.6 为hbase增加Upsert table sink factory
1.3.7 为Table API增加map、flatmap、aggregate算子
2. Runtime & core
2.1 细粒度批作业恢复 (FLIP-1)
批作业(DataSet、Table API 和 SQL)从 task 失败中恢复的时间被显著缩短。在 Flink 1.9 之前,批处理作业中的 task 失败是通过取消所有 task 并重新启动整个作业来恢复的,即作业从头开始,所有进度都会废弃。在 1.9 版本中,Flink 将中间结果保留在网络 shuffle 的边缘,并使用这些数据恢复仅受故障影响的 tasks,即处在同一个 failover region (故障区)的 tasks。
“Region” 的故障策略也能同时提升 “embarrassingly parallel” 类型的流作业恢复速度,也就是没有任何像 keyBy、rebalance 等 shuffle 的作业。当这种作业在恢复时,只有受影响的故障区 task 需要重启。对于其他类型的流作业,故障恢复行为与之前的版本一样。
2.2 Stop-with-Savepoint,https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212
2.2.1 Rest API中,增加SUSPEND/TERMINATE命令。CheckpointType中增加了一种SYNC_SAVEPOINT类型,在suspending/terminating 作业的时候,savepoint采用同步的方式来制作。
2.2.2 “Cancel-with-savepoint”是停止、重启或升级 Flink 作业的一个常用操作。然而,当前的实现并没有保证输出到 exactly-once sink 的外部存储的数据持久化。为了改进停止作业时的端到端语义,Flink 1.9 引入了一种新的 SUSPEND 模式,可以带 savepoint 停止作业,保证了输出数据的一致性。可以使用 Flink CLI 来 suspend 一个作业:bin/flink stop -p [:targetSavepointDirectory] :jobId。最终作业的状态会在成功时设置成 FINISHED 状态,方便用户区别操作是否失败。
备注:原来是cancel命令可以指定做不做save point,现在cancel/stop命令中都可以指定做不做save point,只是stop的时候会先挂起job。
2.3 State Processing API, 从1.9开始,Flink state可以对外暴露,在外部可以使用client来查询(beta)。https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
2.4 在SQL-Client 的yaml配置文件中支持外部catalog。
2.5 source增加对 Parquet file的支持.
2.6 重构 Flink WebUI。重新设计的 UI 是 1.9.0 的默认版本,不过仍保留了切换到旧版 WebUI 的按钮。
二. Flink 1.10 新特性
- flink1.10新特性
发布时间:2020.2.11
1.1 内存管理和配置优化
原来的问题
原来TaskExecutor内存配置有缺点,难做资源利用优化,比如:
- 批和流的配置模型不同;
- 流计算场景,比如RocksDB这种不是基于堆内存的state backend,需要用户进行复杂的配置
优化点
1. 管理的内存拓展
- 批作业仍然可以继续使用heap和off-heap内存。
- 流作业如果使用RocksDBStateBackend,只能使用off-heap内存
- 为了批流集群配置统一,管理内存只能使用off-heap
2. 简化RocksDB配置
- 配置RocksDB原来需要手动调试,比如减小JVM堆大小、设置Flink使用的堆外内存。现在Flink的开箱配置即可完成,且只需要简单地调整managed内存的大小即可调整RocksDBStateBackend的内存预算。
1.2 job提交的逻辑统一
原来提交作业是由环境变量负责的,并且和部署方式有关(比如:Yarn,Kubernetes, Mesos)。
在flink1.10,作业提交逻辑抽象出Executor接口,新增加的ExecutorCLI为任意一个执行目标提供了统一的配置参数。另外,引入JobClient来获取JobExecutionResult,这样结果获取和作业的提交逻辑解耦。
1.3 原生Kubernetes集成(Beta)
1.4 Table API/SQL:hive集成已经可以在生产环境使用
Flink 1.9 推出了beta Hive 集成。
1. Batch SQL 原生分区支持
2. flink1.10 hive集成还引入许多数据读取方面的优化
- 投影下推
- limit下推
- 读取数据时orc向量化
3. SQL DDL中支持watermark 和计算列
4. 其他DDL的拓展
- UDF
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
三. Flink 1.11 新特性
- flink1.11新特性,该部分内容转自 Flink 1.11.0 发布,有哪些值得关注的新特性?
发布时间:2020.7.7
改进一: 生态完善和易用性提升
1 Table & SQL 支持 Change Data Capture(CDC)
CDC 被广泛使用在复制数据、更新缓存、微服务间同步数据、审计日志等场景,很多公司都在使用开源的 CDC 工具,如 MySQL CDC。通过 Flink 支持在 Table & SQL 中接入和解析 CDC 是一个强需求,在过往的很多讨论中都被提及过,可以帮助用户以实时的方式处理 changelog 流,进一步扩展 Flink 的应用场景,例如把 MySQL 中的数据同步到 PG 或 ElasticSearch 中,低延时的 temporal join 一个 changelog 等。
除了考虑到上面的真实需求,Flink 中定义的“Dynamic Table”概念在流上有两种模型:append 模式和 update 模式。通过 append 模式把流转化为“Dynamic Table”在之前的版本中已经支持,因此在 1.11.0 中进一步支持 update 模式也从概念层面完整的实现了“Dynamic Table”。
为了支持解析和输出 changelog,如何在外部系统和 Flink 系统之间编解码这些更新操作是首要解决的问题。考虑到 source 和 sink 是衔接外部系统的一个桥梁,因此 FLIP-95 在定义全新的 Table source 和 Table sink 接口时解决了这个问题。
在公开的 CDC 调研报告中,Debezium 和 Canal 是用户中最流行使用的 CDC 工具,这两种工具用来同步 changelog 到其它的系统中,如消息队列。据此,FLIP-105 首先支持了 Debezium 和 Canal 这两种格式,而且 Kafka source 也已经可以支持解析上述格式并输出更新事件,在后续的版本中会进一步支持 Avro(Debezium) 和 Protobuf(Canal)。
CREATE TABLE my_table (
...) WITH (
'connector'='...', -- e.g. 'kafka'
'format'='debezium-json',
'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)
'debezium-json.ignore-parse-errors'='true' -- default: false
);
2 Table & SQL 支持 JDBC Catalog
1.11.0 之前,用户如果依赖 Flink 的 source/sink 读写关系型数据库或读取 changelog 时,必须要手动创建对应的 schema。而且当数据库中的 schema 发生变化时,也需要手动更新对应的 Flink 作业以保持一致和类型匹配,任何不匹配都会造成运行时报错使作业失败。用户经常抱怨这个看似冗余且繁琐的流程,体验极差。
实际上对于任何和 Flink 连接的外部系统都可能有类似的上述问题,在 1.11.0 中重点解决了和关系型数据库对接的这个问题。FLIP-93 提供了 JDBC catalog 的基础接口以及 Postgres catalog 的实现,这样方便后续实现与其它类型的关系型数据库的对接。
1.11.0 版本后,用户使用 Flink SQL 时可以自动获取表的 schema 而不再需要输入 DDL。除此之外,任何 schema 不匹配的错误都会在编译阶段提前进行检查报错,避免了之前运行时报错造成的作业失败。这是提升易用性和用户体验的一个典型例子。
3 Hive 实时数仓
从 1.9.0 版本开始 Flink 从生态角度致力于集成 Hive,目标打造批流一体的 Hive 数仓。经过前两个版本的迭代,已经达到了 batch 兼容且生产可用,在 TPC-DS 10T benchmark 下性能达到 Hive 3.0 的 7 倍以上。
1.11.0 在 Hive 生态中重点实现了实时数仓方案,改善了端到端流式 ETL 的用户体验,达到了批流一体 Hive 数仓的目标。同时在兼容性、性能、易用性方面也进一步进行了加强。
在实时数仓的解决方案中,凭借 Flink 的流式处理优势做到实时读写 Hive:
- Hive 写入:FLIP-115 完善扩展了 FileSystem connector 的基础能力和实现,Table/SQL 层的 sink 可以支持各种格式(CSV、Json、Avro、Parquet、ORC),而且支持 Hive table 的所有格式。
- Partition 支持:数据导入 Hive 引入 partition 提交机制来控制可见性,通过sink.partition-commit.trigger 控制 partition 提交的时机,通过 sink.partition-commit.policy.kind 选择提交策略,支持 SUCCESS 文件和 metastore 提交。
- Hive 读取:实时化的流式读取 Hive,通过监控 partition 生成增量读取新 partition,或者监控文件夹内新文件生成来增量读取新文件。
在 Hive 可用性方面的提升:
- FLIP-123 通过 Hive Dialect 为用户提供语法兼容,这样用户无需在 Flink 和 Hive 的 CLI 之间切换,可以直接迁移 Hive 脚本到 Flink 中执行。
- 提供 Hive 相关依赖的内置支持,避免用户自己下载所需的相关依赖。现在只需要单独下载一个包,配置 HADOOP_CLASSPATH 就可以运行。
- 在 Hive 性能方面,1.10.0 中已经支持了 ORC(Hive 2+)的向量化读取,1.11.0 中我们补全了所有版本的 Parquet 和 ORC 向量化支持来提升性能。
3 全新 Source API
前面也提到过,source 和 sink 是 Flink 对接外部系统的一个桥梁,对于完善生态、可用性及端到端的用户体验是很重要的环节。社区早在一年前就已经规划了 source 端的彻底重构,从 FLIP-27 的 ID 就可以看出是很早的一个 feature。但是由于涉及到很多复杂的内部机制和考虑到各种 source connector 的实现,设计上需要考虑的很全面。从 1.10.0 就开始做 POC 的实现,最终赶上了 1.11.0 版本的发布。
先简要回顾下 source 之前的主要问题:
- 对用户而言,在 Flink 中改造已有的 source 或者重新实现一个生产级的 source connector 不是一件容易的事情,具体体现在没有公共的代码可以复用,而且需要理解很多 Flink 内部细节以及实现具体的 event time 分配、watermark 产出、idleness 监测、线程模型等。
- 批和流的场景需要实现不同的 source。
- partitions/splits/shards 概念在接口中没有显式表达,比如 split 的发现逻辑和数据消费都耦合在 source function 的实现中,这样在实现 Kafka 或 Kinesis 类型的 source 时增加了复杂性。
- 在 runtime 执行层,checkpoint 锁被 source function 抢占会带来一系列问题,框架很难进行优化。
FLIP-27 在设计时充分考虑了上述的痛点:
- 首先在 Job Manager 和 Task Manager 中分别引入两种不同的组件 Split Enumerator 和 Source reader,解耦 split 发现和对应的消费处理,同时方便随意组合不同的策略。比如现有的 Kafka connector 中有多种不同的 partition 发现策略和实现耦合在一起,在新的架构下,我们只需要实现一种 source reader,就可以适配多种 split enumerator 的实现来对应不同的 partition 发现策略。
- 在新架构下实现的 source connector 可以做到批流统一,唯一的小区别是对批场景的有限输入,split enumerator 会产出固定数量的 split 集合并且每个 split 都是有限数据集;对于流场景的无限输入,split enumerator 要么产出无限多的 split 或者 split 自身是无限数据集。
- 复杂的 timestamp assigner 以及 watermark generator 透明的内置在 source reader 模块内运行,对用户来说是无感知的。这样用户如果想实现新的 source connector,一般不再需要重复实现这部分功能。
目前 Flink 已有的 source connector 会在后续的版本中基于新架构来重新实现,legacy source 也会继续维护几个版本保持兼容性,用户也可以按照 release 文档中的说明来尝试体验新 source 的开发。
4 PyFlink 生态
众所周知,Python 语言在机器学习和数据分析领域有着广泛的使用。Flink 从 1.9.0 版本开始发力兼容 Python 生态,Python 和 Flink 合力为 PyFlink,把 Flink 的实时分布式处理能力输出给 Python 用户。前两个版本 PyFlink 已经支持了 Python Table API 和 UDF,在 1.11.0 中扩大对 Python 生态库 Pandas 的支持以及和 SQL DDL/Client 的集成,同时 Python UDF 性能有了极大的提升。
具体来说,之前普通的 Python UDF 每次调用只能处理一条数据,而且在 Java 端和 Python 端都需要序列化/反序列化,开销很大。1.11.0 中 Flink 支持在 Table & SQL 作业中自定义和使用向量化 Python UDF,用户只需要在 UDF 修饰中额外增加一个参数 udf_type=“pandas” 即可。这样带来的好处是:
- 每次调用可以处理 N 条数据。
- 数据格式基于 Apache Arrow,大大降低了 Java、Python 进程之间的序列化/反序列化开销。
- 方便 Python 用户基于 Numpy 和 Pandas 等数据分析领域常用的 Python 库,开发高性能的 Python UDF。
除此之外,1.11.0 中 PyFlink 还支持:
- PyFlink table 和 Pandas DataFrame 之间无缝切换(FLIP-120),增强 Pandas 生态的易用性和兼容性。
- Table & SQL 中可以定义和使用 Python UDTF(FLINK-14500),不再必需 Java/Scala UDTF。
- Cython 优化 Python UDF 的性能(FLIP-121),对比 1.10.0 可以提升 30 倍。
- Python UDF 中用户自定义 metric(FLIP-112),方便监控和调试 UDF 的执行。
上述解读的都是侧重 API 层面,用户开发作业可以直接感知到的易用性的提升。下面我们看看执行引擎层在 1.11.0 中都有哪些值得关注的变化。
改进二. 生产可用性和稳定性提升
1 支持 application 模式和 Kubernetes 增强
1.11.0 版本前,Flink 主要支持如下两种模式运行:
- Session 模式:提前启动一个集群,所有作业都共享这个集群的资源运行。优势是避免每个作业单独启动集群带来的额外开销,缺点是隔离性稍差。如果一个作业把某个 Task Manager(TM)容器搞挂,会导致这个容器内的所有作业都跟着重启。虽然每个作业有自己独立的 Job Manager(JM)来管理,但是这些 JM 都运行在一个进程中,容易带来负载上的瓶颈。
- Per-job 模式:为了解决 session 模式隔离性差的问题,每个作业根据资源需求启动独立的集群,每个作业的 JM 也是运行在独立的进程中,负载相对小很多。
以上两种模式的共同问题是需要在客户端执行用户代码,编译生成对应的 Job Graph 提交到集群运行。在这个过程需要下载相关 jar 包并上传到集群,客户端和网络负载压力容易成为瓶颈,尤其当一个客户端被多个用户共享使用。
1.11.0 中引入了 application 模式(FLIP-85)来解决上述问题,按照 application 粒度来启动一个集群,属于这个 application 的所有 job 在这个集群中运行。核心是 Job Graph 的生成以及作业的提交不在客户端执行,而是转移到 JM 端执行,这样网络下载上传的负载也会分散到集群中,不再有上述 client 单点上的瓶颈。
用户可以通过 bin/flink run-application 来使用 application 模式,目前 Yarn 和 Kubernetes(K8s)都已经支持这种模式。Yarn application 会在客户端将运行作业需要的依赖都通过 Yarn Local Resource 传递到 JM。K8s application 允许用户构建包含用户 jar 与依赖的镜像,同时会根据作业自动创建 TM,并在结束后销毁整个集群,相比 session 模式具有更好的隔离性。K8s 不再有严格意义上的 per-job 模式,application 模式相当于 per-job 在集群进行提交作业的实现。
除了支持 application 模式,Flink 原生 K8s 在 1.11.0 中还完善了很多基础的功能特性(FLINK-14460),以达到生产可用性的标准。例如 Node Selector、Label、Annotation、Toleration 等。为了更方便的与 Hadoop 集成,也支持根据环境变量自动挂载 Hadoop 配置的功能。
2 Checkpoint & Savepoint 优化
checkpoint 和 savepoint 机制一直是 Flink 保持先进性的核心竞争力之一,社区在这个领域的改动很谨慎,最近的几个大版本中几乎没有大的功能和架构上的调整。在用户邮件列表中,我们经常能看到用户反馈和抱怨的相关问题:比如 checkpoint 长时间做不出来失败,savepoint 在作业重启后不可用等等。1.11.0 有选择的解决了一些这方面的常见问题,提高生产可用性和稳定性。
1.11.0 之前, savepoint 中 meta 数据和 state 数据分别保存在两个不同的目录中,这样如果想迁移 state 目录很难识别这种映射关系,也可能导致目录被误删除,对于目录清理也同样有麻烦。1.11.0 把两部分数据整合到一个目录下,这样方便整体转移和复用。另外,之前 meta 引用 state 采用的是绝对路径,这样 state 目录迁移后路径发生变化也不可用,1.11.0 把 state 引用改成了相对路径解决了这个问题(FLINK-5763),这样 savepoint 的管理维护、复用更加灵活方便。
实际生产环境中,用户经常遭遇 checkpoint 超时失败、长时间不能完成带来的困扰。一旦作业 failover 会造成回放大量的历史数据,作业长时间没有进度,端到端的延迟增加。1.11.0 从不同维度对 checkpoint 的优化和提速做了改进,目标实现分钟甚至秒级的轻量型 checkpoint。
首先,增加了 Checkpoint Coordinator 通知 task 取消 checkpoint 的机制(FLINK-8871),这样避免 task 端还在执行已经取消的 checkpoint 而对系统带来不必要的压力。同时 task 端放弃已经取消的 checkpoint,可以更快的参与执行 coordinator 新触发的 checkpoint,某种程度上也可以避免新 checkpoint 再次执行超时而失败。这个优化也对后面默认开启 local recovery 提供了便利,task 端可以及时清理失效 checkpoint 的资源。
其次,在反压场景下,整个数据链路堆积了大量 buffer,导致 checkpoint barrier 排在数据 buffer 后面,不能被 task 及时处理对齐,也就导致了 checkpoint 长时间不能执行。1.11.0 中从两个维度对这个问题进行解决:
1)尝试减少数据链路中的 buffer 总量(FLINK-16428),这样 checkpoint barrier 可以尽快被处理对齐。
- 上游输出端控制单个 sub partition 堆积 buffer 的最大阈值(backlog),避免负载不均场景下单个链路上堆积大量 buffer。
- 在不影响网络吞吐性能的情况下合理修改上下游默认的 buffer 配置。
- 上下游数据传输的基础协议进行了调整,允许单个数据链路可以配置 0 个独占 buffer 而不死锁,这样总的 buffer 数量和作业并发规模解耦。根据实际需求在吞吐性能和 checkpoint 速度两者之间权衡,自定义 buffer 配比。
这个优化有一部分工作已经在 1.11.0 中完成,剩余部分会在下个版本继续推进完成。
2)实现了全新的 unaligned checkpoint 机制(FLIP-76)从根本上解决了反压场景下 checkpoint barrier 对齐的问题。实际上这个想法早在 1.10.0 版本之前就开始酝酿设计,由于涉及到很多模块的大改动,实现机制和线程模型也很复杂。我们实现了两种不同方案的原型 POC 进行了测试、性能对比,确定了最终的方案,因此直到 1.11.0 才完成了 MVP 版本,这也是 1.11.0 中执行引擎层唯一的一个重量级 feature。其基本思想可以概括为:
- Checkpoint barrier 跨数据 buffer 传输,不在输入输出队列排队等待处理,这样就和算子的计算能力解耦,barrier 在节点之间的传输只有网络延时,可以忽略不计。
- 每个算子多个输入链路之间不需要等待 barrier 对齐来执行 checkpoint,第一个到的 barrier 就可以提前触发 checkpoint,这样可以进一步提速 checkpoint,不会因为个别链路的延迟而影响整体。
- 为了和之前 aligned checkpoint 的语义保持一致,所有未被处理的输入输出数据 buffer 都将作为 channel state 在 checkpoint 执行时进行快照持久化,在 failover 时连同 operator state 一同进行恢复。换句话说,aligned 机制保证的是 barrier 前面所有数据必须被处理完,状态实时体现到 operator state 中;而 unaligned 机制把 barrier 前面的未处理数据所反映的 operator state 延后到 failover restart 时通过 channel state 回放进行体现,从状态恢复的角度来说最终都是一致的。注意这里虽然引入了额外的 in-flight buffer 的持久化,但是这个过程实际是在 checkpoint 的异步阶段完成的,同步阶段只是进行了轻量级的 buffer 引用,所以不会过多占用算子的计算时间而影响吞吐性能。
Unaligned checkpoint 在反压严重的场景下可以明显加速 checkpoint 的完成时间,因为它不再依赖于整体的计算吞吐能力,而和系统的存储性能更加相关,相当于计算和存储的解耦。但是它的使用也有一定的局限性,它会增加整体 state 的大小,对存储 IO 带来额外的开销,因此在 IO 已经是瓶颈的场景下就不太适合使用 unaligned checkpoint 机制。
1.11.0 中 unaligned checkpoint 还没有作为默认模式,需要用户手动配置来开启,并且只在 exactly-once 模式下生效。但目前还不支持 savepoint 模式,因为 savepoint 涉及到作业的 rescale 场景,channel state 目前还不支持 state 拆分,在后面的版本会进一步支持,所以 savepoint 目前还是会使用之前的 aligned 模式,在反压场景下有可能需要很长时间才能完成。
四. Flink 1.12 新特性
- flink1.12新特性,该部分内容转自 官宣 | Apache Flink 1.12.0 正式发布,流批一体真正统一运行!
发布时间:2020.12.10
DataStream API 支持批执行模式
Flink 的核心 API 最初是针对特定的场景设计的,尽管 Table API / SQL 针对流处理和批处理已经实现了统一的 API,但当用户使用较底层的 API 时,仍然需要在批处理(DataSet API)和流处理(DataStream API)这两种不同的 API 之间进行选择。鉴于批处理是流处理的一种特例,将这两种 API 合并成统一的 API,有一些非常明显的好处,比如:
· 可复用性:作业可以在流和批这两种执行模式之间自由地切换,而无需重写任何代码。因此,用户可以复用同一个作业,来处理实时数据和历史数据。
· 维护简单:统一的 API 意味着流和批可以共用同一组 connector,维护同一套代码,并能够轻松地实现流批混合执行,例如 backfilling 之类的场景。
考虑到这些优点,社区已朝着流批统一的 DataStream API 迈出了第一步:支持高效的批处理(FLIP-134)。从长远来看,这意味着 DataSet API 将被弃用(FLIP-131),其功能将被包含在 DataStream API 和 Table API / SQL 中。
■ 有限流上的批处理
您已经可以使用 DataStream API 来处理有限流(例如文件)了,但需要注意的是,运行时并不“知道”作业的输入是有限的。为了优化在有限流情况下运行时的执行性能,新的 BATCH 执行模式,对于聚合操作,全部在内存中进行,且使用 sort-based shuffle(FLIP-140)和优化过的调度策略(请参见 Pipelined Region Scheduling 了解更多详细信息)。因此,DataStream API 中的 BATCH 执行模式已经非常接近 Flink 1.12 中 DataSet API 的性能。有关性能的更多详细信息,请查看 FLIP-140。
在 Flink 1.12 中,默认执行模式为 STREAMING,要将作业配置为以 BATCH 模式运行,可以在提交作业的时候,设置参数 execution.runtime-mode:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
或者通过编程的方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeMode.BATCH);
注意:尽管 DataSet API 尚未被弃用,但我们建议用户优先使用具有 BATCH 执行模式的 DataStream API 来开发新的批作业,并考虑迁移现有的 DataSet 作业。
新的 Data Sink API (Beta)
之前发布的 Flink 版本中[1],已经支持了 source connector 工作在流批两种模式下,因此在 Flink 1.12 中,社区着重实现了统一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit 协议和一个更加模块化的接口。Sink 的实现者只需要定义 what 和 how:SinkWriter,用于写数据,并输出需要 commit 的内容(例如,committables);Committer 和 GlobalCommitter,封装了如何处理 committables。框架会负责 when 和 where:即在什么时间,以及在哪些机器或进程中 commit。
这种模块化的抽象允许为 BATCH 和 STREAMING 两种执行模式,实现不同的运行时策略,以达到仅使用一种 sink 实现,也可以使两种模式都可以高效执行。Flink 1.12 中,提供了统一的 FileSink connector,以替换现有的 StreamingFileSink connector (FLINK-19758)。其它的 connector 也将逐步迁移到新的接口。
基于 Kubernetes 的高可用 (HA) 方案
Flink 可以利用 Kubernetes 提供的内置功能来实现 JobManager 的 failover,而不用依赖 ZooKeeper。为了实现不依赖于 ZooKeeper 的高可用方案,社区在 Flink 1.12(FLIP-144)中实现了基于 Kubernetes 的高可用方案。该方案与 ZooKeeper 方案基于相同的接口[3],并使用 Kubernetes 的 ConfigMap[4] 对象来处理从 JobManager 的故障中恢复所需的所有元数据。关于如何配置高可用的 standalone 或原生 Kubernetes 集群的更多详细信息和示例,请查阅文档[5]。
注意:需要注意的是,这并不意味着 ZooKeeper 将被删除,这只是为 Kubernetes 上的 Flink 用户提供了另外一种选择。
其它功能改进
■ 将现有的 connector 迁移到新的 Data Source API
在之前的版本中,Flink 引入了新的 Data Source API(FLIP-27),以允许实现同时适用于有限数据(批)作业和无限数据(流)作业使用的 connector 。在 Flink 1.12 中,社区从 FileSystem connector(FLINK-19161)出发,开始将现有的 source connector 移植到新的接口。
注意: 新的 source 实现,是完全不同的实现,与旧版本的实现不兼容。
■ Pipelined Region 调度 (FLIP-119)
在之前的版本中,Flink 对于批作业和流作业有两套独立的调度策略。Flink 1.12 版本中,引入了统一的调度策略, 该策略通过识别 blocking 数据传输边,将 ExecutionGraph 分解为多个 pipelined region。这样一来,对于一个 pipelined region 来说,仅当有数据时才调度它,并且仅在所有其所需的资源都被满足时才部署它;同时也可以支持独立地重启失败的 region。对于批作业来说,新策略可显著地提高资源利用率,并消除死锁。
■ 支持 Sort-Merge Shuffle (FLIP-148)
为了提高大规模批作业的稳定性、性能和资源利用率,社区引入了 sort-merge shuffle,以替代 Flink 现有的实现。这种方案可以显著减少 shuffle 的时间,并使用较少的文件句柄和文件写缓存(这对于大规模批作业的执行非常重要)。在后续版本中(FLINK-19614),Flink 会进一步优化相关性能。
注意:该功能是实验性的,在 Flink 1.12 中默认情况下不启用。要启用 sort-merge shuffle,需要在 TaskManager 的网络配置[6]中设置合理的最小并行度。
■ Flink WebUI 的改进 (FLIP-75)
作为对上一个版本中,Flink WebUI 一系列改进的延续,Flink 1.12 在 WebUI 上暴露了 JobManager 内存相关的指标和配置参数(FLIP-104)。对于 TaskManager 的指标页面也进行了更新,为 Managed Memory、Network Memory 和 Metaspace 添加了新的指标,以反映自 Flink 1.10(FLIP-102)开始引入的 TaskManager 内存模型的更改[7]。
Table API/SQL: SQL Connectors 中的 Metadata 处理
如果可以将某些 source(和 format)的元数据作为额外字段暴露给用户,对于需要将元数据与记录数据一起处理的用户来说很有意义。一个常见的例子是 Kafka,用户可能需要访问 offset、partition 或 topic 信息、读写 kafka 消息中的 key 或 使用消息 metadata中的时间戳进行时间相关的操作。
在 Flink 1.12 中,Flink SQL 支持了元数据列用来读取和写入每行数据中 connector 或 format 相关的列(FLIP-107)。这些列在 CREATE TABLE 语句中使用 METADATA(保留)关键字来声明。
CREATE TABLE kafka_table (
id BIGINT,
name STRING,
event_time TIMESTAMP(3) METADATA FROM 'timestamp', -- access Kafka 'timestamp' metadata
headers MAP METADATA -- access Kafka 'headers' metadata
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'format' = 'avro'
);
在 Flink 1.12 中,已经支持 Kafka 和 Kinesis connector 的元数据,并且 FileSystem connector 上的相关工作也已经在计划中(FLINK-19903)。由于 Kafka record 的结构比较复杂,社区还专门为 Kafka connector 实现了新的属性[8],以控制如何处理键/值对。关于 Flink SQL 中元数据支持的完整描述,请查看每个 connector 的文档[9]以及 FLIP-107 中描述的用例。
Table API/SQL: Upsert Kafka Connector
在某些场景中,例如读取 compacted topic 或者输出(更新)聚合结果的时候,需要将 Kafka 消息记录的 key 当成主键处理,用来确定一条数据是应该作为插入、删除还是更新记录来处理。为了实现该功能,社区为 Kafka 专门新增了一个 upsert connector(upsert-kafka),该 connector 扩展自现有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既可以作为 source 使用,也可以作为 sink 使用,并且提供了与现有的 kafka connector 相同的基本功能和持久性保证,因为两者之间复用了大部分代码。
要使用 upsert-kafka connector,必须在创建表时定义主键,并为键(key.format)和值(value.format)指定序列化反序列化格式。完整的示例,请查看最新的文档[10]。
Table API/SQL: SQL 中 支持 Temporal Table Join
在之前的版本中,用户需要通过创建时态表函数(temporal table function) 来支持时态表 join(temporal table join) ,而在 Flink 1.12 中,用户可以使用标准的 SQL 语句 FOR SYSTEM_TIME AS OF(SQL:2011)来支持 join。此外,现在任意包含时间列和主键的表,都可以作为时态表,而不仅仅是 append-only 表。这带来了一些新的应用场景,比如将 Kafka compacted topic 或数据库变更日志(来自 Debezium 等)作为时态表。
CREATE TABLE orders (
order_id STRING,
currency STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
) WITH (
…
);
-- Table backed by a Kafka compacted topic
CREATE TABLE latest_rates (
currency STRING,
rate DECIMAL(38, 10),
currency_time TIMESTAMP(3),
WATERMARK FOR currency_time AS currency_time - INTERVAL ‘5’ SECOND,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
…
);
-- Event-time temporal table join
SELECT
o.order_id,
o.order_time,
o.amount * r.rate AS amount,
r.currency
FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency;
上面的示例同时也展示了如何在 temporal table join 中使用 Flink 1.12 中新增的 upsert-kafka connector。
■ 使用 Hive 表进行 Temporal Table Join
用户也可以将 Hive 表作为时态表来使用,Flink 既支持自动读取 Hive 表的最新分区作为时态表(FLINK-19644),也支持在作业执行时追踪整个 Hive 表的最新版本作为时态表。请参阅文档,了解更多关于如何在 temporal table join 中使用 Hive 表的示例。
Table API/SQL 中的其它改进
**■ Kinesis Flink SQL Connector (FLINK-18858)
**
从 Flink 1.12 开始,Table API / SQL 原生支持将 Amazon Kinesis Data Streams(KDS)作为 source 和 sink 使用。新的 Kinesis SQL connector 提供了对于增强的Fan-Out(EFO)以及 Sink Partition 的支持。如需了解 Kinesis SQL connector 所有支持的功能、配置选项以及对外暴露的元数据信息,请查看最新的文档。
**■ 在 FileSystem/Hive connector 的流式写入中支持小文件合并 (FLINK-19345)
**
很多 bulk format,例如 Parquet,只有当写入的文件比较大时,才比较高效。当 checkpoint 的间隔比较小时,这会成为一个很大的问题,因为会创建大量的小文件。在 Flink 1.12 中,File Sink 增加了小文件合并功能,从而使得即使作业 checkpoint 间隔比较小时,也不会产生大量的文件。要开启小文件合并,可以按照文档[11]中的说明在 FileSystem connector 中设置 auto-compaction = true 属性。
■ Kafka Connector 支持 Watermark 下推 (FLINK-20041)
为了确保使用 Kafka 的作业的结果的正确性,通常来说,最好基于分区来生成 watermark,因为分区内数据的乱序程度通常来说比分区之间数据的乱序程度要低很多。Flink 现在允许将 watermark 策略下推到 Kafka connector 里面,从而支持在 Kafka connector 内部构造基于分区的 watermark[12]。一个 Kafka source 节点最终所产生的 watermark 由该节点所读取的所有分区中的 watermark 的最小值决定,从而使整个系统可以获得更好的(即更接近真实情况)的 watermark。该功能也允许用户配置基于分区的空闲检测策略,以防止空闲分区阻碍整个作业的 event time 增长。
■ 新增的 Formats
■ 利用 Multi-input 算子进行 Join 优化 (FLINK-19621)
Shuffling 是一个 Flink 作业中最耗时的操作之一。为了消除不必要的序列化反序列化开销、数据 spilling 开销,提升 Table API / SQL 上批作业和流作业的性能, planner 当前会利用上一个版本中已经引入的N元算子(FLIP-92),将由 forward 边所连接的多个算子合并到一个 Task 里执行。
■ Type Inference for Table API UDAFs (FLIP-65)
Flink 1.12 完成了从 Flink 1.9 开始的,针对 Table API 上的新的类型系统[2]的工作,并在聚合函数(UDAF)上支持了新的类型系统。从 Flink 1.12 开始,与标量函数和表函数类似,聚合函数也支持了所有的数据类型。
PyFlink: Python DataStream API
为了扩展 PyFlink 的可用性,Flink 1.12 提供了对于 Python DataStream API(FLIP-130)的初步支持,该版本支持了无状态类型的操作(例如 Map,FlatMap,Filter,KeyBy 等)。如果需要尝试 Python DataStream API,可以安装PyFlink,然后按照该文档[14]进行操作,文档中描述了如何使用 Python DataStream API 构建一个简单的流应用程序。
from pyflink.common.typeinfo import Types
from pyflink.datastream import MapFunction, StreamExecutionEnvironment
class MyMapFunction(MapFunction):
def map(self, value):
return value + 1
env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
mapped_stream.print()
env.execute("datastream job")
PyFlink 中的其它改进
**■ PyFlink Jobs on Kubernetes (FLINK-17480)
**
除了 standalone 部署和 YARN 部署之外,现在也原生支持将 PyFlink 作业部署在 Kubernetes 上。最新的文档中详细描述了如何在 Kubernetes 上启动 session 或 application 集群。
**■ 用户自定义聚合函数 (UDAFs)
**
从 Flink 1.12 开始,您可以在 PyFlink 作业中定义和使用 Python UDAF 了(FLIP-139)。普通的 UDF(标量函数)每次只能处理一行数据,而 UDAF(聚合函数)则可以处理多行数据,用于计算多行数据的聚合值。您也可以使用 Pandas UDAF[15](FLIP-137),来进行向量化计算(通常来说,比普通 Python UDAF 快10倍以上)。
注意: 普通 Python UDAF,当前仅支持在 group aggregations 以及流模式下使用。如果需要在批模式或者窗口聚合中使用,建议使用 Pandas UDAF。
五. Flink 1.13 新特性
- flink1.13新特性,该部分内容转自 官宣|Apache Flink 1.13.0 正式发布,流处理应用更加简单高效!
发布时间:2021.5.3
1. 被动扩缩容
Flink 项目的一个初始目标,就是希望流处理应用可以像普通应用一样简单和自然,被动扩缩容是 Flink 针对这一目标上的最新进展。
当考虑资源管理和部分的时候,Flink 有两种可能的模式。用户可以将 Flink 应用部署到 k8s、yarn 等资源管理系统之上,并且由 Flink 主动的来管理资源并按需分配和释放资源。这一模式对于经常改变资源需求的作业和应用非常有用,比如批作业和实时 SQL 查询。在这种模式下,Flink 所启动的 Worker 数量是由应用设置的并发度决定的。在 Flink 中我们将这一模式叫做主动扩缩容。
对于长时间运行的流处理应用,一种更适合的模型是用户只需要将作业像其它的长期运行的服务一样启动起来,而不需要考虑是部署在 k8s、yarn 还是其它的资源管理平台上,并且不需要考虑需要申请的资源的数量。相反,它的规模是由所分配的 worker 数量来决定的。当 worker 数量发生变化时,Flink 自动的改动应用的并发度。在 Flink 中我们将这一模式叫做被动扩缩容。
Flink 的 Application 部署模式开启了使 Flink 作业更接近普通应用(即启动 Flink 作业不需要执行两个独立的步骤来启动集群和提交应用)的努力,而被动扩缩容完成了这一目标:用户不再需要使用额外的工具(如脚本、K8s 算子)来让 worker 的数量与应用并发度设置保持一致。
用户现在可以将自动扩缩容的工具应用到 Flink 应用之上,就像普通的应用程序一样,只要用户了解扩缩容的代价:有状态的流应用在扩缩容的时候需要将状态重新分发。
如果想要尝试被动扩缩容,用户可以增加 scheduler-mode: reactive 这一配置项,然后启动一个应用集群(Standalone 或者 K8s)。更多细节见被动扩缩容的文档。
2. 分析应用的性能
对所有应用程序来说,能够简单的分析和理解应用的性能是非常关键的功能。这一功能对 Flink 更加重要,因为 Flink 应用一般是数据密集的(即需要处理大量的数据)并且需要在(近)实时的延迟内给出结果。
当 Flink 应用处理的速度跟不上数据输入的速度时,或者当一个应用占用的资源超过预期,下文介绍的这些工具可以帮你分析原因。
2.1 瓶颈检测与反压监控
Flink 性能分析首先要解决的问题经常是:哪个算子是瓶颈?
为了回答这一问题,Flink 引入了描述作业繁忙(即在处理数据)与反压(由于下游算子不能及时处理结果而无法继续输出)程度的指标。应用中可能的瓶颈是那些繁忙并且上游被反压的算子。
Flink 1.13 优化了反压检测的逻辑(使用基于任务 Mailbox 计时,而不在再于堆栈采样),并且重新实现了作业图的 UI 展示:Flink 现在在 UI 上通过颜色和数值来展示繁忙和反压的程度。
2.2 Web UI 中的 CPU 火焰图
Flink 关于性能另一个经常需要回答的问题:瓶颈算子中的哪部分计算逻辑消耗巨大?
针对这一问题,一个有效的可视化工具是火焰图。它可以帮助回答以下问题:
- 哪个方法调现在在占用 CPU?
- 不同方法占用 CPU 的比例如何?
- 一个方法被调用的栈是什么样子的?
火焰图是通过重复采样线程的堆栈来构建的。在火焰图中,每个方法调用被表示为一个矩形,矩形的长度与这个方法出现在采样中的次数成正比。火焰图在 UI 上的一个例子如下图所示。
火焰图的文档 包括启用这一功能的更多细节和指令。
2.3 State 访问延迟指标
另一个可能的性能瓶颈是 state backend,尤其是当作业的 state 超过内存容量而必须使用 RocksDB state backend 时。
这里并不是想说 RocksDB 性能不够好(我们非常喜欢 RocksDB!),但是它需要满足一些条件才能达到最好的性能。例如,用户可能很容易遇到非故意的在云上由于使用了错误的磁盘资源类型而不能满足 RockDB 的 IO 性能需求的问题。
基于 CPU 火焰图,新的 State Backend 的延迟指标可以帮助用户更好的判断性能不符合预期是否是由 State Backend 导致的。例如,如果用户发现 RocksDB 的单次访问需要几毫秒的时间,那么就需要查看内存和 I/O 的配置。这些指标可以通过设置 state.backend.rocksdb.latency-track-enabled 这一选项来启用。这些指标是通过采样的方式来监控性能的,所以它们对 RocksDB State Backend 的性能影响是微不足道的。
3. 通过 Savepoint 来切换 State Backend
用户现在可以在从一个 Savepoint 重启时切换一个 Flink 应用的 State Backend。这使得 Flink 应用不再被限制只能使用应用首次运行时选择的 State Backend。
基于这一功能,用户现在可以首先使用一个 HashMap State Backend(纯内存的 State Backend),如果后续状态变得过大的话,就切换到 RocksDB State Backend 中。
在实现层,Flink 现在统一了所有 State Backend 的 Savepoint 格式来实现这一功能。
4. K8s 部署时使用用户指定的 Pod 模式
原生 kubernetes 部署(Flink 主动要求 K8s 来启动 Pod)中,现在可以使用自定义的 Pod 模板。
使用这些模板,用户可以使用一种更符合 K8s 的方式来设置 JM 和 TM 的 Pod,这种方式比 Flink K8s 集成内置的配置项更加灵活。
5. 生产可用的 Unaligned Checkpoint
Unaligned Checkpoint 目前已达到了生产可用的状态,我们鼓励用户在存在反压的情况下试用这一功能。
具体来说,Flink 1.13 中引入的这些功能使 Unaligned Checkpoint 更容易使用:
- 用户现在使用 Unaligned Checkpoint 时也可以扩缩容应用。如果用户需要因为性能原因不能使用 Savepoint而必须使用 Retained checkpoint 时,这一功能会非常方便。
- 对于没有反压的应用,启用 Unaligned Checkpoint 现在代价更小。Unaligned Checkpoint 现在可以通过超时来自动触发,即一个应用默认会使用 Aligned Checkpoint(不存储传输中的数据),而只在对齐超过一定时间范围时自动切换到 Unaligned Checkpoint(存储传输中的数据)。
关于如何启用 Unaligned Checkpoint 可以参考相关文档。
6. 机器学习迁移到单独的仓库
为了加速 Flink 机器学习的进展(流批统一的机器学习),现在 Flink 机器学习开启了新的 flink-ml 仓库。我们采用类似于 Stateful Function 项目的管理方式,通过使用一个单独的仓库从而简化代码合并的流程并且可以进行单独的版本发布,从而提高开发的效率。
用户可以关注 Flink 在机器学习方面的进展,比如与 Alink(Flink 常用机器学习算法套件)的互操作以及 Flink 与 Tensorflow 的集成。
二、SQL / Table API 进展
与之前的版本类似,SQL 和 Table API 仍然在所有开发中占用很大的比例。
1. 通过 Table-valued 函数来定义时间窗口
在流式 SQL 查询中,一个最经常使用的是定义时间窗口。Flink 1.13 中引入了一种新的定义窗口的方式:通过 Table-valued 函数。这一方式不仅有更强的表达能力(允许用户定义新的窗口类型),并且与 SQL 标准更加一致。
Flink 1.13 在新的语法中支持 TUMBLE 和 HOP 窗口,在后续版本中也会支持 SESSION 窗口。我们通过以下两个例子来展示这一方法的表达能力:
- 例 1:一个新引入的 CUMULATE 窗口函数,它可以支持按特定步长扩展的窗口,直到达到最大窗口大小:
SELECT window_time, window_start, window_end, SUM(price) AS total_price
FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, window_time;
- 例 2:用户在 table-valued 窗口函数中可以访问窗口的起始和终止时间,从而使用户可以实现新的功能。例如,除了常规的基于窗口的聚合和 Join 之外,用户现在也可以实现基于窗口的 Top-K 聚合:
SELECT window_time, ...
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC)
as rank
FROM t
) WHERE rank <= 100;
2. 提高 DataStream API 与 Table API / SQL 的互操作能力
这一版本极大的简化了 DataStream API 与 Table API 混合的程序。
Table API 是一种非常方便的应用开发接口,因为这经支持表达式的程序编写并提供了大量的内置函数。但是有时候用户也需要切换回 DataStream,例如当用户存在表达能力、灵活性或者 State 访问的需求时。
Flink 新引入的 StreamTableEnvironment.toDataStream()/.fromDataStream() 可以将一个 DataStream API 声明的 Source 或者 Sink 当作 Table 的 Source 或者 Sink 来使用。主要的优化包括:
- DataStream 与 Table API 类型系统的自动转换。
- Event Time 配置的无缝集成,Watermark 行为的高度一致性。
- Row 类型(即 Table API 中数据的表示)有了极大的增强,包括 toString() / hashCode() 和 equals() 方法的优化,按名称访问字段值的支持与稀疏表示的支持。
Table table = tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime", "TIMESTAMP(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build());
DataStream<Row> dataStream = tableEnv.toDataStream(table)
.keyBy(r -> r.getField("user"))
.window(...);
3. SQL Client: 初始化脚本和语句集合 (Statement Sets)
SQL Client 是一种直接运行和部署 SQL 流或批作业的简便方式,用户不需要编写代码就可以从命令行调用 SQL,或者作为 CI / CD 流程的一部分。
这个版本极大的提高了 SQL Client 的功能。现在基于所有通过 Java 编程(即通过编程的方式调用 TableEnvironment 来发起查询)可以支持的语法,现在 SQL Client 和 SQL 脚本都可以支持。这意味着 SQL 用户不再需要添加胶水代码来部署他们的SQL作业。
3.1 配置简化和代码共享
Flink 后续将不再支持通过 Yaml 的方式来配置 SQL Client(注:目前还在支持,但是已经被标记为废弃)。作为替代,SQL Client 现在支持使用一个初始化脚本在主 SQL 脚本执行前来配置环境。
这些初始化脚本通常可以在不同团队/部署之间共享。它可以用来加载常用的 catalog,应用通用的配置或者定义标准的视图。
./sql-client.sh -i init1.sql init2.sql -f sqljob.sql
3.2 更多的配置项
通过增加配置项,优化 SET / RESET 命令,用户可以更方便的在 SQL Client 和 SQL 脚本内部来控制执行的流程。
3.3 通过语句集合来支持多查询
多查询允许用户在一个 Flink 作业中执行多个 SQL 查询(或者语句)。这对于长期运行的流式 SQL 查询非常有用。
语句集可以用来将一组查询合并为一组同时执行。
以下是一个可以通过 SQL Client 来执行的 SQL 脚本的例子。它初始化和配置了执行多查询的环境。这一脚本包括了所有的查询和所有的环境初始化和配置的工作,从而使它可以作为一个自包含的部署组件。
-- set up a catalog
CREATE CATALOG hive_catalog WITH ('type' = 'hive');
USE CATALOG hive_catalog;
-- or use temporary objects
CREATE TEMPORARY TABLE clicks (
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = '...',
'format' = 'avro'
);
-- set the execution mode for jobs
SET execution.runtime-mode=streaming;
-- set the sync/async mode for INSERT INTOs
SET table.dml-sync=false;
-- set the job's parallelism
SET parallism.default=10;
-- set the job name
SET pipeline.name = my_flink_job;
-- restore state from the specific savepoint path
SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-bb0dab;
BEGIN STATEMENT SET;
INSERT INTO pageview_pv_sink
SELECT page_id, count(1) FROM clicks GROUP BY page_id;
INSERT INTO pageview_uv_sink
SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;
4. Hive 查询语法兼容性
用户现在在 Flink 上也可以使用 Hive SQL 语法。除了 Hive DDL 方言之外,Flink现在也支持常用的 Hive DML 和 DQL 方言。
为了使用 Hive SQL 方言,需要设置 table.sql-dialect 为 hive 并且加载 HiveModule。后者非常重要,因为必须要加载 Hive 的内置函数后才能正确实现对 Hive 语法和语义的兼容性。例子如下:
CREATE CATALOG myhive WITH ('type' = 'hive'); -- setup HiveCatalog
USE CATALOG myhive;
LOAD MODULE hive; -- setup HiveModule
USE MODULES hive,core;
SET table.sql-dialect = hive; -- enable Hive dialect
SELECT key, value FROM src CLUSTER BY key; -- run some Hive queries
需要注意的是, Hive 方言中不再支持 Flink 语法的 DML 和 DQL 语句。如果要使用 Flink 语法,需要切换回 default 的方言配置。
5. 优化的 SQL 时间函数
在数据处理中时间处理是一个重要的任务。但是与此同时,处理不同的时区、日期和时间是一个日益复杂的任务。
在 Flink 1.13 中,我们投入了大量的精力来简化时间函数的使用。我们调整了时间相关函数的返回类型使其更加精确,例如 PROCTIME(),CURRENT_TIMESTAMP() 和 NOW()。
其次,用户现在还可以基于一个 TIMESTAMP_LTZ 类型的列来定义 Event Time 属性,从而可以优雅的在窗口处理中支持夏令时。
用户可以参考 Release Note 来查看该部分的完整变更。
三、PyFlink 核心优化
这个版本对 PyFlink 的改进主要是使基于 Python 的 DataStream API 与 Table API 与 Java/scala 版本的对应功能更加一致。
1. Python DataStream API 中的有状态算子
在 Flink 1.13 中,Python 程序员可以享受到 Flink 状态处理 API 的所有能力。在 Flink 1.12 版本重构过的 Python DataStream API 现在已经拥有完整的状态访问能力,从而使用户可以将数据的信息记录到 state 中并且在后续访问。
带状态的处理能力是许多依赖跨记录状态共享(例如 Window Operator)的复杂数据处理场景的基础。
以下例子展示了一个自定义的计算窗口的实现:
class CountWindowAverage(FlatMapFunction):
def __init__(self, window_size):
self.window_size = window_size
def open(self, runtime_context: RuntimeContext):
descriptor = ValueStateDescriptor("average", Types.TUPLE([Types.LONG(), Types.LONG()]))
self.sum = runtime_context.get_state(descriptor)
def flat_map(self, value):
current_sum = self.sum.value()
if current_sum is None:
current_sum = (0, 0)
# update the count
current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
# if the count reaches window_size, emit the average and clear the state
if current_sum[0] >= self.window_size:
self.sum.clear()
yield value[0], current_sum[1] // current_sum[0]
else:
self.sum.update(current_sum)
ds = ... # type: DataStream
ds.key_by(lambda row: row[0]) \
.flat_map(CountWindowAverage(5))
2. PyFlink DataStream API 中的用户自定义窗口
Flink 1.13 中 PyFlink DataStream 接口增加了对用户自定义窗口的支持,现在用户可以使用标准窗口之外的窗口定义。
由于窗口是处理无限数据流的核心机制 (通过将流切分为多个有限的『桶』),这一功能极大的提高的 API 的表达能力。
3. PyFlink Table API 中基于行的操作
Python Table API 现在支持基于行的操作,例如用户对行数据的自定义函数。这一功能使得用户可以使用非内置的数据处理函数。
一个使用 map() 操作的 Python Table API 示例如下:
@udf(result_type=DataTypes.ROW(
[DataTypes.FIELD("c1", DataTypes.BIGINT()),
DataTypes.FIELD("c2", DataTypes.STRING())]))
def increment_column(r: Row) -> Row:
return Row(r[0] + 1, r[1])
table = ... # type: Table
mapped_result = table.map(increment_column)
除了 map(),这一 API 还支持 flat_map(),aggregate(),flat_aggregate() 和其它基于行的操作。这使 Python Table API 的功能与 Java Table API 的功能更加接近。
4. PyFlink DataStream API 支持 Batch 执行模式
对于有限流,PyFlink DataStream API 现在已经支持 Flink 1.12 DataStream API 中引入的 Batch 执行模式。
通过复用数据有限性来跳过 State backend 和 Checkpoint 的处理,Batch 执行模式可以简化运维,并且提高有限流处理的性能。
四、其它优化
1. 基于 Hugo 的 Flink 文档
Flink 文档从 JekyII 迁移到了 Hugo。如果您发现有问题,请务必通知我们,我们非常期待用户对新的界面的感受。
2. Web UI 支持历史异常
Flink Web UI 现在可以展示导致作业失败的 n 次历史异常,从而提升在一个异常导致多个后续异常的场景下的调试体验。用户可以在异常历史中找到根异常。
3. 优化失败 Checkpoint 的异常和失败原因的汇报
Flink 现在提供了失败或被取消的 Checkpoint 的统计,从而使用户可以更简单的判断 Checkpoint 失败的原因,而不需要去查看日志。
Flink 之前的版本只有在 Checkpoint 成功的时候才会汇报指标(例如持久化数据的大小、触发时间等)。
4. 提供『恰好一次』一致性的 JDBC Sink
从 1.13 开始,通过使用事务提交数据,JDBC Sink 可以对支持 XA 事务的数据库提供『恰好一次』的一致性支持。这一特性要求目标数据库必须有(或链接到)一个 XA 事务处理器。
这一 Sink 现在只能在 DataStream API 中使用。用户可以通过 JdbcSink.exactlyOnceSink(…) 来创建这一 Sink(或者通过显式初始化一个 JdbcXaSinkFunction)。
5. PyFlink Table API 在 Group 窗口上支持用户自定义的聚合函数
PyFlink Table API 现在对 Group 窗口同时支持基于 Python 的用户自定义聚合函数(User-defined Aggregate Functions, UDAFs)以及 Pandas UDAFs。这些函数对许多数据分析或机器学习训练的程序非常重要。
在 Flink 1.13 之前,这些函数仅能在无限的 Group-by 聚合场景下使用。Flink 1.13 优化了这一限制。
6. Batch 执行模式下 Sort-merge Shuffle 优化
Flink 1.13 优化了针对批处理程序的 Sort-merge Blocking Shuffle 的性能和内存占用情况。这一 Shuffle 模式是在Flink 1.12 的 FLIP-148 中引入的。
这一优化避免了大规模作业下不断出现 OutOfMemoryError: Direct Memory 的问题,并且通过 I/O 调度和 broadcast 优化提高了性能(尤其是在机械硬盘上)。
7. HBase 连接器支持异步维表查询和查询缓存
HBase Lookup Table Source 现在可以支持异步查询模式和查询缓存。这极大的提高了使用这一 Source 的 Table / SQL 维表 Join 的性能,并且在一些典型情况下可以减少对 HBase 的 I/O 请求数量。
在之前的版本中,HBase Lookup Source 仅支持同步通信,从而导致作业吞吐以及资源利用率降低。
8. 升级 Flink 1.13 需要注意的改动
- FLINK-21709 – 老的 Table & SQL API 计划器已经被标记为废弃,并且将在 Flink 1.14 中被删除。Blink 计划器在若干版本之前已经被设置为默认计划器,并且将成为未来版本中的唯一计划器。这意味着 BatchTableEnvironment 和 DataSet API 互操作后续也将不再支持。用户需要切换到统一的 TableEnvironment 来编写流或者批的作业。
- FLINK-22352 – Flink 社区决定废弃对 Apache mesos 的支持,未来有可能会进一步删除这部分功能。用户最好能够切换到其它的资源管理系统上。
- FLINK-21935 – state.backend.async 这一配置已经被禁用了,因为现在 Flink 总是会异步的来保存快照(即之前的配置默认值),并且现在没有实现可以支持同步的快照保存操作。
- FLINK-17012 – Task 的 RUNNING 状态被细分为两步:INITIALIZING 和 RUNNING。Task 的 INITIALIZING 阶段包括加载 state 和在启用 unaligned checkpoint 时恢复 In-flight 数据的过程。通过显式区分这两种状态,监控系统可以更好的区分任务是否已经在实际工作。
- FLINK-21698 – NUMERIC 和 TIMESTAMP 类型之间的直接转换存在问题,现在已经被禁用,例如 CAST(numeric AS TIMESTAMP(3))。用户应该使用 TO_TIMESTAMP(FROM_UNIXTIME(numeric)) 来代替。
- FLINK-22133 – 新的 Source 接口有一个小的不兼容的修改,即 SplitEnumerator.snapshotState() 方法现在多接受一个 checkpoint id 参数来表示正在进行的 snapshot 操作所属的 checkpoint 的 id。
- FLINK-19463 – 由于老的 Statebackend 接口承载了过多的语义并且容易引起困惑,这一接口被标记为废弃。这是一个纯 API 层的改动,而并不会影响应用运行时。对于如何升级现有作业,请参考 作业迁移指引 。