【Flink 精选】如何分析及处理反压?

阐述Flink、Storm,Spark Streaming 的反压机制,Flink 如何定位及分析反压?


概念

反压(backpressure)是流式计算中十分常见的问题。反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速。由于实时计算应用通常使用消息队列来进行生产端和消费端的解耦,消费端数据源是 pull-based 的,所以反压通常是从某个节点传导至数据源并降低数据源(比如 Kafka consumer)的摄入速率。

① 节点有性能瓶颈可能是该节点所在的机器有故障(网络、磁盘等)、机器的网络延迟和磁盘不足、频繁GC、数据热点等原因。

② 大多数消息中间件,例如kafka的consumer从broker把数据pull到本地,而producer把数据push到broker

反压的影响

反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致更大的数据处理延迟。通常来说,对于一些对延迟要求不高或者数据量较少的应用,反压的影响可能并不明显。然而对于规模比较大的 Flink 作业,反压可能会导致严重的问题。

反压会影响checkpoint

checkpoint时长:checkpoint barrier跟随普通数据流动,如果数据处理被阻塞,使得checkpoint barrier流经整个数据管道的时长变长,导致checkpoint 总体时间变长。

state大小:为保证Exactly-Once准确一次,对于有两个以上输入管道的 Operator,checkpoint barrier需要对齐,即接受到较快的输入管道的barrier后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的barrier也到达。这些被缓存的数据会被放到state 里面,导致checkpoint变大。

checkpoint是保证准确一次的关键,checkpoint时间变长有可能导致checkpoint超时失败,而state大小可能拖慢checkpoint甚至导致OOM。 

1.Flink、Storm、Spark Streaming 的反压机制

网络流控的实现:动态反馈/自动反压

动态反馈

Consumer 需要及时给 Producer 做一个 feedback,即告知 Producer 能够承受的速率是多少。动态反馈分为两种:

负反馈:接受速率小于发送速率时发生,告知 Producer 降低发送速率

正反馈:发送速率小于接收速率时发生,告知 Producer 可以把发送速率提上来

1.1 Flink 反压机制

1.1.1 Flink 数据交换

Flink 的数据交换有3种:①同一个 Task 的数据交换,②不同 Task 同 JVM 下的数据交换,③不同 Task 且不同 TaskManager 之间的交换

(1)同一个 Task 的数据交换

通过算子链 operator chain 串联多个算子,主要作用是避免了序列化网络通信的开销。

Flink-数据交换1

算子链 operator chain 串联多个算子的条件:

① 上下游的并行度一致

② 下游节点的入度为1

③ 上下游节点共享同一个slot

④ 下游节点的 chain 策略为 ALWAYS(例如 map、flatmap、filter等默认是ALWAYS)

⑤ 上游节点的 chain 策略为 ALWAYS 或 HEAD(source默认是HEAD)

⑥ 两个节点间数据分区方式是 forward

⑦ 用户没有禁用 chain

(2)不同 Task 同 TaskManager 的数据交换

Flink-数据交换2

在 TaskA 中,算子输出的数据首先通过 record Writer 进行序列化,然后传递给 result Partition 。接着,数据通过 local channel 传递给 TaskB 的 Input Gate,然后传递给 record reader 进行反序列。

(3)不同 Task 且不同 TaskManager 之间的交换

Flink-数据交换3

与上述(2)的不同点是数据先传递给 netty ,通过 netty 把数据推送到远程端的 Task 。

1.1.2 Flink(before V1.5)的TCP-based反压机制

1.5 版本之前是采用 TCP 流控机制,而没有采用feedback机制

(1)Flink1.5 版本之前的TCP-based 反压机制

Flink 1.5 版本之前的反压机制  

发送端 Flink 有一层Network Buffer,底层用Netty通信即有一层Channel Buffer,最后Socket通信也有Buffer,同理接收端也有对应的3级 Buffer。Flink (before V1.5)实质是利用 TCP 的流控机制来实现 feedback 。

(2)TCP 利用滑动窗口实现网络流控

TCP报文段首部有16位窗口字段,当接收方收到发送方的数据后,ACK响应报文中就将自身缓冲区的剩余大小设置到放入16位窗口字段。该窗口字段值是随网络传输的情况变化的,窗口越大,网络吞吐量越高。

参考:1.【计算机网络】3.1 运输层 - TCP/UDP协议

           2.Apache Flink 进阶教程(七):网络流控及反压剖析

例子:TCP 利用滑动窗口限制流量

步骤1:发送端将 4,5,6 发送,接收端也能接收全部数据。

