1.前言
2.状态原理
2.1. 状态、状态后端、Checkpoint 三者之间的区别及关系?
结论:拿五个字做比喻:"铁锅炖大鹅",铁锅是状态后端,大鹅是状态,Checkpoint 是炖的动作。
状态:本质来说就是数据,在 Flink 中,其实就是 Flink 提供给用户的状态编程接口。比如 flink 中的 MapState,ValueState,ListState。
状态后端:Flink 提供的用于管理状态的组件,状态后端决定了以什么样数据结构,什么样的存储方式去存储和管理我们的状态。Flink 目前官方提供了 memory、filesystem,rocksdb 三种状态后端来存储我们的状态。
Checkpoint(状态管理):Flink 提供的用于定时将状态后端中存储的状态同步到远程的存储系统的组件或者能力。为了防止 long run 的 Flink 任务挂了导致状态丢失,产生数据质量问题,Flink 提供了状态管理(Checkpoint,Savepoint)的能力把我们使用的状态给管理起来,定时的保存到远程。然后可以在 Flink 任务 failover 时,从远程把状态数据恢复到 Flink 任务中,保障数据质量。
2.2. 把状态后端从 FileSystem 变为 RocksDB 后,Flink 任务状态存储会发生那些变化?
结论:是否使用 RocksDB 只会影响 Flink 任务中 keyed-state 存储的方式和地方,Flink 任务中的 operator-state 不会受到影响。
首先我们来看看,Flink 中的状态只会分为两类:
- keyed-state:键值状态,如其名字,此类状态是以 k-v 的形式存储,状态值和 key 绑定。Flink 中的 keyby 之后紧跟的算子的 state 就是键值状态;
- operator-state:算子状态,非 keyed-state 的 state 都是算子状态,非 k-v 结构,状态值和算子绑定,不和 key 绑定。Flink 中的 kafka source 算子中用于存储 kafka offset 的 state 就是算子状态。
如下图所示是 3 种状态后端和 2 种 State 的对应存储关系:
- ⭐ 横向(行)来看,即 Flink 的状态分类。分为 Operator state-backend、Keyed state-backend;
- ⭐ 纵向(列)来看,即 Flink 的状态后端分类。用户可以配置 memory,filesystem,rocksdb 3 中状态后端,在 Flink 任务中生成 MemoryStateBackend,FsStateBackend,RocksdbStateBackend,其声明了整个任务的状态管理后端类型;
- ⭐ 每个格子中的内容就是用户在配置 xx 状态后端(列)时,给用户使用的状态(行)生成的状态后端实例,生成的这个实例就是在 Flink 中实际用于管理用户使用的状态的组件。
结论:
- Flink 任务中的 operator-state。无论用户配置哪种状态后端(无论是 memory,filesystem,rocksdb),都是使用 DefaultOperatorStateBackend 来管理的,状态数据都存储在内存中,做 Checkpoint 时同步到远程文件存储中(比如 HDFS)。
- Flink 任务中的 keyed-state。用户在配置 rocksdb 时,会使用 RocksdbKeyedStateBackend 去管理状态;用户在配置 memory,filesystem 时,会使用 HeapKeyedStateBackend 去管理状态。因此就有了这个问题的结论,配置 rocksdb 只会影响 keyed-state 存储的方式和地方,operator-state 不会受到影响。
2.3. 什么样的业务场景你会选择 filesystem,什么样的业务场景你会选 rocksdb 状态后端?
在回答这个问题前,我们先看看每种状态后端的特性:
- MemoryStateBackend
-
原理:运行时所需的 State 数据全部保存在 TaskManager JVM 堆上内存中,执行 Checkpoint 的时候,会把 State 的快照数据保存到 JobManager 进程 的内存中。执行 Savepoint 时,可以把 State 存储到文件系统中。
适用场景:
a. 基于内存的 StateBackend 在生产环境下不建议使用,因为 State 大小超过 JobManager 内存就 OOM 了,此种状态后端适合在本地开发调试测试,生产环境基本不用。
b. State 存储在 JobManager 的内存中。受限于 JobManager 的内存大小。
c. 每个 State 默认 5MB,可通过 MemoryStateBackend 构造函数调整。
d. 每个 State 不能超过 Akka Frame 大小。
- FSStateBackend
-
原理:运行时所需的 State 数据全部保存在 TaskManager 的内存中,执行 Checkpoint 的时候,会把 State 的快照数据保存到配置的文件系统中。TM 是异步将 State 数据写入外部存储。
-
适用场景:
a. 适用于处理小状态、短窗口、或者小键值状态的有状态处理任务,不建议在大状态的任务下使用 FSStateBackend。比如 ETL 任务,小时间间隔的 TUMBLE 窗口
b. State 大小不能超过 TM 内存。
- ⭐ RocksDBStateBackend
-
原理:使用嵌入式的本地数据库 RocksDB 将流计算数据状态存储在本地磁盘中。在执行 Checkpoint 的时候,会将整个 RocksDB 中保存的 State 数据全量或者增量持久化到配置的文件系统中。
适用场景:
a. 最适合用于处理大状态、长窗口,或大键值状态的有状态处理任务。
b. RocksDBStateBackend 是目前唯一支持增量检查点的后端。
c. 增量检查点非常适用于超大状态的场景。比如计算 DAU 这种大数据量去重,大状态的任务都建议直接使用 RocksDB 状态后端。
到生产环境中:
- 如果状态很大,使用 Rocksdb;如果状态不大,使用 Filesystem。
- Rocksdb 使用磁盘存储 State,所以会涉及到访问 State 磁盘序列化、反序列化,性能会收到影响,而 Filesystem 直接访问内存,单纯从访问状态的性能来说 Filesystem 远远好于 Rocksdb。生产环境中实测,相同任务使用 Filesystem 性能为 Rocksdb 的 n 倍,因此需要根据具体场景评估选择。
2.4. Flink SQL API State TTL 的过期机制是 onCreateAndUpdate 还是onReadAndWrite
- 结论:Flink SQL API State TTL 的过期机制目前只支持 onCreateAndUpdate,DataStream API 两个都支持
- 剖析:
- onCreateAndUpdate:是在创建 State 和更新 State 时【更新 State TTL】
- onReadAndWrite:是在访问 State 和写入 State 时【更新 State TTL】
- 实际踩坑场景:Flink SQL Deduplicate 写法,row_number partition by user_id order by proctime asc,此 SQL 最后生成的算子只会在第一条数据来的时候更新 state,后续访问不会更新 state TTL,因此 state 会在用户设置的 state TTL 时间之后过期。
3.时间窗口
3.1. watermark 到底是干啥的?应用场景?
大部分同学都只能回答出:watermark 是用于缓解时间时间的乱序问题的。
没错,这个观点是正确的。但是博主认为这只是 watermark 第二重要的作用,其更重要的作用在于可以标识一个 Flink 任务的事件 时间进度。
怎么理解 时间进度?
我们可以现象一下,一个事件时间窗口的任务,如果没有一个 东西 去标识其事件时间的进度,那么这个事件时间的窗口也就是不知道什么时候能够触发了,也就是说这个窗口永远不会触发并且输出结果。
所以要有一个 东西 去标识其事件时间的进度,从而让这个事件时间窗口知道,这个事件时间窗口已经结束了,可以触发计算了。在 Flink 中,这个 东西 就是 watermark。
总结一下,博主认为 watermark 为 Flink 解决了两个问题:
- ⭐ 标识 Flink 任务的事件时间进度,从而能够推动事件时间窗口的触发、计算。
- ⭐ 解决事件时间窗口的乱序问题。
3.2. 一个 Flink 任务中可以既有事件时间窗口,又有处理时间窗口吗?
结论:一个 Flink 任务可以同时有事件时间窗口,又有处理时间窗口。
那么有些小伙伴们问了,为什么我们常见的 Flink 任务要么设置为事件时间语义,要么设置为处理时间语义?
确实,在生产环境中,我们的 Flink 任务一般不会同时拥有两种时间语义的窗口。
那么怎么解释开头博主所说的结论呢?
博主这里从两个角度进行说明:
- ⭐ 我们其实没有必要把一个 Flink 任务和某种特定的时间语义进行绑定。对于事件时间窗口来说,我们只要给它 watermark,能让 watermark 一直往前推进,让事件时间窗口能够持续触发计算就行。对于处理时间来说更简单,只要窗口算子按照本地时间按照固定的时间间隔进行触发就行。无论哪种时间窗口,主要满足时间窗口的触发条件就行。
- ⭐ Flink 的实现上来说也是支持的。Flink 是使用一个叫做 TimerService 的组件来管理 timer 的,我们可以同时注册事件时间和处理时间的 timer,Flink 会自行判断 timer 是否满足触发条件,如果是,则回调窗口处理函数进行计算。
3.3. window 后面跟 aggregate 和 process 的两个窗口计算的区别是什么?
- aggregate:是增量聚合,来一条数据计算完了存储在累加器中,不需要等到窗口触发时计算,性能较好;
- process:全量函数,缓存全部窗口内的数据,满足窗口触发条件再触发计算,同时还提供定时触发,窗口信息等上下文信息;
- 应用场景:aggregate 一个一个处理的聚合结果向后传递一般来说都是有信息损失的,而** process 则可以更加定制化的处理**。
4.编程技巧
4.1. 为什么 Flink DataStream API 在函数入参或者出参有泛型时,不能使用 lambda 表达式?
Flink 类型信息系统是通过反射获取到 Java class 的方法签名去获取类型信息的。
以 FlatMap 为例,Flink 在通过反射时会检查及获取 FlatMap collector 的出参类型信息。
但是 lambda 表达式写的 FlatMap 逻辑,会导致反射方法获取类型信息时【直接获取不到】collector 的出参类型参数,所以才会报错。
4.2. Flink 为什么强调 function 实现时,实例化的变量要实现 serializable 接口?
问题:
- ⭐ 为什么 Flink 要用到 Java 序列化机制。和 Flink 类型系统的数据序列化机制的用途有啥区别?
- ⭐ 非实例化的变量没有实现 Serializable 为啥就不报错,实例化就报错?
- ⭐ 为啥加 transient 就不报错?
答案:
- ⭐ Flink 写的函数式编程代码或者说闭包,需要 Java 序列化从 JobManager 分发到 TaskManager,而 Flink 类型系统的数据序列化机制是为了分发数据,不是分发代码,可以用非Java的序列化机制,比如 Kyro。
- ⭐ 编译期不做序列化,所以不实现 Serializable 不会报错,但是运行期会执行序列化动作,没实现 Serializable 接口的就报错了
- ⭐ Flink DataStream API 的 Function 作为闭包在网络传输,必须采用 Java 序列化,所以要通过 Serializable 接口标记,根据 Java 序列化的规定,内部成员变量要么都可序列化,要么通过 transient 关键字跳过序列化,否则 Java 序列化的时候会报错。静态变量不参与序列化,所以不用加 transient。
4.3. Flink 的并行度可以通过哪几种方式设置,优先级关系是什么?
- ⭐ 代码中算子单独设置
- ⭐ 代码中Env全局设置
- ⭐ 提交参数
- ⭐ 默认配置信息
上面的 Flink 并行度优先级从上往下由大变小。
5.实战经验
5.1. Flink SQL 计算用户分布
⭐ 需求:上游是一个 kafka 数据源,数据内容是用户 QQ 等级变化明细数据(time,uid,level)。需要你求出当前每个等级的用户数。
⭐ 实现 SQL:Deduplicate
-- 如果需要可以打开 minibatch
select
level
, count(1) as uv
, max(time) as time
from (
select
uid
, level
, time
, row_number() over (partition by uid order by time desc) rn
from source
) tmp
where rn =1
group by
level
5.2.Flink SQL 计算 DAU
⭐ 需求:数据源:用户心跳日志(uid,time,type)。计算分 Android,iOS 的 DAU,最晚一分钟输出一次当日零点累计到当前的结果。
⭐ 实现方式 1:cumulate 窗口
SELECT
window_start
, window_end
, platform
, sum(bucket_dau) as dau
from (
SELECT
window_start
, window_end
, platform
, count(distinct uid) as bucket_dau
FROM TABLE(
CUMULATE(
TABLE user_log,
DESCRIPTOR(time),
INTERVAL '60' SECOND
, INTERVAL '1' DAY))
GROUP BY
window_start
, window_end
, platform
, MOD(HASH_CODE(user_id), 1024)
) tmp
GROUP by
window_start
, window_end
, platform
- 优点:如果是曲线图的需求,可以完美回溯曲线图。
- 缺点:大窗口之间如果有数据乱序,有丢数风险;并且由于是 watermark 推动产出,所以数据产出会有延迟。
- ⭐ 实现方式 2:Deduplicate
-- 如果需要可以打开 minibatch
select
platform
, count(1) as dau
, max(time) as time
from (
select
uid
, platform
, time
, row_number() over (partition by uid, platform, time / 24 / 3600 / 1000 order by time desc) rn
from source
) tmp
where rn = 1
group by
platform
- 优点:计算快。
- 缺点:任务发生 failover,曲线图不能很好回溯。没法支持 cube 计算。
- ⭐ 实现方式 3:group agg
-- 如果需要可以打开 minibatch
SELECT
max(time) as time
, platform
, sum(bucket_dau) as dau
from (
SELECT
max(time) as time
, platform
, count(distinct uid) as bucket_dau
FROM source
GROUP BY
platform
, MOD(HASH_CODE(user_id), 1024)
) t
GROUP by
platform
- 优点:计算快,支持 cube 计算。
- 缺点:任务发生 failover,曲线图不能很好回溯。
5.3.你是怎么合理的评估 Flink 任务的并行度?
Flink 任务并行度合理行一般根据峰值流量进行压测评估,并且根据集群负载情况留一定量的 buffer 资源。
- ⭐ 如果数据源已经存在,则可以直接消费进行测试
- ⭐ 如果数据源不存在,需要自行造压测数据进行测试
对于一个 Flink 任务来说,一般可以按照以下方式进行细粒度设置并行度:
source 并行度配置:以 kafka 为例,source 的并行度一般设置为 kafka 对应的 topic 的分区数
transform(比如 flatmap、map、filter 等算子)并行度的配置:这些算子一般不会做太重的操作,并行度可以和 source 保持一致,使得算子之间可以做到 forward 传输数据,不经过网络传输
keyby 之后的处理算子:建议最大并行度为此算子并行度的整数倍,这样可以使每个算子上的 keyGroup 是相同的,从而使得数据相对均匀 shuffle 到下游算子,如下图为 shuffle 策略
- sink 并行度的配置:sink 是数据流向下游的地方,可以根据 sink 的数据量及下游的服务抗压能力进行评估。如果 sink 是 kafka,可以设为 kafka 对应 topic 的分区数。注意 sink 并行度最好和 kafka partition 成倍数关系,否则可能会出现如到 kafka partition 数据不均匀的情况。但是大多数情况下 sink 算子并行度不需要特别设置,只需要和整个任务的并行度相同就行。
5.4.你是怎么合理评估任务最大并行度?
前提:并行度必须 <= 最大并行度
最大并行度的作用:合理设置最大并行度可以缓解数据倾斜的问题
-
根据具体场景的不同,最大并行度大小设置也有不同的方式:
- 在 key 非常多的情况下,最大并行度适合设置比较大(几千),不容易出现数据倾斜,以 Flink SQL 场景举例:row_number = 1 partition key user_id 的 Deduplicate 场景(user_id 一般都非常多)
- 在 key 不是很多的情况下,最大并行度适合设置不是很大,不然会加重数据倾斜,以 Flink SQL 场景举例:group by dim1,dim2 聚合并且维度值不多的 group agg 场景(dim1,dim2 可以枚举),如果依然有数据倾斜的问题,需要自己先打散数据,缓解数据倾斜
最大并行度的使用限制:最大并行度一旦设置,是不能随意变更的,否则会导致检查点或保存点失效;最大并行度设置会影响 MapState 状态划分的 KeyGroup 数,并行度修改后再从保存点启动时,KeyGroup 会根据并行度的设定进行重新分布。
最大并行度的设置:最大并行度可以自己设置,也可以框架默认生成;默认的算法是取当前算子并行度的 1.5 倍和 2 的 7 次方比较,取两者之间的最大值,然后用上面的结果和 2 的 15 次方比较,取其中的最小值为默认的最大并行度,非常不建议自动生成,建议用户自己设置。