0x01、前言
- 1、Storm作为经典流计算框架,由于不支持Window,更别说Event Time了,虽说Trident支持了exactly-once语义,但是性能会急剧下降,再者Acker支持的at-least-once语义性能也同样存在问题(每条用户消息都会产生一条对应的系统消息到acker,同时有额外的计算消耗,并且acker消息会消耗大量的网络带宽。)。这样一对比Flink,总感觉Storm会被加速淘汰掉。
- 2、听说Storm下个大版本,将会升级为JStorm,同时JStorm不断重构和加入新功能,例如:Window、exactly-once。又对Storm重燃信心了,遗留的老代码或许经过很小的改动,会跑的更加欢快了呢( ̄▽ ̄)"。
0x02、Window
- 1、Storm窗口重构升级,Storm不会再憋数据,而是增量计算。因此每来一条消息都会直接,触发计算以及更新窗口状态。如果一条消息属于多个窗口,那么每个窗口都会计算一次。
- 2、执行流程
1、创建WindowedBoltExecutor
2、到达一条tuple,都会先抽取消息的时间(processing time / event time),然后为这条消息分配窗口(一个或多个,取决于窗口类型)。
3、对这条tuple,遍历步骤2分配的窗口,调用execute(T tuple, Object state, TimeWindow window)进行计算。 在计算时,如果检测到这个window对应的用户状态为空,则调用Object initWindowState()初始化window状态。
4、检查window是否到期,如果到期,则调用void purgeWindow(Object state, TimeWindow window),同时删除对应的window状态。(purge:当window到期时被调用。用户可以对此时的window状态做自定义操作,如存储到外部系统等)
0x03、Event Time
Storm的window也支持event time了。TimestampExtractor、WatermarkGenerator、乱序消息的处理、Retractor当然不会少了。
1、WatermarkGenerator
使用event time的时候,需要定义watermark,否则jstorm会使用默认的watermark实现:PeriodicWatermarkGenerator。 即定期往下游发送watermark。
watermark标识了一个流是一直前进,不可回退的。即,假设当前收到了2017-01-13 18:00:00的watermark,那么意味着上游后续要发送的所有数据都不 会早于18:00:00。如果早于这个时间,则认为是late element。处理策略参见下面late element。
当收到一条消息时,jstorm同时会调用WatermarkGenerator#onElement
方法,以更新它内部的timestamp。同时,通过watermarkInterval来定时 发送watermark和检查event windows是否需要purge。
jstorm支持几种通过watermark触发window purge的策略: * GLOBAL_MAX_TIMESTAMP 全局最大时间戳。这种策略会始终使用接收到的所有task中的最大的时间戳。如果这个时间戳>窗口边界,就会purge window * MAX_TIMESTAMP_WITH_RATIO 这个策略在上面的基础上,还指定了收到上游watermark的task的比例,默认为0.9。即,只有当时间戳>窗口边界, 且收到了90%以上的task的watermark,才会purge window * TASK_MAX_GLOBAL_MIN_TIMESTAMP 这个策略,会记录所有上游task发送的watermark的值,然后取所有task的watermark的最小值作为当前时间戳, 以防止各别task的timestamp很大导致窗口被过早purge。这种策略是jstorm的默认策略。
2、乱序消息的处理
event time的场景中,消息的乱序几乎是必然会出现的。 在上面的场景中即为,当前watermark已经到达18:00:00,但是下一条消息到达时,发现它的时间戳是17:30:00。这就是一条乱序的消息。
默认情况下,jstorm会丢弃这条消息。也可以通过实现Retractor
接口来重新计算一个已经purge的窗口值。见下面。
3、Retractor
这个接口只有一个方法:
void retract(Tuple element, Collection<TimeWindow> windows);
即,这个element所属的窗口为windows这个集合,由用户指定如何处理这条乱序消息。
以word count为例,我们可能每隔1分钟计算word count,然后最终把每个window下的word count输出到hbase或者tair中。
在watermark=18:00:00时,假设之前计算出来的17:30分这个窗口的word: aa的count=100。接下来我们又收到一条消息,timestamp=17:30:00,word=aa。
此时我们需要更新HBase/Tair中17:30分这个窗口中对应的aa的count值。那么我们可以在retract方法中,直接去update HBase或者tair。
当然,如果经常会发生乱序,一条条处理效率显然是太慢了。建议用户可以保存一个Map<TimeWindow, List<Tuple>>
(或者使用guava中的Multimap)对象, 当达到一定数量再触发一次计算。
不过说了这么多,使用retractor的一个最大前提还是:计算结果必须是可被更新的。否则就只能丢弃然后打个日志了。