Flamestream简单介绍

1. 核心思想

通过把状态变成流的一部分,引入循环图,使得上一个item的状态可以作为下一个item的输入,使得有状态算子变成无状态算子。
B(x,h)=Y \Longrightarrow B(x,s_t)=(y,s_{t+1})
x,Y be input and output items, h,the state handler and s_t, the state object at time t.B becomes stateless and state management is done on the system side. This change allows the system to implement fault tolerance mechanisms, but it also opens the opportunity to implement deterministic processing

将状态当做流来处理

2. 计算逻辑图

Map applies a user-defined function to the payload of an input item and returns
a (possibly empty) sequence of data items with transformed payloads.
Broadcast replicates an input item to the specified number of operations.
Merge operation is initialized with the specified number of input nodes. It sends
all incoming data to the output.
Grouping constructs a single item containing a set of consecutive items that
have the same value of partition function. The maximum number of items
that can be grouped is specified as a parameter WindowSize.
The output item of the grouping has the same ordering label as the last
item in the output group. Groupings of different partitions are independent.
Grouping is the only operation that has a state.

MapReduce的逻辑执行图.png

在输入前front加入meta信息用来维护全局的顺序,item=(Payload, Meta)。
barrier用来过滤grouping算子由于输入元组乱序导致的错误输出。

  • The first map operation outputs mapped items according to map stage of
    MapReduce model.
  • The grouping with WindowSize = 2 groups the accumulator with next
    mapped item.
  • The combine map produces a new state of accumulator to be sent to grouping.
  • The final map converts accumulator into final reduce output.


    reduce阶段的计算

    排序规则保证每个accumulator item都在下一个没有被combined mapped item之前到达grouping。也就是说前一个item的状态和下个一item交替到达grouping。

3. 对乱序的处理:发送tomb使得乱序的输出无效

3.1无序产生的原因

每个operation在入口都会有一个用户定义的hash function(balance function)。由balance function决定将数据路由到对应的partition中,每个partition由单一的结点进行处理。对于reduce阶段,由于在map后的shuffle过程中,属于同一partition的数据来源于不同的上游物理节点,不能保证partition内的item是按序到来的。

3.2grouping opearation对于乱序的处理

只有grouping operation是有状态的,因此grouping对输入顺序敏感。当grouping接收到的items的时间小于已经被处理过的items,也就是说到来的的item是乱序时,则grouping算子将该乱序的item插入相应的位置,且对应顺序之前产生的item,则会相应的发送对应的tombstone,表示该item是无效的。tombstone的payload与无效item相同,仅在meta数据中,tombstone会被标记。这么做是为了保证tombstone在通过partition function时能被分配到相同的partition,从而保证与对应的由于乱序导致计算错误的item走相同的路径。


乱序的处理

如上图所示,grouping已经处理到item序号为6的元组,这时收到item为4的元组,则超前计算的item 4、5被视为无效,同时grouping先下游发送无效元组对应的tomb,之后发送正常的元组4。对于乱序产生的无效item,则会通过在下节介绍barrier过滤无效信息

4.使用barrier过滤

4.1全局顺序的定义

顺序定义

如图2所示,1'是1的衍生item,2'是2 的衍生item。即衍生item会被放在原始item之后。
meta用于维护全局顺序,在source节点处加入,组成<payloads,meta>的元组对,称作一个item。
Meta := (GlobalTime, ChildIds[ ], Trace, IsTombstone)

每个原始item都有唯一的GlobalTime,如果该item进过operation产生衍生item,则会用childids[]数组记录衍生的item与operation的对应关系,用以维护衍生item的顺序。通过比较GlobalTime, ChildIds[ ]就能保证全局顺序。
trace用来确认item和tombstone的对应关系。由于同一个原始item,在经过operation时会产生衍生item,衍生item与原始item有相同的globaltime,为了区分不同衍生item与其对应的tomb,需要比较payload的内容,为了简化计算,加入了trace字段。trace是该item进过的所有物理节点id的异或,失效item与其对应的tombstone有相同的trace,保证能快速匹配。

4.2记录系统正在处理的item的最早时间

Lemma 1. For any data item D let G(D) be its global time. If data item D has global time G(D) < G(F ) for each in-flight element F, then all tombstones for that item had already arrived at the barrier.

因此barrier的输出要保证输出的item的GT小于所有正在被处理的item,即可保证barrier收到item可能产生的tombstone,从而过滤掉无效item。


使用acker跟踪

采用与storm类似的track机制跟踪元组,同一个item在发送和接受时会向acker发送相同的ackval,为了保证处理信息不丢失,算子在处理完成后先向acker发送send的ack,在发送receive的ack,如图6所示,当item到达barrier时,GT(global time)对应xor值是0。整个流处理系统正在处理中的item最小GT,即使acker中 XOR为非0值的最小GT.在图6中为21。

参考文献

  1. 项目网站:https://research.jetbrains.org/groups/mach_learning/projects?project_id=18

  2. 较为详细介绍了IOP,OOP的优缺点,提出了基于OOP处理乱序的一种方法,介绍grouping算子的具体工作原理
    An optimistic approach to handle out-of-order events within analytical stream processing

  3. 在2的基础上进一步实现确定性流处理
    Deterministic Model for Distributed Speculative Stream Processing

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容