1. 内存模型与计算案例
假设参数默认条件下,提交DMsize内存为4G,内存分配如下:
JVM执行开销:4g0.1 = 409.6M
flink内存开销:4g - 256m - 409.6 = 3430.4m
网络内存3430.4m0.1
托管内存:3430.4m*0.4
框架内存,堆外和堆内都是128m
task堆内内存=3430.4m - 128m -128m - 343.04m -1372.16m
flink提供了大量参数来设置对应区块的大小:
官网不建议同时设置进程总内存和 Flink 总内存。 这可能会造成内存配置冲突,从而导致部署失败。 额外配置其他内存部分时,同样需要注意可能产生的配置冲突。所以Flink的内存配置是三选一:要么配置进程总内存、要么配置Flink总内存、要么配置堆内内存+托管内存
2. 并行度的设置
开发完成后,先进行压测,任务并行度建议给10以下,测试单个并行度的处理上限,然后根据总QPS/单并行度的处理能力 = 并行度(最好根据高峰期的QPS,并行度*1.2,富余一些资源)
source端并行度的配置:一般数据源为kafka,source的并行度设置为kafka对应的topic分区数.flink的一个并行度可以处理一至多个分区的数据,如果并行度多于kafka的分区数,那么就会造成并行度空闲,浪费资源,也可能出现数据倾斜的情况
transform端并行度的配置:keyby之前的例如map,filter可以和source保持一致,keyby之后如果并发比较大建议调整成2的整数次幂
sink端并行度的配置:sink端的数据量比较小,比如是一些监控告警的场景,并行度可以设置的小一些,如果是kafka,可以设置对应的分区数,具体情况具体分析,,要根据下游服务的抗压能力进行综合评估
3. 大状态调优
RocksDB
RocksDB是基于LSM Tree实现的,写数据都是先缓存到内存中,所以RocksDB的写请求效率是比较高的,RocksDB使用内存结合磁盘的方式来存储数据,每次获取数据时,先从内存中blockcache中查找,如果内存中没有再去次磁盘中查询,使用RocksDB时,状态大小仅受磁盘空间量的限制,性能瓶颈主要在于RocksDB对磁盘的读请求,每次读写操作都必须对数据进行反系列化和序列化,当处理性能不够时,仅需要横向扩展并行度即可提高整个job的吞吐量
开启增量检查点和本地恢复
RocksDB是目前唯一可用于支持状态流处理应用程序增量检查点的状态后端,可以修改参数开启增量检查点:
state.backend.incremental:true
new EmbeddedRocksDBStateBacken(true)
当flink任务失败时,可以基于本地的状态信息进行恢复任务,可能不需要从dfs拉取,本地恢复目前仅涵盖键控类型的状态后端(RocksDB)
调整预定义选项
blockcache和writebuffer调优
- 增大block缓存:整个RocksDB共享一个block cache,读取数据时内存cacke大小,该参数值越大,读取数据缓存的命中率越高
- 增大write buffer 和level阈值大小
- 增大write buffer数量
- 增大后台线程数和write buffer合并数
- 开启分区索引:对RocksDB增加了分区索引功能,复用了RocksDB的partitioned index & filter功能,简单来说就是对RocksDB的partitioned index做了多级索引,也就是将内存中的最上层常驻,下层根据需要再load回来,这样就大大降低了数据swap竞争,再线上测试中,相对内存比较小的场景下,性能提升10倍左右
4. Checkpoint
一般的需求,我们的Checkpoint时间间隔可以设置分钟级别,但是对于一些状态很大的任务每次Checkoing访问dfs比较耗时,可以设置为5-10min一次,并且调大每次Checkpoint的间隔,例如设置两次Checkpoint之间至少4/8min,同时也要结合时效性的要求和性能之间做一个平衡.如果Checkpoint语义设置配置是EXACTLY_ONCE,那么在Checkpoint过程中还会存在barrier对其的过程,可以通过Flink-UI的Checkpoint的选项卡来查看Checkpoint过程中各个阶段的耗时情况
5. 反压
Flink 中文社区 | Apache Flink 进阶教程(七):网络流控及反压剖析 (flink-learning.org.cn)
参考维表关联实战参考:Flink 中文社区 | Flink DataStream 关联维表实战
维度数据实时关联的实践(w/ Flink、Vert.x & Guava Cache)
6. 数据倾斜
- keyby之后的聚合
使用locakeyby的思想:在keyby上游算子数据发送前,首先对上游算子本地数据进行聚合后,在发送到下游,使得下游接收到的数据量大大减少,从而使得keybu之后的聚合操作不再是任务的瓶颈,类似于mapreduce中combiner思想 - keyby之前的数据倾斜
产生该情况大概是由于数据源的数据本身就是不均匀的,这种情况,需要使用flink强制shuffle - keyby之后窗口发生数据倾斜
二次聚合
SQL写法参考:Flink教程-keyby 窗口数据倾斜的优化
7. job优化
- 算子指定UUID,默认情况下,算子是根据Jobgraph自动生成的,Jobgraph的更改可能会导致UUID改变,手动指定算子UUID,可以让Flink有效地将算子从savepoint映射到作业修改后的正确的算子上
- 链路延迟测量,关注数据输入,计算和输出的及时性
- 开启对象重用
- 细粒度滑动窗口优化思路:思路一,加上新增的步长数据,减去滑动的步长数据,思路二:数据分片
8. flinksql优化
- TTL
- minbatch
- localglobal解决数据倾斜
- split distinct
- 多维distinct使用fliter
9. 常见故障配置
- 非法配置异常
- java堆内存异常
- 直接缓冲存储器异常
- 元空间异常
- 网络缓冲区数量不足
- 超出容器内存异常
- checkpoint失败
- checkpoint慢
- kafka动态发现分区
- 水位线不更新
- 依赖冲突
- 超出文件描述符限制
- 脏数据导致数据转发失败
- 通讯超时
10. 状态一致性
内部保证 -- checkpoint
source端 -- 可重设数据的读取位置
sink端 -- 从故障恢复时,数据不会重复写入外部系统
幂等性 :就是说一个操作,可以重复执行很多次,但是只会导致一次结果变更,也就是说再重复的数据执行就不起作用了
事务写入
仅支持幂等性写入,有如下问题,例如是: 10 30 50 | 70 90
在50的时候有保存点,但是消费到90的时候,挂掉了,然后数据需要从50开始重新消费,就业务来看数据是不对的: 10 30 50 | 70 90 70 90
所以我们需要事务实现的思想是:构建的事务对应checkpoint,等到checkpoint真正完成的时候,才把所有对应的结果写入sink系统中
- 预写日志的方案不可行
- 两阶段提交:对于每一个checkpoint,sink任务会启动一个事务,并将接下来所有接受到的数据添加到事务中,然后将这些数据写入到外部sink系统,但是不提交他们--这个时候只是预提交,当它收到checkpoint完成的通知时,它才正式提交事务,实现结果的真正写入