TCP 滑动窗口1

步骤2:consumer 消费了 2 ,接收端的窗口会向前滑动一格,即窗口空余1格。接着向发送端发送 ACK = 7、window = 1

TCP 滑动窗口2

步骤3:发送端将 7 发送后,接收端接收到 7 ,但是接收端的 consumer 故障不能消费数据。这时候接收端向发送端发送 ACK = 8、window = 0 ,由于这个时候 window = 0,发送端是不能发送任何数据,也就会使发送端的发送速度降为 0。

TCP 滑动窗口3
TCP 滑动窗口4

(3)TCP-based 反压机制的缺点

TCP-based 反压机制的缺点

① 单个Task的反压,阻塞了整个TaskManager的socket,导致checkpoint barrier也无法传播,最终导致checkpoint时间增长甚至checkpoint超时失败。

② 反压路径太长,导致反压时间延迟

1.1.3 Flink(since V1.5)的 Credit-based 反压机制

在 Flink 层面实现反压机制,通过 ResultPartition 和 InputGate 传输 feedback

Credit-base 的 feedback 步骤:

① 每一次 ResultPartitionInputGate 发送数据的时候,都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息。(backlog 的作用是为了让消费端感知到我们生产端的情况)

② 如果下游有充足的 Buffer ,就会返还给上游 Credit (表示剩余 buffer 数量),告知发送消息(图上两个虚线是还是采用 Netty 和 Socket 进行通信)。

生产段发送backlog=1
消费端返回credit=3
当生产端用完buffer,返回credit=0
生产端也出现了数据积压

1.2 Storm 反压机制

Storm反压机制

 Storm 在每一个 Bolt 都会有一个监测反压的线程(Backpressure Thread),这个线程一但检测到 Bolt 里的接收队列(recv queue)出现了严重阻塞就会把这个情况写到 ZooKeeper 里,ZooKeeper 会一直被 Spout 监听,监听到有反压的情况就会停止发送。因此,通过这样的方式匹配上下游的发送接收速率。

Storm 提供的最基本的处理 stream 的原语是 spout 和 bolt。

spout 是流的源头。 通常 spout 从外部数据源(队列、数据库等)读取数据,然后封装成Tuple形式,之后发送到Stream中。

② bolt 处理输入的Stream,并产生新的输出Stream。bolt 可以执行Filter、Map、Join等操作。bolt 是一个被动的 角色,其接口中有一个execute(Tuple input)方法,在接收到消息之后会调用此函数,用户可以在此方法中执行自己的处理逻辑。

1.3 Spark Streaming 反压机制

Spark streaming反压机制

组件 RateController 监听负责监听“OnBatchCompleted”事件,然后从中抽取processingDelay 及schedulingDelay信息。RateEstimator 依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream 将 rate 转发给 Executor 的 BlockGenerator,并更新RateLimiter

1.4 对比Flink、Storm、Spark Streaming 的反压机制

Flink、Storm、Spark Streaming 的反压机制都采用动态反馈/自动反压原理,可以动态反映节点限流情况,进而实现自动的动态反压。

Flink、Storm、Spark Streaming 反压机制的区别

Flink 是天然的流处理引擎,数据传输的过程相当于提供了反压,类似管道里的水(下游流动慢自然导致下游也慢),所以不需要一种特殊的机制来处理反压。

② Storm 利用 Zookeeper 组件和流量监控的线程实现反压机制,其中存在的问题有实现复杂、bolt 接收队列暴涨导致OOM、反压慢

Spark Streaming 是微批处理,可以根据前一批次数据的处理情况,动态、自动的调整后续数据的摄入量,其中存在的问题有实现复杂、时效性较差。

2 Flink 如何定位反压节点

2.1 Flink Web UI 自带的反压监控 —— 直接方式

Flink Web UI 的反压监控提供了 Subtask 级别的反压监控。监控的原理是通过Thread.getStackTrace() 采集在 TaskManager 上正在运行的所有线程,收集在缓冲区请求中阻塞的线程数(意味着下游阻塞),并计算缓冲区阻塞线程数与总线程数的比值 rate。其中,rate < 0.1 为 OK,0.1 <= rate <= 0.5 为 LOW,rate > 0.5 为 HIGH。

Web UI 反压监控 

以下两种场景可能导致反压:

该节点发送速率跟不上它的产生数据速率。该场景一般是单输入多输出的算子,例如FlatMap。定位手段是因为这是从 Source Task 到 Sink Task 的第一个出现反压的节点,所以该节点是反压的根源节点。

