1. 核心思想
通过把状态变成流的一部分,引入循环图,使得上一个item的状态可以作为下一个item的输入,使得有状态算子变成无状态算子。
be input and output items, ,the state handler and , the state object at time t.B becomes stateless and state management is done on the system side. This change allows the system to implement fault tolerance mechanisms, but it also opens the opportunity to implement deterministic processing
2. 计算逻辑图
Map applies a user-defined function to the payload of an input item and returns
a (possibly empty) sequence of data items with transformed payloads.
Broadcast replicates an input item to the specified number of operations.
Merge operation is initialized with the specified number of input nodes. It sends
all incoming data to the output.
Grouping constructs a single item containing a set of consecutive items that
have the same value of partition function. The maximum number of items
that can be grouped is specified as a parameter WindowSize.
The output item of the grouping has the same ordering label as the last
item in the output group. Groupings of different partitions are independent.
Grouping is the only operation that has a state.
在输入前front加入meta信息用来维护全局的顺序,item=(Payload, Meta)。
barrier用来过滤grouping算子由于输入元组乱序导致的错误输出。
- The first map operation outputs mapped items according to map stage of
MapReduce model. - The grouping with WindowSize = 2 groups the accumulator with next
mapped item. - The combine map produces a new state of accumulator to be sent to grouping.
-
The final map converts accumulator into final reduce output.
排序规则保证每个accumulator item都在下一个没有被combined mapped item之前到达grouping。也就是说前一个item的状态和下个一item交替到达grouping。
3. 对乱序的处理:发送tomb使得乱序的输出无效
3.1无序产生的原因
每个operation在入口都会有一个用户定义的hash function(balance function)。由balance function决定将数据路由到对应的partition中,每个partition由单一的结点进行处理。对于reduce阶段,由于在map后的shuffle过程中,属于同一partition的数据来源于不同的上游物理节点,不能保证partition内的item是按序到来的。
3.2grouping opearation对于乱序的处理
只有grouping operation是有状态的,因此grouping对输入顺序敏感。当grouping接收到的items的时间小于已经被处理过的items,也就是说到来的的item是乱序时,则grouping算子将该乱序的item插入相应的位置,且对应顺序之前产生的item,则会相应的发送对应的tombstone,表示该item是无效的。tombstone的payload与无效item相同,仅在meta数据中,tombstone会被标记。这么做是为了保证tombstone在通过partition function时能被分配到相同的partition,从而保证与对应的由于乱序导致计算错误的item走相同的路径。
如上图所示,grouping已经处理到item序号为6的元组,这时收到item为4的元组,则超前计算的item 4、5被视为无效,同时grouping先下游发送无效元组对应的tomb,之后发送正常的元组4。对于乱序产生的无效item,则会通过在下节介绍barrier过滤无效信息
4.使用barrier过滤
4.1全局顺序的定义
如图2所示,1'是1的衍生item,2'是2 的衍生item。即衍生item会被放在原始item之后。
meta用于维护全局顺序,在source节点处加入,组成<payloads,meta>的元组对,称作一个item。
每个原始item都有唯一的GlobalTime,如果该item进过operation产生衍生item,则会用childids[]数组记录衍生的item与operation的对应关系,用以维护衍生item的顺序。通过比较GlobalTime, ChildIds[ ]就能保证全局顺序。
trace用来确认item和tombstone的对应关系。由于同一个原始item,在经过operation时会产生衍生item,衍生item与原始item有相同的globaltime,为了区分不同衍生item与其对应的tomb,需要比较payload的内容,为了简化计算,加入了trace字段。trace是该item进过的所有物理节点id的异或,失效item与其对应的tombstone有相同的trace,保证能快速匹配。
4.2记录系统正在处理的item的最早时间
Lemma 1. For any data item D let G(D) be its global time. If data item D has global time G(D) < G(F ) for each in-flight element F, then all tombstones for that item had already arrived at the barrier.
因此barrier的输出要保证输出的item的GT小于所有正在被处理的item,即可保证barrier收到item可能产生的tombstone,从而过滤掉无效item。
采用与storm类似的track机制跟踪元组,同一个item在发送和接受时会向acker发送相同的ackval,为了保证处理信息不丢失,算子在处理完成后先向acker发送send的ack,在发送receive的ack,如图6所示,当item到达barrier时,GT(global time)对应xor值是0。整个流处理系统正在处理中的item最小GT,即使acker中 XOR为非0值的最小GT.在图6中为21。
参考文献
项目网站:https://research.jetbrains.org/groups/mach_learning/projects?project_id=18
较为详细介绍了IOP,OOP的优缺点,提出了基于OOP处理乱序的一种方法,介绍grouping算子的具体工作原理
An optimistic approach to handle out-of-order events within analytical stream processing在2的基础上进一步实现确定性流处理
Deterministic Model for Distributed Speculative Stream Processing