前言
看到一篇14年的关于流处理系统的弹性缩扩容的论文Elastic Scaling for Data Stream Processing,觉得挺有启发性,于是就写了一篇中文的解读。文章主要关注与设计与算法实现,关于benchmark的部分没有涉及。并不是论文翻译,没有做到很精准的表达,里面也算是我个人的理解,如果哪边有错误欢迎指正。
概述
- 流处理应用实际上就是由算子(点),数据流(边)组成的一个有向图。要对这样的应用进行弹性伸缩,本质上就是改变这个图映射到一组机器上的方式。
- 但是如何做到让应用开发者对此无感知的同时还能保证结果的正确性?并且如何设计一个高效的,资源节省的Scaling机制?
- 这套机制需要做到能根据工作量,还有可用的资源数量来动态的调节并行度。
基本概念
- 如下图就是一个由点(算子)和边(数据流组成的)流处理应用。
- 点与点之间往往不止一对一的关系(窄依赖),还有一对多,多对一,多对多的关系。
- 实际上多对多就是由多个多对一组合而成
- 在一对多的关系中,我们需要一个Splitter,流分区器
- 在多对一的关系中,我们需要一个Merger,流归并器(没想到更好的翻译)
- 而在多对多的关系中,上游存在Splitter,下游存在Merger
- 一个流经过流分区器会被分割为多个流,在这儿,我们把被分割的子流,叫做Channel,通道,对应着图上的边。
需求
- 首先有两个重要的需求
- 有状态的算子在扩容或者缩容时需要状态迁移 =>
- 让用户无感知
- 迁移尽可能少的状态
- 一个根据运行时的情况来决定是否和如何和Scaling的控制算法
- 鉴于一个通用的流处理系统会有大量的用户自定义的算子,基于代价的优化可行性不高
- 这套算法应该基于运行时的监控数据
- 提供SASO特性
- Stability 不容易产生震荡,导致频繁的增减并行度
- Accuracy 找到最大化吞吐的并行度
- Settling time 快速的到达稳定的最优并行度
- Overshoot 避免浪费资源,使用超过实际需要的并行度
- 有状态的算子在扩容或者缩容时需要状态迁移 =>
解决思路
- 针对状态
- 提供State的API,对用户屏蔽状态迁移的具体细节
- 一个增量(原文为增量,个人理解为部分)的状态迁移协议,配合基于一致性哈希的流分区策略
- 针对控制算法
- 依赖的监控数据
- Congestion Index,流分区器中的阻塞时间(简单理解为反压时间)
- Throughput 吞吐量
- 具体操作
- 基于监控动态增减下游并行度来解决Acurracy和Overshoot问题
- 记录历史表现来保证Stability,防止震荡
- Rapid Scaling 快速扩缩容来减少Settling time
- 依赖的监控数据
状态迁移
- 状态管理
- 从状态的角度来看算子可以分为无状态的算子和分区状态的算子(partitioned stateful)。
- 分区状态的算子需要根据流分区器的变化来迁移状态,并且迁移的状态是该分区的子集,所以要求系统能够比较细粒度的掌控这些状态(比如说keyBy这样的流分区器,系统能迁移指定key的集合的状态)
- 状态迁移协议
- 变量含义
- i 代表一个算子的某一个通道(分割的子流)的索引
- N 代表迁移过后通道的数量(下游新并行度)
- Si 代表存储在索引为i的算子的状态
- H 代表流分区器生成器,根据通道数量生成流分区器(在此处使用了一致性哈希)
- 迁移算法实现
- 算法分为两个阶段,donate和collect
- donate阶段,在流分区器更新后,在新的并行度下不再属于这个算子的状态(通过新的流分区方程获取状态属于哪个算子)会被收集起来,放入一个存储介质。
- 在donate和collect阶段之间有一个vertical barrier来确保在collect阶段开使前所有的算子已完成donate操作
- collect阶段,每个算子将其他算子donate的属于自己的状态取回
- horizontal barrier 用来防止collect完成前splitter发送新的数据到这个算子,出现数据不一致的情况
- 算法分为两个阶段,donate和collect
- 变量含义
- 流分区器生成
- 如果使用一个简单的流分区器生成器,比如使用一个哈希方程根据通道数量取模,可能会导致大量的状态在各个算子之间迁移,成本会非常高
- 在选择流分区器时要考虑两个因素
- 一是平衡,数据能否能被比较平均的分配到各个通道
- 二是单调性(monotonicity),即状态不会被迁移到一个迁移前后都存在的算子
- 简单来说,就是当新增一个通道时,过去存在的下游算子算子会将状态迁移到这个新通道对应的算子,而不会互相迁移。
- 当减少一个通道时,被减少的这个通道下游算子的状态只会被分配到其他过去存在的下游算子,而其他算子之间并不会迁移状态。
- 解决方案,一致性哈希
- O(1)的时间复杂度
- 每个partition对应一个环上的多个点,而数据经过哈希方程之后最靠近哪个点就会被分配到哪个partition
- 在一个环上新增一个点,只会从相邻的两个点分流数据
- 在一个环上减少一个点,之前经过这个点的数据只会被分配到相邻的两个点
- 使用这样一个哈希方程可以同时满足两个要求,并且计算本身的代价也是非常低的。
- 具体一致性哈希的实现这篇文章不会涉及,有相关的论文
控制算法
- 监控指标
- Congestion Index(阻塞标志)
- 本质是测量背压(消息阻塞)时间的比例是否超出了阈值
- 监控的意义
- 如果背压时间过多,说明消费能力不够,需要增加输出通道(Channel),也意味着下游需要增加并行度
- 如果不存在背压,说明可能资源超出实际需要,需要减少输出通道(Channel),也意味着下游需要降低并行度
- 该指标的变化可以反映工作量的变化
- Throughput 吞吐量
- 监控的意义
- 当增加通道(并发)后,吞吐量是操作是否有效最重要的检验标准
- 该指标的变化可以直观反映工作量的变化
- 监控的意义
- Congestion Index(阻塞标志)
- 算法基本原则
- 内容
- P1 如果存在阻塞(Congestion Index),查询历史记录,如果有记录显示增加通道(提高下游并行度)并不能增加吞吐量,则不变。其余情况增加并行度。
- P2 如果没有阻塞(Congestion Index),查询历史记录,如果有记录显示减少通道(降低下游并行度)会导致阻塞(Congestion)出现,则不变。其余情况降低并行度。
- 目的
- P1来保证SASO中的Acurracy, P2来保证避免SASO中的Overshoot
- 通过查询历史记录来判断是否改变并行度,保证SASO中的Stability,防止震荡
- 最终到达一个最优的并行度
- 内容
- 补充
- 针对情况: 并行度会最终达到一个稳定值,这个值确保在当前并行度下不会出现阻塞标志(Congestion Index),但是任何小于该并行度都会导致出现阻塞标志(Congestion Index)。 可是工作量(数据流量)本身处于波动,必须有机制去适应这种波动。
- 内容:如何适应工作量(数据流量)的变化(在并行度已经达到一个稳定值的情况下)
- P3 如果观测到阻塞标志(Congestion Index),即背压时间增加,意味着工作量(数据流量)的增加,则需要忘记历史记录,并按照P1要求增加并行度
- P4 如果观测到吞吐降低,则意味着资源过剩,则需要忘记历史记录,并按照P2降低并行度
- 优化
- 针对情况1: Remote Congestion(远程阻塞),即阻塞(Congestion)不是由下游引起的,而是由下游的下游引起的,增加下游并行度并没有效果。并且一个流处理应用也会有一个扩展的上限,这是由无法并行执行的部分决定的(比如上图中的Sink)。
- 内容: P5 如果提高并行度并不能提高吞吐,则降低并行度
- 针对情况2: 在资源和工作量都很高的情况下,该算法可能要很长时间才能找到最优的并行度,如何降低SASO中的Settling time也是需要考虑的因素之一
- 内容: P6 Rapid Scaling: 每次扩容或缩容的时候不是一个并行度一个并行度来,而是定义一个Level,这个Level和并行度呈超线性关系。一个比较合理的公式如下。
- 流程图
- 最后整个流程如下图所示,算法也是根据这个流程图实现的。
- 算法实现
- 变量含义
- P 当前阶段(可以理解为auto scaling是定期触发,两次触发之间形成一个阶段)
- L 当前所处Level(Rapid Scaling当中的Level)
- 以下变量角标均表示Level
- Pi 意味着上最后一次处于这个Level时所处的的阶段
- 如下图是一个Level随阶段变化的折线图
- 从当下来看,P0 = 1, P1 = 2, P2 = 4, P3 = 8, P4 = 6
- Ci 意味着最后一次处于这个Level时的是否阻塞
-
意味着最后一次处于这个level是的吞吐
-
意味着最后一次观察到有连续的阶段处于这个level时的吞吐量(比较拗口)
- Pi 意味着上最后一次处于这个Level时所处的的阶段
- L*代表level的最大值
- 变量初始化
- 算法主体
- 核心就在于根据监控数据获取最优的通道数(下游并行度)
- 首先根据阻塞和吞吐获取工作量(流量)是否变化
- 对应P3,P4,决定是否要忘掉历史记录
- 如果工作量减少,则清除所有当前Level以下的阻塞标记Ci和吞吐量Ti-| (i < L)
- 如果工作量增加,则清除所有当前Level以上的阻塞标记Ci 和吞吐量Ti-| (L < i < L*)
- 接下来更新当前Level的变量
- 之后判断是否存在远程阻塞,对应P5,如果存在则降低Level
- 如果不存在远程阻塞,但是当前存在阻塞,对应P1
- 并且有历史记录表明提升Level能提升吞吐(没有历史记录的话TL+1 -| 为无穷大,一定大于当前吞吐),则提升Level(增加通道,即增加并行度)
- 如果不存在远程阻塞,当前也不存在阻塞,对应P2
- 并且有历史记录表明降低Level不会产生阻塞(没有历史记录的话CL-1 为False,标识不会阻塞),则降低Level(减少通道,即降低并行度)
- 最后根据通道数与Level的共识来决定通道数(下游并行度)
- 通过阻塞和吞吐判断工作量变化的具体实现
- 通过阻塞判断工作量变化
- 因为两个阶段之间Level变为最多为1,所以分为三种情况
- 如果当前阶段Level等于上一阶段,并且阻塞情况从不阻塞到阻塞,则代表工作量(流量)增加,从阻塞到不阻塞,则代表工作量(流量)减少
- 如果当前阶段Level比上一阶段小一层,如果上一阶段阻塞,当前不阻塞则代表工作量(流量)降低
- 如果当前阶段Level比上一阶段高一层,如果上一阶段不阻塞,当前阻塞则代表工作量(流量)增加
- 其余情况返回Unknown
- 因为两个阶段之间Level变为最多为1,所以分为三种情况
- 通过吞吐判断工作量变化
- 与阻塞相同,还是分三种情况
- 如果当前阶段Level等于上一阶段,先判断吞吐是否有变化,再判断吞吐变化是否超出了阈值,这个阈值与TL+1 |- 相关(这个值不会被遗忘,只会被更新),如果都满足,根据吞吐量增减返回工作量(流量)的增减
- 如果当前阶段Level小于上一阶段,并且吞吐量上升了,则代表工作量(流量)上升
- 如果当前阶段Level高于上一阶段,并且吞吐量下降了,则代表工作量(流量)下降
- 其余情况返回Unknown
- 与阻塞相同,还是分三种情况
- 通过阻塞判断工作量变化
- 变量含义
思考
- 状态迁移中,运行机制必须能从一个partition钟获取一个子集的key的状态,迁移时对于借助类似于level db存储状态的系统来说可能不是那么友好(不确定)?可以通过更粗粒度的状态迁移来克服?
- 状态迁移中,这两个引入的barrier能否通过异步来做?
- 使用一致性哈希,确实比较均衡,但是背压常常是由数据倾斜带来的,能否将背压的通道一分为二?
引用
Buğra Gedik ; Scott Schneider ; Martin Hirzel ; Kun-Lung Wu "Elastic Scaling for Data Stream Processing"