下游的节点处理数据的速率较慢,通过反压限制了该节点的发送速率。定位手段是从该节点开始继续排查下游节点。

注意事项:

① 因为Flink Web UI 反压面板是监控发送端的,所以反压的根源节点并不一定会在反压面板体现出高反压。如果某个节点是性能瓶颈并不会导致它本身出现高反压,而是导致它的上游出现高反压。总体来看,如果找到第一个出现反压的节点,则反压根源是这个节点或者是它的下游节点

② 通过反压面板无法区分上述两种状态,需要结合 Metrics 等监控手段来定位。如果作业的节点数很多或者并行度很大,即需要采集所有 Task 的栈信息,反压面板的压力也会很大甚至不可用

2.2 Flink Task Metrics —— 间接方式

(1)回顾 Flink Credit-based 网络

Flink Credit-Based 网络

① TaskManager 之间的数据传输

不同的 TaskManager 上的两个 Subtask 通常情况下,channel 数量等于分组 key 的数量或者等于算子并发度。这些 channel 会复用同一个 TaskManager 进程的 TCP 请求,并且共享接收端 Subtask 级别的 Buffer Pool。

② 接收端

每个 channel 在初始阶段会被分配固定数量的独享 Exclusive Buffer,用于存储接收到的数据。算子 Operator 使用后再次释放 Exclusive  Buffer。说明:channel 接收端空闲的 Buffer 数量称为 Credit,Credit 会被定时同步给发送端,用于决定发送多少个 Buffer 的数据。

③ 流量较大的场景

接收端,channel 写满 Exclusive Buffer 后,Flink 会向 Buffer Pool 申请剩余的 Floating Buffer。发送端,一个 Subtask 所有的 Channel 会共享同一个 Buffer Pool,因此不区分 Exclusive Buffer 和 Floating Buffer。

(2)Flink Task Metrics 监控反压

Network 和 task I/O metrics 是轻量级反压监视器,用于正在持续运行的作业,其中一下几个 metrics 是最有用的反压指标。

metrics反压指标

采用 Metrics 分析反压的思路:如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游

inPoolUsage和outPoolUsage反压分析表

解释:

① outPoolUsage 和 inPoolUsage 同为低表明当前 Subtask 是正常的,同为高分别表明当前 Subtask 被下游反压。

② 如果一个 Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影响,所以可以排查它本身是反压根源的可能性。

③ 如果一个 Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,则表明它有可能是反压的根源。因为通常反压会传导至其上游,导致上游某些 Subtask 的 outPoolUsage 为高。

注意:反压有时是短暂的且影响不大,比如来自某个 channel 的短暂网络延迟或者 TaskManager 的正常 GC,这种情况下可以不用处理。

下表把 inPoolUsage 分为 floatingBuffersUsage 和 exclusiveBuffersUsage,并且总结上游 Task outPoolUsage 与 floatingBuffersUsage 、 exclusiveBuffersUsage 的关系,进一步的分析一个 Subtask 和其上游 Subtask 的反压情况。

outPoolUsage 与 floatingBuffersUsage 、 exclusiveBuffersUsage 的关系表

解析:

floatingBuffersUsage 为高则表明反压正在传导至上游

② exclusiveBuffersUsage 则表明了反压可能存在倾斜。如果floatingBuffersUsage 高、exclusiveBuffersUsage 低,则存在倾斜。因为少数 channel 占用了大部分的 floating Buffer(channel 有自己的 exclusive buffer,当 exclusive buffer 消耗完,就会使用floating Buffer)。 

3 Flink 如何分析反压

上述主要通过 TaskThread 定位反压,而分析反压原因类似一个普通程序的性能瓶颈

(1)数据倾斜

通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。解决方式把数据分组的 key 进行本地/预聚合来消除/减少数据倾斜。

(2)用户代码的执行效率

TaskManager 进行 CPU profile,分析 TaskThread 是否跑满一个 CPU 核:如果没有跑满,需要分析 CPU 主要花费在哪些函数里面,比如生产环境中偶尔会卡在 Regex 的用户函数(ReDoS);如果没有跑满,需要看 Task Thread 阻塞在哪里,可能是用户函数本身有些同步的调用,可能是 checkpoint 或者 GC 等系统活动

(3)TaskManager 的内存以及 GC

TaskManager JVM 各区内存不合理导致的频繁 Full GC 甚至失联。可以加上 -XX:+PrintGCDetails 来打印 GC 日志的方式来观察 GC 的问题。推荐TaskManager 启用 G1 垃圾回收器来优化 GC。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351