Flink-Streaming-State & Fault Tolerance-The Broadcast State Pattern

上篇讲了operator state在恢复(重启/故障恢复)时要么使用evenly distributed策略,要么使用union策略,来重启operator的并发实例。
operator state支持的第三种类型是 Broadcast State。引入该类型state是为了支持一个流中的一些数据需要广播到所有流中的场景,这些数据会被存储在本地,并应用在另一些流的所有数据上以便进行处理。例如一个很自然的例子,我们有一个包含一系列规则的很缓慢的流,我们想要将这些规则应用到其他流的所有数据上。把这个例子记在脑子里,broadcast state与其他operator state不同点在于:

  • 它有一个map结构
  • 它仅能在某些特定的operator中使用,operator需要既可以将 broadcasted stream作为输入,也可以将 non-broadcasted 流作为输入
  • 这些特定的操作符可以有多个不同的 broadcast state,每个state都有自己的名称。

Provided APIs


在展示完整功能前,我们先从一个示例开始讲解flink提供的api。我们假设我们的数据流数据含有color与shape两个属性。我们想要找到一些特性模式的数据对,如:color属性相同,且shape符合某些规则,如先出现矩形再出现三角形。我们假设这些规则是随时间变化的。
在这个示例中,第一个流包含 Item类型的数据,他有 Color 与 Shape两个属性。另一个流包含Rule类型数据。
我们从包含Item的流入手,我们需要将其根据Color进行key操作,因为我们要找的数据对的color都是一样的。这样做之后,会保证具有相同color的数据会分配到相同的物理机器上。

// key the shapes by color
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
                        .keyBy(new KeySelector<Shape, Color>(){...});

再来看 Rule 数据流,包含Rule的数据流需要被广播broadcast到所有的下游任务中去,并且这些任务需要将其存储到本地,这样就可以使用本地数据与Item数据做计算
下面的小段程序会:1)广播rule数据流 2)使用提供的MapStateDescriptor 来创建rule需要存储到的broadcast state对象。

// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));
        
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);

最后,为了在Item流上的每个Item中都应用Rule进行计算,我们需要:
1.连接这两个流
2.定义我们模式匹配的逻辑

可以使用 connect() 方法来连接一个(keyed/non-keyed)流与 Broadcast 流。在非 broadcast流上调用connect() 方法,broadcast流作为参数传入。这个方法会返回 BroadcastConnectedStream 类,在这个类上,我们可以调用 process() 方法传入 CoProcessFunction的实现类。我们可以在这个函数内实现我们的逻辑。函数的类型取决于non-broadcast 流的情况:

  • 如果它是 keyed 流,那么函数类型就是 KeyedBroadcastProcessFunction
  • 如果它是non-keyed 流,那么函数类型是 BroadcastProcessFunction

假设,我们的non-broadcast流是 non-keyed 类型。
注意:connect方法需要由 non-broadcast 流来调用,broadcast流作为参数

DataStream<String> output = colorPartitionedStream
                 .connect(ruleBroadcastStream)
                 .process(
                     
                     // type arguments in our KeyedBroadcastProcessFunction represent: 
                     //   1. the key of the keyed stream
                     //   2. the type of elements in the non-broadcast side
                     //   3. the type of elements in the broadcast side
                     //   4. the type of the result, here a string
                     
                     new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                         // my matching logic
                     }
                 );
BroadcastProcessFunction 与 KeyedBroadcastProcessFunction

在 CoProcessFunction 接口中,有两个处理方法需要实现:processBroadcastElement() 方法,处理broadcast 流中的数据;processElement() 方法处理 non-broadcast流的数据。两个方法的详细方法签名如下:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

