Flink SQL 性能优化实战

缘起


最近我们组在大规模上线Flink SQL作业。首先,在进行跑批量初始化完历史数据后,剩下的就是消费Kafka历史数据进行追数了。但是发现某些作业的追数过程十分缓慢,要运行一晚上甚至三四天才能追上最新数据。由于是实时数仓指标计算上线初期,经常验证作业如果有问题就得重蹈覆辙重新追数,效率很低,于是我开始分析Flink SQL的优化。

问题


insert into tableB
select a, max(b), max(c), sum(d) ...
from tableA
group by a

上面这个作业的简化版SQL,主要就是做一个分组聚合:

  1. 从tableA分组聚合出结果插入tableB

  2. tableA的联合主键是:a,b(但是a的离散度已经很高了)

  3. tableA的Flink表类型为upset-kafka

  4. tableB的Flink表类型为HBase

初步分析


这个作业跑在集群上的job graph如下:

image.png

可以看到有三个vertex:

  1. 第一个是TableSourceScan

  2. 第二个是ChangelogNormalize

  3. 第三个是GroupAggregate

TableSourceScan接入tableA表的upsert-kafka流;

ChangelogNormalize对upset-kafka进行撤回语义的解析;

GroupAggregate对撤回流进行分组聚合,然后写入tableB的HBase;

优化思路1:local/global agg


agg分类:

  • group agg
select count(a) from t group by b
  • over agg
select count(a) over (partition by b order by c) from t
  • window agg
select count(a) from t group by tumble(ts, interval '10' seconds), b

local/global agg:

image.png

核心思想与hadoop的combiner是一致的,就是在mapreduce的过程中,在map阶段就做一个预聚合,即combine操作。

[图片上传失败...(image-c0ad24-1650075387085)]

带来的收益是:减少网络shuffle数据,提升计算引擎的性能。

前提条件:

  1. agg的所有agg function都是mergeable(实现merge方法)

  2. table.optimizer.agg-phase-strategy为AUTO或TWO_PHASE

  3. Stream下,minibatch开启;Batch下,AUTO会根据cost选择

解释说明:

mergeable其实就是能用分治法解决的计算问题,例如sum、count等,而avg就不能用分治法先计算部分元素的avg,再计算最终avg了,结果有时候会出错。

table.optimizer.agg-phase-strategy:默认为AUTO,意思是引擎尽量做预聚合;TWO_PHASE表示所有聚合操作都做预聚合;ONE_PHASE表示所有聚合都不做预聚合。

minibatch:即开启微批模式。主要有三个参数:

  • table.exec.mini-batch.enabled:是否开启,默认不开启
  • table.exec.mini-batch.size:微批的record buffer大小
  • table.exec.mini-batch.allow-latency:微批的time buffer大小

minibatch的本质就是平衡实时性和吞吐量的刻度尺。

所以,local/global agg一共需要三个参数控制。

验证


经过对比验证,在这个SQL场景下的效率提升很小。

local/global agg降低了第二个vertex即ChangelogNormalize的sent records的数据量,而并没有使得第一个vertex的数据处理效率有显著提升。

所以,这个作业的瓶颈并不在vertex间, 而在于第一个vertex的处理数据效率。

优化思路二:调大并行度


这个思路的关键在于source upsert-kafka的分区数,这是制约吞吐量的瓶颈。因为在upsert-kafka中,每个partition最多被一个Flink线程读取。

增加了10倍的并行度,source分区也增加10倍后,作业周转时间缩短了将近一半。

优化思路三:RocksDB性能调优


仔细分析这个SQL作业,是对一个联合主键的字段做group by,那么state一定会非常大。

经过在对这个表在数仓中的数据进行分析,发现这个字段的离散度几乎接近于主键的离散度。

而进行group by必然要根据每一条upsert kafka的数据去查验在flink statebackend中物化的source table中该字段值的分布情况,这应该是才是瓶颈所在!

沿着这个思路,开始分析Flink的statebackend机制。

这里我们简单回顾一下Flink statebackend(后面再做专题总结):

