0x01 背景
反压概念在流式计算中非常流行,要解决的问题是短时间内业务流量剧增,数据流入速度远高于数据处理速度,会对流处理系统构成巨大负载压力,如果不能正确处理,可能导致系统运行异常。因此有效的反压机制对保障流处理系统的稳定性至关重要。
Storm反压理念,bolt处理消息来不及的情况下会向ZK写一个节点进行反压,监听bolt上游的节点停止发送数据直到下游的bolt能够正常处理。这样的问题是数据可能出现骤降的过程,并且反压结束数据流下来之后容易导致新一次的反压,从而导致tps会一直抖动。
Jstorm做了两级反压,第一级和storm类似,通过执行队列来监测,但是不会通过ZK来协调,而是通过Topology Master来协调。在队列中会标记high water mark和low water mark,当执行队列超过high water mark时,就认为bolt来不及处理,则向TM发一条控制消息,上游开始减慢发送速率,直到下游低于low water mark时解除反压。此外,在Netty层也做了一级反压,由于每个Worker Task都有自己的发送和接收的缓冲区,可以对缓冲区设定限额、控制大小,如果spout数据量特别大,缓冲区填满会导致下游bolt的接收缓冲区填满,造成反压。
本文主要介绍task接收和发送反压实现。
0x02 emit反压实现
spout/bolt要向下游发射消息,就调用Collect.emit方法,该方法分两步实现:
1、把emit的消息缓存到task内部的消息队列(send Buffer),在缓存时判断队列是否处在反压状态,如果在反压状态,就一直循环等待直到队列降到低水位; 如果缓存消息后队列达到高水位,就标识该队列存在状态。
缓存队列大小由topology.executor.send.buffer.size参数控制。
2、后台发送线程异步的从缓存队列拿消息,然后通过NettyClient发送。NettyClient会一直等待直到下游task(targetTask)解除反压才通过socket发送消息。获取下游task状态有两种方式:第一种根据Netty Server端response判断;第二种Topology Master发送的控制消息。
如果下游task发生反压,NettyClient就不会从send buffer里取数据向下游发送,在不断emit时,会导致send buffer达到高水位,进而emit产生阻塞(一直等待send buffer降到低水位)。
0x03 receive反压实现
NettyServer处理NettyClient发送请求,也是分两步走:
1、server接收到client发送的消息后,先把消息放到接收缓存队列。同样消息缓存后也要判断队列是在高水位,如果队列达到高水位,返回给client端该task处在反压状态、并发送控制消息。
topology.executor.receive.buffer.size参数控制接收端缓存队列大小。
2、后台线程(BoltExecuts)从缓存队列拿消息,然后把event封装成tuple传递给Bolt的execute方法。
如果bolt execute方法执行时间过长,会导致Executs线程从队列拿消息速度变慢,在上游task emit速度不变的情况下,缓存队列就会达到高水位,最终导致该task触发反压。
emit和receive的整个反压过程:
0x04 相关参数
topology.backpressure.enable:开启反压
topology.backpressure.water.mark.low:低水位,当队列使用量低于这个量时,认为可以解除阻塞,默认值0.2
topology.backpressure.water.mark.high:高水位,当队列使用量超过这个值时,认为阻塞,默认值0.8
0x05 后记
JStorm的消息发送和接收都加了消息队列,本质上是生产消费模型,这样设计能降低系统对某个组件的依赖性,提高整体性能。如果对生产消费模型进行限流,JStorm反压机制很有参考意义,提供一种实现思路。通过水位检测队列过载风险,也是个很巧妙高效的设计。
本文首发于公众号:data之道