流式计算框架编程接口的标准化,傻瓜化,SQL化,自打谷歌发表Dataflow编程模型的Paper起,就有走上台面的趋势。各家计算框架都开始认真考虑相关的问题,俨然成为大家竞争的热点方向。在过去一年多的时间里,Beam/Flink/Spark在这方面的努力和相关工作也逐渐落地成熟,实际线上成熟应用的日子看起来指日可待了。 所以,翻出一年多前阅读DataFlow Paper的旧文,更新一下部分过时信息。
本文主要阐述DataFlow编程模型的思想,基本上可以认为,这是当前各种流式计算的上层编程模型背后的理论原型基础,篇尾再简单对比一下 Spark Structured Streaming的编程模型实现,以及 Flink/Beam/StreamCQL在这方面的相关进展情况。
DataFlow是什么东西
谷歌的Dataflow首先是一个服务 https://cloud.google.com/dataflow, 为用户提供以流式或批量模式处理海量数据的能力,该服务的编程接口模型(或者说计算框架)也就是下面要讨论的Dataflow模型
流式计算处理框架很多,也有大量的模型/框架号称能够同时较好的处理流式和批量计算场景,比如Lambda模型,比如Spark小批量模型等等,那么Dataflow模型有什么特别的呢?
这就要从流式(streaming) 和批量( batch) 这两个词的语意说起,简单的说,谷歌的同学认为目前的各种计算框架模型以及用户在一定程度上对这两个词的语意的使用姿势不够恰当,或者说用这两个词来区分应用场景,进而给计算框架分类并不合适。而这种分类给框架的应用甚至设计带来了一定的认识的误解和偏差。
谷歌的同学认为,当大家谈论到流式计算,或者流式数据时,内心想表达的场景,实际上更准确的说法应该是unbounded data(processing),也就是无边界的连续的数据(的处理);对应的批量(计算),更准确的说法是bounded data(processing),亦即有明确边界的数据的处理。而streaming和batch实际上只是这两种数据集历史上传统使用的处理方式而已,这两者并不完全等价。而随着技术的发展,继续用这种方式来分类和看待问题就显得不够高大上了。
再比如,下面这一篇抨击Lambda模型的文章: https://www.oreilly.com/ideas/questioning-the-lambda-architecture 所表达的观点,一定程度上来说也是这种思想的一个体现。
而Dataflow模型则是谷歌的同学在处理无边界数据的实践中,总结的一套SDK级别的解决方案,其目标是做到在非有序的,无边界的海量数据上,基于事件时间进行运算,并能根据数据自身的属性进行window操作,同时数据处理过程的正确性,延迟,代价可根据需求进行灵活的调整配置。
DataFlow的底层计算引擎依托于 Millwheel 实时计算框架和FlumeJava批处理框架,在谷歌开源了相关SDK以后,发起了beam项目: http://beam.incubator.apache.org/ , 为了拉拢开源社区的同学,其底层计算引擎也可以替换适配成Spark/Flink等开源计算框架(适配工作持续进行中)
核心思想
先来看看Lambda模型被挑战的点:用一个流式+批量的拼凑方案去解决海量无限数据的实时统计问题,看起来很美,但是出发点立意有些Low(亦即,认定了这种问题只能通过两套截然不同的框架模型去协同处理),而维护两套计算框架模型和处理逻辑的代价始终是这个模型无法克服的痛点。
虽然有各种上层封装抽象,统一SDK编程接口方案的存在,企图通过一套代码,翻译执行的方式,降低在两套计算框架模型上开发和维护代码的代价,但实际效果往往并不如意,翻译执行层的存在,并不能抹平两种计算框架在模型根源上的差异,到头来真正能复用的代码逻辑并不多,简单的说就是Lambda框架本身并不解决用户真正的痛点,而只是一种没有出路的情况下的无奈之举。
而Dataflow计算模型,则是希望从编程模型的源头上,统一解决传统的流式和批量这两种计算语意所希望处理的问题。
和Spark通过micro batch模型来处理Streaming场景(如前,更准确的说法是无边界数据集)的出发点不同,Dataflow认为batch的处理模式只是streaming处理模式的一个子集。在无边界数据集的处理过程中,要及时产出数据结果,无限等待显然是不可能的,所以必然需要对要处理的数据划定一个窗口区间,从而对数据及时的进行分段处理和产出,而各种处理模式(stream,micro batch,session,batch),本质上,只是窗口的大小不同,窗口的划分方式不同而已。
比如,Batch的处理模式就只是一个窗口区间涵盖了整个有边界的数据集这样的一种特例场景而已。一个设计良好的能处理无边界数据集的系统,完全能在准确性和正确性上做到和“Batch”系统一样甚至应该更好。而不是传统的认为batch框架的正确性更好,streaming框架顾及了实时性,正确性天然就做不好,必须和batch框架配合走Lambda模型来补足。
那么无边界数据集的处理过程中,大家认为天然做不好的点,或者说最难处理的点在哪里,Dataflow模型是怎么解决的呢。
这里又要先说一下在Dataflow模型里强调的两个时间概念:Event time和 Process time:
Event time 事件时间就是数据真正发生的时间,比如用户浏览了一个页面,或者下了一个订单等等,这时候通常就会有一些数据会被生产出来,比如前者可能会产生一条用户的浏览日志
而Process time则是这条日志数据真正到达计算框架中被处理的时间点,简单的说,就是你的程序是什么时候读到这条日志的
现实情况下,由于各种原因,数据采集,传输到达处理系统的时间可能会有长短不同的延迟,在分布式应用场景环境下,不仅是延迟,数据乱序到达往往也是常态。这些问题,在有边界数据集的处理过程中往往并不存在,或者无关紧要。
基于这种无边界数据集的特性,在Dataflow模型中,数据的处理过程中需要解决的问题,被概括为以下4个方面:
- What results are being computed. : 计算逻辑是什么
- Where in event time they are being computed. : 计算什么时候(事件时间)的数据
- When in processing time they are materialized. : 在什么时候(处理时间)进行计算/输出
- How earlier results relate to later refinements. : 后续数据如何影响(修正)之前的计算结果
清晰的定义这些问题,并针对性的在模型框架层面加以解决,正是Dataflow模型区别于其它流式计算模型的核心关键所在。通常的流式计算框架往往模糊或者无法有效的区别对待数据的事件时间和处理时间,对于第4个问题,如何修正数据,也可能缺乏直接的支持。这些问题通常需要开发人员在业务代码逻辑层面,自行想办法解决,因而也就加大了这类数据处理业务的开发难度,甚至让这种业务的开发成为一个不可能完成的任务。
而更重要的是,针对同一或类似数据集,各种数据处理需求,其核心计算逻辑往往可能是一致的,但是根据应用目标场景,统计口径可能各有不同。
比如计算活跃用户数,核心计算逻辑就是一个去重计数逻辑。具体实施时,可能要求计算过去一个小时的活跃用户,也可能是计算全天的累计的活跃用户,可能基于实际时间计算也可能基于数据采集时间计算,可能要求更新历史数据(有数据晚到),也可能出于效率,性能考虑,直接放弃晚到的数据。
Dataflow计算模型的目标是把上述4方面的问题,用明确的语意,清晰的拆分出来,更好的模块化,从而实现在模型层面调整局部设置,就能快速适应各种业务逻辑的开发需求。
例如在 https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison 一文中,就用实际的例子比较了Beam和Spark在处理这类数据业务逻辑时,所需要进行的开发工作,总体的意思就是用dataflow模型开发,代码更简洁,语义更明显,更容易理解,开发效率更高,维护成本更低。
当然,Spark 2.0开始启动的Structure Streaming API也引入了和Dataflow类似的模型思想,那篇文章里的很多比较已经不成立了。
理论模型
那么Dataflow是如何解决上面四个方面的问题呢,基本上,是通过构建以下三个核心功能模型来做到的:
- 一个支持基于事件时间的窗口(window)模型,并提供简易的API接口:支持固定窗口/滑动窗口/Session(以Key为维度,基于事件时间连续性进行划分)等窗口模式
- 一个和数据自身特性绑定的计算结果输出触发模型,并提供灵活可描述的API接口
- 一个增量更新模型,可以将数据增量更新的能力融合进上述窗口和结果触发模型中。
窗口模型
为了在计算框架级别实现基于事件时间的窗口模型,Dataflow系统中,将常见的流式计算框架中的[key,value]两元组tuple形式的信息数据,变换成了[key,value, event time, window ]这样的四元组模型
Event time的引入原因显而易见,必须要有相关载体承载这个信息,否则只能基于Process time/Batch time 来划分窗口
而window窗口标识信息的引入,个人认为,很重要的一个原因是要支持Session类型的窗口模型,而同时,要将流式和增量更新的支持融合进窗口的概念中,也势必需要在数据中引入这样一个显式的窗口信息(否则,通常的做法就只能是用micro batch分组数据的方式,隐式的标识数据的窗口属性)
在消息的四元组数据结构基础上,Dataflow通过提供对消息进行窗口赋值,窗口合并,按key分组,按窗口分组等原子功能操作,来实现各种窗口模型。
窗口触发模型
多数基于Process time定义的固定窗口或滑动窗口模型,并没有特别强调窗口触发这样一个概念,因为在这类模型中,窗口的边界时间点,也就是触发计算结果输出的时间点,并不需要特别加以区分。
而对于Dataflow这样的基于事件时间的模型来说,由于事件时间和处理时间之间存在非固定的延迟,而框架又需要正确的处理乱序的数据,这使得判断窗口的边界位置,进而触发计算和结果输出变得困难起来。
在这一点上,Dataflow部分借用了底层Millwheel提供的Low watermark低水位这样一个概念来解决窗口边界的判断问题,当低水位对应的时间点超过设定的时间窗口边界时间点时,触发窗口的计算和结果输出。但是,低水位的概念在理论上虽然是OK的,在实际场景中,通常是一个概率模型,并不能完全保证准确的判断事件时间的延迟情况,而且有很多场合对窗口边界的判断,用户自己有自己的需求。
因此,Dataflow提供了可自定义的窗口触发模型,可以使用低水位做触发,也可以使用比如:定时触发,计数触发,计量触发,模式匹配触发或其它外部触发源,甚至各种触发条件的逻辑运算组合等机制来应对可能的需求。
增量更新模型
当一个特定时间窗口被触发以后,后续晚到的数据如何处理,如何对之前触发结算的结果进行修正,Dataflow在框架层面也提供了直接的支持,基本上包括三种策略:
- 丢弃:一旦特定窗口触发过,对应窗口的数据就丢弃,晚到的数据也丢弃
- 累计:触发过的窗口对应的数据保留(保留时间策略也可调整),晚到的数据更新对应窗口的输出结果
- 累计并更正:和累积模式类似,区别在于会先对上一次窗口触发的结果发送一个反相修正的信息,再输出新的结果,便于有需要的下游更正之前收到的信息
通常来说,丢弃策略实现起来最简单,既没有历史数据负担,对下游计算也不产生影响。但是前提条件是,数据乱序或者晚到的情况不严重或者不重要或者不影响最后的统计结果的精度
累计策略,从窗口自身的角度来说,实现起来也不复杂,除了内存代价会高一些,因为要保留历史窗口的数据,但是存在的问题是有些下游运算逻辑是基于上游运算结果计算的,下游计算逻辑能否正确处理重复输出的窗口结果,正确的进行去重或者累加,往往是个问题。
累计并更正策略,就窗口自身逻辑来说,实现上会更加复杂一点,但是下游计算逻辑的编写复杂性其实才是最难的。反相修正信息,是为了给下游提供更多的信息来解决上述窗口运算结果重复输出问题,增加了下游链路去重数据的能力,但实际上,这个逻辑需要下游计算逻辑的深度配合才能实现,个人觉得,除了部分计算拓扑逻辑相对简单的程序能够正确处理好这种情况,依赖关系稍微复杂一点的计算链路,靠反相修正信息,要做到正确的累加或去重还是很困难的。
举个例子,实际情况下,如果你自己写程序,比如在storm中计算当日UV信息,如果手工编程,你很可能采取的办法是:加大时间窗口(一个窗口一天),在一定时间段内保留多个窗口(比如今天和昨天的两个窗口),累计,定时覆盖输出全量结果。这种方式来解决正确累计和处理晚到数据。倘若这个UV信息要再进一步传导到下游计算任务,那下游计算任务最好也能处理全量覆盖这种场景,靠反向信息修正几乎是不可能的(因为UV信息通常不具备简单累加特性)。
相关研究,项目等
Spark Structured streaming
Spark 2.x版本,新增的structured streaming API,针对原先的streaming编程接口DStream的问题进行了改进,Dstream的问题包括:
- 框架自身只能针对Batch time进行处理,很难处理event time,很难处理延迟,乱序的数据
- 流式和批量处理的API还是不完全一致,两种使用场景中,程序代码还是需要一定的转换
- 端到端的数据容错保障逻辑需要用户自己小心构建,增量更新和持久化存储等一致性问题处理难度较大
这些问题其实也就是Dataflow中明确定位需要解决的问题。
通过Structured Streaming API,Spark计划支持和Dataflow类似的概念,如Event time based的窗口策略,自定义的触发逻辑,对输出(sink)模块的更新模式(追加,全量覆盖,更新)的built-in支持,更加统一的处理无边界数据和有边界数据等。
总体看来,Spark 2.x的structured streaming 模型和Dataflow有异曲同工之处,设计的目标看起来很远大,甚至给出了一份功能比较表格来证明其优越性
但上面的表格明显的是有“扬长避短”的偏向性的。比如在2.1的版本中,Structured Streaming还是Alpha版的,所支持的类Dataflow模型的功能还相对简单。2.2版本中,号称production了,不过,应该还是从稳定性的角度来说的,功能完整性方面还有一定差距。
比如还不支持session window,追加模式更新只能支持无聚合操作的场景,还有各种功能还停留在设想阶段,对于join等操作还有各种各样的限制等等,这些部分和dataflow业已实现的功能还有较大的差距。
对于exactly once发送的保障,Structured Streaming要求外部数据源具备offset定位的能力,再加上snapshot等机制来实现,而dataflow是通过对消息在框架内部进行持久化来实现replay,不依赖外部数据源的能力。
另外,个人理解像 prefix integrity, Transactional sink等概念,实际上是对上下游读写接口的一个封装,帮用户实现了一些业务逻辑(比如prefix integrity 的实现依托于于per key有序性的保证,这是由外部source源提供的保障,比如 file/kafka等;而Transactional sinks等则是比如对jdbc接口逻辑的封装),整体上偏外围功能一点,用这些特性来和其它框架比较不一定客观,因为设计理念不太不一样。
而在Dataflow的模型设计中,用户能更加细化的定义每个环节的步骤和设置,所以没有把一些逻辑替用户实现,更多的是以模块化的方式,留给用户去自己选择,而Structured steaming则把很多事情包办了,定制的余地较小,灵活性应该会差一些,不过这也给程序的自动优化带来了一些便利
Beam
Beam https://github.com/apache/beam 是由谷歌发起的apache 项目,基本来说就是实现dataflow编程模型的SDK项目,目标是提供一个high level的统一API编程接口,后端的执行引擎支持对接 APEX/Spark/Flink/Cloud dataflow
目前的编程语言支持Java和Python,2017年5月发布了第一个稳定版本2.0.0。
这个项目的前景如何,不太好说,单就适配各个后端的角度来说,就Spark后端来说,在spark 1.x时代,这种high level的编程模型抽象是对spark编程模型的一种add on,有一定的附加价值,但是按照spark 2.x structured streaming的发展路线来说,这一层抽象就稍微显得有些多余了。而基于Java的语法,在表达的简洁性上,相比scala也会带来一些额外的代价。
Flink
Dataflow的核心就是窗口和触发模型,而Flink在这两方面的实现,最接近Dataflow的理论原型,事件时间驱动,各种窗口模型,自定义触发和乱序/晚到数据的处理等等。
Flink的Data Streaming API通过定义window方法,和window内的数据需要使用的聚合函数比如:reduce,fold,window(前两者增量,后者全量),以及窗口触发(Trigger)和窗口内数据的淘汰(Evictor)方法,让用户可以实现对Dataflow模型中定义的场景的灵活处置,比如:需要在大数据量,大窗口尺度内实现实时连续输出结果的目的。通过allow late数据的时间范围来处理晚到数据。
不过晚到数据会触发聚合结果的再次输出,这个和Dataflow的模型不同的是,Flink本身是不提供反向信息输出的,需要业务逻辑自行做必要的去重处理。
对于Flink的实现,个人比较赞同的一点,是对数据的聚合和淘汰方式,给用户留下了足够灵活的选择,毕竟在工程实践中,长时间,大窗口,连续结果输出这种场景很常见,比如实时统计一天之类各个小时段的PV/UV,5秒更新一次结果。这种情况下,要避免OOM,还要正确处理晚到数据,追数据等问题,预聚合和提前触发的能力就必不可少了。
至于SQL化这条路,Flink的SQL语法解析和优化是依赖Apache Calcite实现的,而Calcite对window语法的支持才刚刚开始,所以FlinkSQL目前还不支持Streaming模型
整体感觉Flink目前在Dataflow模型思想方面实现的成熟度比Spark Structured Streaming要好
华为的StreamCQL on Storm
华为的StreamCQL方案,是构建在Storm之上的,简单的说就是提供了一个流式SQL的编程接口,执行时,底层翻译成Storm的拓扑逻辑提交执行。
整体上,StreamCQL做的好的地方是,SQL的支持比较完整,其它框架,在Stream这个场景,SQL的支持,或多或少还在开发完善中。
但StreamCQL最大的问题,是它的编程模型,和Dataflow的模型还有很大的差距。
整体上来说,StreamCQL的框架逻辑,就是使用窗口来buffer一部分数据,然后当窗口结束条件满足时,释放出这批数据给下游触发一次计算流程。
粗看和Dataflow没有太大的区别,但实际上,最主要的差距,是StreamCQL对窗口模型的定义,其次是触发和数据更新模型的缺失。
StreamCQL的窗口模型,支持Batch(也就是固定间隔窗口)和Slide,但是窗口的划分默认是基于处理时间Process Time的!!!而且,虽然窗口内的数据可以再细分Partition,但窗口只有一个。。。
不能同时处理几个窗口,意味着无法处理数据乱序或者晚到的情况。而Slide窗口的定义,也和主流的Slide窗口定义不同,每次对下游更新离开窗口范围的数据,看起来更像一个FIFO Queue的实现。
尽管可以使用Trigger关键字,将Batch窗口的触发条件改为消息中的某一个字段或者表达式,从而通过指定事件时间字段,近似的达到基于事件时间的窗口划分。
但是,实际上,因为单一的窗口机制,这样做,也只能处理事件源严格递增的场景。而现实情况中,来自不同客户端的事件,时间必然是乱序的,实时流计算的来源也主要是分布式消息队列(如kafka),进一步导致全局的无序,所以现实中,基本是不可能存在消息中事件时间严格递增的场景。
此外,由于缺乏灵活的数据更新和淘汰方式的定义,StreamCQL的主流程基本上是Buffer一堆数据,然后计算加淘汰这批数据,所以,缺乏数据预聚合的能力,这就导致窗口范围内所有的数据在窗口关闭之前,都必须保存在内存中。
因此即使是事件时间严格递增或者只关心Process Time的场景,Window的范围也不能太大,否则很容易超过内存限制,造成OOM,而实际上,多数场景,只需要保留增量聚合后的结果数据就足够了。
总体来说,StreamCQL的SQL语法比较完善,但计算模型在理论和架构实现方面存在较大的不足,所以如果不加改造,在实际工程应用中很难有大的做为。
参考资料
- dataflow论文 : The Dataflow Model- A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing
- https://cloud.google.com/dataflow/blog/dataflow-beam-and-spark-comparison
- https://spark-summit.org/2016/events/structuring-spark-dataframes-datasets-and-streaming/
- http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
- https://databricks.com/blog/2016/07/28/continuous-applications-evolving-streaming-in-apache-spark-2-0.html
- https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
- https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
- https://cloud.google.com/dataflow
- http://beam.apache.org/
- https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html
- https://github.com/HuaweiBigData/StreamCQL
常按扫描下面的二维码,关注“大数据务虚杂谈”,务虚,我是认真的 ;)