由 Flink 管理的 keyed state 是一种分片的键/值存储,每个 keyed state 的工作副本都保存在负责该键的 taskmanager 本地中。另外,Operator state 也保存在机器节点本地。Flink 定期获取所有状态的快照,并将这些快照复制到持久化的位置,例如分布式文件系统。

如果发生故障,Flink 可以恢复应用程序的完整状态并继续处理,就如同没有出现过异常。

Flink 管理的状态存储在 state backend 中。Flink 有两种 state backend 的实现 – 一种基于 RocksDB 内嵌 key/value 存储将其工作状态保存在磁盘上的,另一种基于堆的 state backend,将其工作状态保存在 Java 的堆内存中。这种基于堆的 state backend 有两种类型:FsStateBackend,将其状态快照持久化到分布式文件系统;MemoryStateBackend,它使用 JobManager 的堆保存状态快照。

image.png

当使用基于堆的 state backend 保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象,访问和更新涉及序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend 能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。

所有这些 state backends 都能够异步执行快照,这意味着它们可以在不妨碍正在进行的流处理的情况下执行快照。

我们的线上一般采用的是RocksDB作为状态后端,checkpoint dir采用hdfs文件系统。其实我个人觉得这个应该根据作业的特性进行选择,根据我个人的经验以及知识沉淀,选择的主要因素是作业的state大小及对处理数据性能的要求:

  • RocksDBStateBackend可以突破内存的限制,rocksDB的数据逻辑结构和redis相似,但是数据的物理存储结构又和hbase相似,继承自levelDB的LSM树思想,缺点是性能太低

  • 而FsStateBackend是在做snapshot的时候才将内存的state持久化到远端,速度接近于内存状态

  • MemoryStateBackend是纯内存的,一般只用做调试。

但是由于这个大状态作业追数速度实在太慢,我甚至想过:

在追数的时候用FsStateBackend,并配置大内存,且把managed memory调成0,同时将ck的周期设置的很大,基本上不做ck,追上后savepoint。再把状态后端换成RocksDB,并且从FSSatebackend的savepoint处恢复,但是发现1.13才支持savepoint切换statebackend类型。

只剩下调优RocksDB一条路了。根据之前对HBase的LSM原理的理解,进行知识迁移,马上对RocksDB有了一定的认识。在HBase中调优效果最明显无乎:

blockcache读缓存、memStore写缓存、增加布隆过滤器、提升compact效率

沿着这个思路,再查阅了一番RocksDB资料后,决定先对如下参数进行调优:

  • state.backend.rocksdb.block.cache-size

  • state.backend.rocksdb.block.blocksize

Block 块是 RocksDB 保存在磁盘中的 SST 文件的基本单位,它包含了一系列列有序的 Key 和 Value 集合,可以设置固定的大小。

image.png

但是,通过增加 Block Size,会显著增加读放大(Read Amplification)效应,令读取数据时,吞吐量下降。原因是 Block Size增加以后,如果 Block Cache 的大小没有变,就会⼤大减少 Cache 中可存放的 Block 数。如果 Cache 中还存处理索引和过滤器等内容,那么可放置的数据块数目就会更少,可能需要更多的磁盘 IO 操作,找到数据就更更慢了,此时读取性能会大幅下降。反之,如果减小BlockSize,会让读的性能有不少提升,但是写性能会下降,⽽而且对 SSD 寿命也不利。

因此我的调优经验是,如果需要增加 Block Size 的大小来提升读写性能,请务必一并增加 Block Cache Size 的大小,这样才可以取得比较好的读写性能。Block Cache,缓存清除算法⽤用的是 LRU(Least Recently Used)。

验证


测试对比后发现,原本半天左右完成的作业只需要一到两个小时即可追上数据!

感悟


性能调优就如同把脉治病,关键在于对症下药。

前期,要分析当前场景下真正制约性能的瓶颈所在,后期,在症结处用效果最明显的方式处理症结。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,335评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,895评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,766评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,918评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,042评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,169评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,219评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,976评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,393评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,711评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,876评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,562评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,193评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,903评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,699评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,764评论 2 351

推荐阅读更多精彩内容