上篇讲了operator state在恢复(重启/故障恢复)时要么使用evenly distributed策略,要么使用union策略,来重启operator的并发实例。
operator state支持的第三种类型是 Broadcast State。引入该类型state是为了支持一个流中的一些数据需要广播到所有流中的场景,这些数据会被存储在本地,并应用在另一些流的所有数据上以便进行处理。例如一个很自然的例子,我们有一个包含一系列规则的很缓慢的流,我们想要将这些规则应用到其他流的所有数据上。把这个例子记在脑子里,broadcast state与其他operator state不同点在于:
- 它有一个map结构
- 它仅能在某些特定的operator中使用,operator需要既可以将 broadcasted stream作为输入,也可以将 non-broadcasted 流作为输入
- 这些特定的操作符可以有多个不同的 broadcast state,每个state都有自己的名称。
Provided APIs
在展示完整功能前,我们先从一个示例开始讲解flink提供的api。我们假设我们的数据流数据含有color与shape两个属性。我们想要找到一些特性模式的数据对,如:color属性相同,且shape符合某些规则,如先出现矩形再出现三角形。我们假设这些规则是随时间变化的。
在这个示例中,第一个流包含 Item类型的数据,他有 Color 与 Shape两个属性。另一个流包含Rule类型数据。
我们从包含Item的流入手,我们需要将其根据Color进行key操作,因为我们要找的数据对的color都是一样的。这样做之后,会保证具有相同color的数据会分配到相同的物理机器上。
// key the shapes by color
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
.keyBy(new KeySelector<Shape, Color>(){...});
再来看 Rule 数据流,包含Rule的数据流需要被广播broadcast到所有的下游任务中去,并且这些任务需要将其存储到本地,这样就可以使用本地数据与Item数据做计算
下面的小段程序会:1)广播rule数据流 2)使用提供的MapStateDescriptor 来创建rule需要存储到的broadcast state对象。
// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
最后,为了在Item流上的每个Item中都应用Rule进行计算,我们需要:
1.连接这两个流
2.定义我们模式匹配的逻辑
可以使用 connect() 方法来连接一个(keyed/non-keyed)流与 Broadcast 流。在非 broadcast流上调用connect() 方法,broadcast流作为参数传入。这个方法会返回 BroadcastConnectedStream 类,在这个类上,我们可以调用 process() 方法传入 CoProcessFunction的实现类。我们可以在这个函数内实现我们的逻辑。函数的类型取决于non-broadcast 流的情况:
- 如果它是 keyed 流,那么函数类型就是 KeyedBroadcastProcessFunction
- 如果它是non-keyed 流,那么函数类型是 BroadcastProcessFunction
假设,我们的non-broadcast流是 non-keyed 类型。
注意:connect方法需要由 non-broadcast 流来调用,broadcast流作为参数
DataStream<String> output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(
// type arguments in our KeyedBroadcastProcessFunction represent:
// 1. the key of the keyed stream
// 2. the type of elements in the non-broadcast side
// 3. the type of elements in the broadcast side
// 4. the type of the result, here a string
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// my matching logic
}
);
BroadcastProcessFunction 与 KeyedBroadcastProcessFunction
在 CoProcessFunction 接口中,有两个处理方法需要实现:processBroadcastElement() 方法,处理broadcast 流中的数据;processElement() 方法处理 non-broadcast流的数据。两个方法的详细方法签名如下:
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
首先需要注意的是:两个接口都需要实现 processBroadcastElement() 方法来处理 broadcast流中的数据,以及 processElement() 方法来处理 non-broadcast流中的数据。
两个方法的区别在于他们入参的context 的不同。处理non-broadcast 的方法的入参为 ReadOnlyContext,处理 broadcast 的方法的入参为 Context。
两个context都有如下特点
- 获取 broadcast.state 的能力:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
- 允许查询元素的时间戳:ctx.timestamp(
- 获取当前的watermark:ctx.currentWatermark()
- 获取当前processing time:ctx.currentProcessingTime()
- 发射数据到 side-output:ctx.output(OutputTag<X> outputTag, X value)
getBroadcastState()方法中的 stateDescriptor 需要与 .broadcast(ruleStateDescriptor) 中的相同。
两个context的不同点在于两者对 broadcast state 的访问级别。 处理broadcast 流的方法的context对broadcast state 是读写权限,而处理 non-broadcast流的方法的context对broadcast state是只读权限。这样做的原因是:在Flink中,是没有跨任务数据交换(no cross-task communication)的。因此,为了保证在操作符的所有并发实例中的broadcast state都是相同的,我们仅给broadcast流读写权限,这样所有任务中的broadcast的数据都会相同,我们也能保证应用于non-broadcast数据上的计算在所有task中都是相同的。不这么做,就无法达到一致性的保证,导致不一致且很难调试的结果。
注意:processBroadcast() 方法中实现的逻辑也需要在所有并发操作实例中保持一致性。
最终,由于 KeyedBroadcastProcessFunction 是在 keyed 流上进行操作,它提供了 BroadcastProcessFunction 没有的一些功能:
- processElement()中的 ReadOnlyContext可以访问Flink提供的 timer server服务,它允许注册一个 event time/processing time的时间回调函数。当触发回调时,会调用onTime()方法,该方法的 OnTimeContext 参数包含了 ReadOnlyContext的所有功能,再加上:
- 判断该timer是基于event time还是processing time的
- 查询timer关联的key
2.processBroadcastElement()方法中的 Context 有 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function) 方法。该方法可以注册一个
KeyedStateFunction 应用于stateDescriptor所指代的state,所有key上的符合要求的state都会应用此functino。
注意:只可以在 KeyedBroadcastProcessFunction 中的 processElement() 中注册timer。不可以在 processBroadcastElement()中注册,因为这个方法中处理的是broadcast element,没有关联的key。
回到我们一开始的例子,我们的 KeyedBroadcastProcessFunction 可以这样写:
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// store partial matches, i.e. first elements of the pair waiting for their second element
// we keep a list as we may have many first elements waiting
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"items",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
// identical to our ruleStateDescriptor above
private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
@Override
public void processBroadcastElement(Rule value,
Context ctx,
Collector<String> out) throws Exception {
ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
}
@Override
public void processElement(Item value,
ReadOnlyContext ctx,
Collector<String> out) throws Exception {
final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
final Shape shape = value.getShape();
for (Map.Entry<String, Rule> entry :
ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
final String ruleName = entry.getKey();
final Rule rule = entry.getValue();
List<Item> stored = state.get(ruleName);
if (stored == null) {
stored = new ArrayList<>();
}
if (shape == rule.second && !stored.isEmpty()) {
for (Item i : stored) {
out.collect("MATCH: " + i + " - " + value);
}
stored.clear();
}
// there is no else{} to cover if rule.first == rule.second
if (shape.equals(rule.first)) {
stored.add(value);
}
if (stored.isEmpty()) {
state.remove(ruleName);
} else {
state.put(ruleName, stored);
}
}
}
}
重要考量
介绍完提供的API后,接下来的这部分用于提醒你,在使用broadcast state时,需要记住这些重要的事情:
- 没有跨任务(实例)间的数据交换(no cross-task communication):之前已经提过没什么在 (Keyed)-BroadcastProcessFunction 中仅有处理 broadcast数据的方法可以修改 broadcast state 的内容。除此之外,使用者需要保证所有并发实例中修改broadcast state时的逻辑都是一样的。否则,不同的实例就会有不同的state内容,导致计算结果的不一致。
- 不同任务(实例)中broadcast state的内容的顺序可能会不一致:尽管广播一个流的数据会保证所有数据将会(最终)到达所有下游任务,但是数据到达每一个任务的顺序可能不一样。因此,state更新时,绝对不可以根据数据的顺序来更新(而是根据数据的属性)
- 所有的任务都会checkpoint它们的 broadcast state:尽管所有的任务(实例)的broadcast state都保存着相同的数据,但是在checkpoint时,所有的任务(实例)都会对它们所维持的broadcast state进行备份,而不是只选择其中的一个实例的broadcast state进行备份。这样的设计是为了避免在故障恢复时,所有的任务都读取同一个文件,尽管这样做会带来增加state快照的消耗问题,这个消耗与并发度p相关。Flink会保证,在重启/伸缩 应用时,没有副本并且不会丢失数据。为了以相同的并发度或更少的并发度来重启,每个task都会读取它自己状态快照。当以更多的并发度来重启时,每一个task读取自己的状态快照,剩下的任务(p_new - p_old)以轮询的方式依次读取旧任务的checkpoint。
- No RocksDB state backend:broadcast state会保存在内存中,需要完成相应的内存配置。这适用于所有的 operator state。