首先需要注意的是:两个接口都需要实现 processBroadcastElement() 方法来处理 broadcast流中的数据,以及 processElement() 方法来处理 non-broadcast流中的数据。
两个方法的区别在于他们入参的context 的不同。处理non-broadcast 的方法的入参为 ReadOnlyContext,处理 broadcast 的方法的入参为 Context。
两个context都有如下特点

  • 获取 broadcast.state 的能力:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
  • 允许查询元素的时间戳:ctx.timestamp(
  • 获取当前的watermark:ctx.currentWatermark()
  • 获取当前processing time:ctx.currentProcessingTime()
  • 发射数据到 side-output:ctx.output(OutputTag<X> outputTag, X value)

getBroadcastState()方法中的 stateDescriptor 需要与 .broadcast(ruleStateDescriptor) 中的相同。
两个context的不同点在于两者对 broadcast state 的访问级别。 处理broadcast 流的方法的context对broadcast state 是读写权限,而处理 non-broadcast流的方法的context对broadcast state是只读权限。这样做的原因是:在Flink中,是没有跨任务数据交换(no cross-task communication)的。因此,为了保证在操作符的所有并发实例中的broadcast state都是相同的,我们仅给broadcast流读写权限,这样所有任务中的broadcast的数据都会相同,我们也能保证应用于non-broadcast数据上的计算在所有task中都是相同的。不这么做,就无法达到一致性的保证,导致不一致且很难调试的结果。
注意:processBroadcast() 方法中实现的逻辑也需要在所有并发操作实例中保持一致性。

最终,由于 KeyedBroadcastProcessFunction 是在 keyed 流上进行操作,它提供了 BroadcastProcessFunction 没有的一些功能:

  1. processElement()中的 ReadOnlyContext可以访问Flink提供的 timer server服务,它允许注册一个 event time/processing time的时间回调函数。当触发回调时,会调用onTime()方法,该方法的 OnTimeContext 参数包含了 ReadOnlyContext的所有功能,再加上:
  • 判断该timer是基于event time还是processing time的
  • 查询timer关联的key

2.processBroadcastElement()方法中的 Context 有 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function) 方法。该方法可以注册一个
KeyedStateFunction 应用于stateDescriptor所指代的state,所有key上的符合要求的state都会应用此functino。

注意:只可以在 KeyedBroadcastProcessFunction 中的 processElement() 中注册timer。不可以在 processBroadcastElement()中注册,因为这个方法中处理的是broadcast element,没有关联的key。

回到我们一开始的例子,我们的 KeyedBroadcastProcessFunction 可以这样写:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

    // store partial matches, i.e. first elements of the pair waiting for their second element
    // we keep a list as we may have many first elements waiting
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
        new MapStateDescriptor<>(
            "items",
            BasicTypeInfo.STRING_TYPE_INFO,
            new ListTypeInfo<>(Item.class));

    // identical to our ruleStateDescriptor above
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor = 
        new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule value,
                                        Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    }

    @Override
    public void processElement(Item value,
                               ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {

        final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
        final Shape shape = value.getShape();
    
        for (Map.Entry<String, Rule> entry :
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            final String ruleName = entry.getKey();
            final Rule rule = entry.getValue();
    
            List<Item> stored = state.get(ruleName);
            if (stored == null) {
                stored = new ArrayList<>();
            }
    
            if (shape == rule.second && !stored.isEmpty()) {
                for (Item i : stored) {
                    out.collect("MATCH: " + i + " - " + value);
                }
                stored.clear();
            }
    
            // there is no else{} to cover if rule.first == rule.second
            if (shape.equals(rule.first)) {
                stored.add(value);
            }
    
            if (stored.isEmpty()) {
                state.remove(ruleName);
            } else {
                state.put(ruleName, stored);
            }
        }
    }
}

重要考量


介绍完提供的API后,接下来的这部分用于提醒你,在使用broadcast state时,需要记住这些重要的事情:

  • 没有跨任务(实例)间的数据交换(no cross-task communication):之前已经提过没什么在 (Keyed)-BroadcastProcessFunction 中仅有处理 broadcast数据的方法可以修改 broadcast state 的内容。除此之外,使用者需要保证所有并发实例中修改broadcast state时的逻辑都是一样的。否则,不同的实例就会有不同的state内容,导致计算结果的不一致。
  • 不同任务(实例)中broadcast state的内容的顺序可能会不一致:尽管广播一个流的数据会保证所有数据将会(最终)到达所有下游任务,但是数据到达每一个任务的顺序可能不一样。因此,state更新时,绝对不可以根据数据的顺序来更新(而是根据数据的属性)
  • 所有的任务都会checkpoint它们的 broadcast state:尽管所有的任务(实例)的broadcast state都保存着相同的数据,但是在checkpoint时,所有的任务(实例)都会对它们所维持的broadcast state进行备份,而不是只选择其中的一个实例的broadcast state进行备份。这样的设计是为了避免在故障恢复时,所有的任务都读取同一个文件,尽管这样做会带来增加state快照的消耗问题,这个消耗与并发度p相关。Flink会保证,在重启/伸缩 应用时,没有副本并且不会丢失数据。为了以相同的并发度或更少的并发度来重启,每个task都会读取它自己状态快照。当以更多的并发度来重启时,每一个task读取自己的状态快照,剩下的任务(p_new - p_old)以轮询的方式依次读取旧任务的checkpoint。
  • No RocksDB state backend:broadcast state会保存在内存中,需要完成相应的内存配置。这适用于所有的 operator state。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容