译:Flink---状态

Flink 1.7

键控状态和操作状态


Flink中有两种基本的状态类型:键控状态和运算符状态

键控状态

键控状态总是与键相关并且只能在键控流的函数和运算符中使用

你可以将Keyed State视为已分区或分片的操作符状态,每个键只有一个状态分区。每个键控状态在逻辑上绑定到<parallel-operator-instance,key>的唯一复合,并且由于每个键“属于”键控运算符的一个并行实例,我们可以将其简单地视为<operator,key >

键控状态进一步组织成所谓的键控组。键控组是Flink可以重新分配密钥状态的原子单元;键控组与定义的最大并行度完全一样多。在执行期间,键控运算符的每个并行实例都使用一个或多个键控组的键。

运算符状态

使用运算符状态(或非键控状态), 每个运算符状态绑定到一个运算符实例。Kafka连接器在Flink中是一个使用运算符状态的好例子。任一个Kafka消费者并行实例都包含一个主题与偏移映射作为运算符状态。

运算符状态接口支持当并行度改变时通过并行操作符重新分配实例。进行重新分配有多种不同的方案

原生及托管状态


键控状态及运算符状态以两种方式存在:托管状态及原生状态

托管状态由Flink运行时控制的数据结构表示,例如内部的hash表,或者RocksDB.例如“ValueState”,"ListState"等。Flink将state编码并写入checkpoints中

原生状态是把运算符保存在自己数据结构中的状态。当状态检查时,它们仅将字节序列写入到checkpoint。Flink不感知状态的数据结构,它只能看到原生的字节数据

所有的datastream函数都可以使用托管状态,但是原生状态仅能在实现运算符时使用。建议使用托管状态而非原生状态,因为在托管状态下,当并行度改变时Flink可以自动重新分配状态,也可以更好的做内存管理。

注意:如果你的托管状态需要自定义序列化逻辑,请查看相关引导以保证未来的兼容性。Flink的默认序列化不需要特殊对待

使用托管监控状态


监控托管转台接口提供不同的状态类型的访问,这些状态都限定为当前输入元素的键。这意味着这些状态类型只能在监控流中使用,监控流使用stream.keyBy()来创建
现在,我们首先来看一下可用的不同状态类型,之后我们可以看到如何在程序中使用它们。可用的原生状态:

  • ValueState<T>: 它保留了可以更新和检索的值(如上所述,作用于输入元素的键的范围,因此操作看到的每个键可能有一个值)使用update(T)更新值,使用T value()函数检索
  • ListState<T>: 这保留了元素列表。您可以追加元素并在所有当前存储的元素上检索Iterable。使用add(T)或addAll(List <T>)添加元素,可以使用Iterable <T> get()检索Iterable。您还可以使用update覆盖现有列表(List <T>)
  • ReducingState<T>: 这保留了一个值,表示添加到状态的所有值的聚合。该接口类似于ListState,但使用add(T)添加的元素使用指定的ReduceFunction缩减为聚合
  • AggregatingState<IN, OUT>: 这保留了一个值,表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态的元素类型不同。接口与ListState相同,但使用add(IN)添加的元素使用指定的AggregateFunction进行聚合
  • FoldingState<T, ACC>: 这保留了一个值,表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态的元素类型不同。该接口类似于ListState,但使用add(T)添加的元素使用指定的FoldFunction折叠为聚合
  • MapState<UK, UV>: 这保留了映射列表。您可以将键值对放入状态,并在所有当前存储的映射上检索Iterable。使用put(UK,UV)或putAll(Map <UK,UV>)添加映射。可以使用get(UK)检索与用户密钥关联的值。可以分别使用entries(),keys()和values()来检索映射,键和值的可迭代视图

所有类型的状态还有一个方法clear(),它清除当前活动键的状态,即输入元素的键

注意: FoldingState和FoldingStateDescriptor在Flink 1.4中废弃了并且将在未来完全删除。请使用AggregatingState和AggregatingStateDescriptor

重要的是要记住,这些状态对象仅用于与状态接口。状态不一定存储在内部,但可能驻留在磁盘或其他位置。要记住的第二件事是,从状态获得的值取决于input元素的键。因此,如果涉及的键不同,则在一次调用用户函数时获得的值可能与另一次调用中的值不同

要获取状态句柄,您必须创建StateDescriptor。这保存了状态的名称(正如我们稍后将看到的,您可以创建多个状态,并且它们必须具有唯一的名称以便您可以引用它们),状态所持有的值的类型,并且可能是用户 - 指定的函数,例如ReduceFunction。根据要检索的状态类型,创建ValueStateDescriptor,ListStateDescriptor,ReducingStateDescriptor,FoldingStateDescriptor或MapStateDescriptor

使用RuntimeContext访问状态,因此只能在丰富的函数中使用。请参阅此处了解相关信息,但我们很快也会看到一个示例。 RichFunction中可用的RuntimeContext具有这些访问状态的方法:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

这是一个示例FlatMapFunction,显示所有部件如何组合在一起:

public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    /**
     * The ValueState handle. The first field is the count, the second field a running sum.
     */
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

        // access the state value
        Tuple2<Long, Long> currentSum = sum.value();

        // update the count
        currentSum.f0 += 1;

        // add the second field of the input value
        currentSum.f1 += input.f1;

        // update the state
        sum.update(currentSum);

        // if the count reaches 2, emit the average and clear the state
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }
}

// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

这个例子实现了一个穷人的计数窗口。我们通过第一个字段键入元组(在示例中都具有相同的键1)。该函数将计数和运行总和存储在ValueState中。一旦计数达到2,它将发出平均值并清除状态,以便我们从0开始。注意,如果我们在第一个字段中具有不同值的元组,这将为每个不同的输入键保持不同的状态值

TTL状态


可以将生存时间(TTL)分配给任何类型的键控状态。如果配置了TTL并且状态值已过期,则将尽力清除存储的值,这将在下面更详细地讨论。

所有状态集合类型都支持每个条目的TTL。这意味着列表元素和映射条目将独立过期。

为了使用状态TTL,必须首先构建StateTtlConfig配置对象。然后,可以通过传递配置在任何状态描述符中启用TTL功能

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

它的配置项有一下几点需要考虑:

第一个参数newBuilder是强制性的,指定value有效期

更新类型配置为TTL刷新的时机(默认是OnCreateAndWrite)

  • StateTtlConfig.UpdateType.OnCreateAndWrite - only on creation and write access
  • StateTtlConfig.UpdateType.OnReadAndWrite - also on read access

状态可见性配置如果过期值尚未清除,是否在读取访问时返回它(默认情况下NeverReturnExpired)

  • StateTtlConfig.StateVisibility.NeverReturnExpired - 从不返回
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 没有被清除时返回

在NeverReturnExpired的情况下,过期状态表现得好像它不再存在,即使它仍然必须被删除。
该选项对于在TTL之后必须严格用于读取访问的数据的用例是有用的,例如,应用程序使用隐私敏感数据

另一个选项ReturnExpiredIfNotCleanedUp允许返回过期状态当它没有被清除时

注意:

  • 状态后端存储上次修改的时间戳以及用户值,这意味着启用此功能会增加状态存储的消耗。
    堆状态后端存储一个额外的Java对象,其中包含对用户状态对象的引用和内存中的原始long值。
    RocksDB状态后端为每个存储值,列表条目或映射条目添加8个字节。
  • 目前仅支持参考处理时间的TTL
  • 尝试恢复先前未配置TTL的状态,使用启用TTL的描述符(反之亦然)将导致兼容性失败和StateMigrationException
  • TTL配置不是检查点或保存点的一部分,而是Flink如何在当前运行的作业中处理它的方式。
  • 仅当用户值序列化程序可以处理空值时,具有TTL的映射状态当前才支持空用户值。如果序列化程序不支持空值,
    它可以用NullableSerializer包装,代价是序列化形式的额外字节

清除过期状态

当前,仅在过期值独立读取时才可以移除,比如调用ValueState.value()

注意:这意味着默认情况下,如果未读取过期状态,则不会将其删除,从而可能导致状态不断增长。这可能在将来的版本中发生变化

此外,您可以在获取完整状态快照时激活清理,这将减小其大小。在当前实现下不清除本地状态,但是在从先前快照恢复的情况下它
不会包括已移除的过期状态。它可以在StateTtlConfig中配置

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();

该操作不支持在RocksDB状态后端中增量的checkpointing,更多的在后台自动清理过期状态的策略将在未来加入

使用托管运算符状态

要使用托管操作符状态,有状态函数可以实现更通用的CheckpointedFunction接口,或者ListCheckpointed <T extends Serializable>接口

CheckpointedFunction

CheckpointedFunction 接口提供对具有不同重新分发方案的非键控状态的访问,它需要实现以下两个方法

void snapshotState(FunctionSnapshotContext context) throws Exception;

void initializeState(FunctionInitializationContext context) throws Exception;

每当必须执行检查点时,都会调用snapshotState()。每次初始化用户定义的函数时,都会调用对应的initializeState(),即首次初始化函数时,或者当函数实际从早期检查点恢复时。鉴于此,initializeState()不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑的位置

目前,支持列表样式的托管操作符状态。该状态应该是一个可序列化对象的列表,彼此独立,因此有资格在重新缩放时重新分配。换句话说,这些对象是可以重新分配非键控状态的最精细的粒度。根据状态访问方法,定义了以下重新分发方案

  • Even-split redistribution: 每个运算符都返回一个状态元素列表。整个状态在逻辑上是所有列表的串联。在恢复/重新分发时,列表被平均分成与并行运算符一样多的子列表。每个运算符都获得一个子列表,该子列表可以为空,或包含一个或多个元素。例如,如果使用并行性1,运算符的检查点状态包含元素element1和element2,当将并行性增加到2时,element1可能最终在运算符实例0中,而element2将转到运算符实例1
  • Union redistribution: 每个运算符都返回一个状态元素列表。整个状态在逻辑上是所有列表的串联。在恢复/重新分配时,每个运算符都会获得完整的状态元素列表

下面是一个有状态SinkFunction的示例,它使用CheckpointedFunction缓冲元素,然后再将它们发送到外部世界。它演示了基本的 even-split再分配列表状态

public class BufferingSink
        implements SinkFunction<Tuple2<String, Integer>>,
                   CheckpointedFunction {

    private final int threshold;

    private transient ListState<Tuple2<String, Integer>> checkpointedState;

    private List<Tuple2<String, Integer>> bufferedElements;

    public BufferingSink(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            for (Tuple2<String, Integer> element: bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Integer> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor =
            new ListStateDescriptor<>(
                "buffered-elements",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}

initializeState方法将FunctionInitializationContext作为参数。这用于初始化非键控状态“容器”。
这些是ListState类型的容器,其中非键控状态对象将在检查点存储

注意状态是如何初始化的,类似于键控状态,StateDescriptor包含状态名称和有关状态所持有的值类型的信息

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

状态访问方法的命名约定包含其重新分发模式,后跟其状态结构.例如,要在还原时使用联合重新分发方案的列表状态,请使用getUnionListState(descriptor)访问该状态。如果方法名称不包含重新分发模式,例如getListState(描述符),它只是意味着将使用基本的even-split再分配方案

在初始化容器之后,我们使用上下文的isRestored()方法来检查我们是否在失败后恢复。如果这是真的,即我们正在恢复,则应用恢复逻辑

如修改后的BufferingSink的代码所示,在状态初始化期间恢复的ListState保存在类变量中以供将来在snapshotState()中使用。在那里,ListState被清除前一个检查点包含的所有对象,然后填充我们想要检查点的新对象。

作为旁注,键控状态也可以在initializeState()方法中初始化。这可以使用FunctionInitializationContext提供的方式完成

ListCheckpointed

ListCheckpointed接口是CheckpointedFunction的一个更有限的变体,它仅支持在恢复时具有even-split再分配方案的列表样式状态。
它还需要实现两种方法

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

void restoreState(List<T> state) throws Exception;

在snapshotState()上,运算符应该返回检查点的对象列表,restoreState必须在恢复时处理这样的列表。如果状态不可重新分区,
则始终可以在snapshotState()中返回Collections.singletonList(MY_STATE)

有状态源函数


与其他运算符相比,有状态的源需要更多的关注。为了使状态和输出集合的更新成为原子(在故障/恢复时exactly-once所需),
用户需要从源的上下文获取锁定。

public static class CounterSource
        extends RichParallelSourceFunction<Long>
        implements ListCheckpointed<Long> {

    /**  current offset for exactly once semantics */
    private Long offset;

    /** flag for job cancellation */
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Long> ctx) {
        final Object lock = ctx.getCheckpointLock();

        while (isRunning) {
            // output and state update are atomic
            synchronized (lock) {
                ctx.collect(offset);
                offset += 1;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
        return Collections.singletonList(offset);
    }

    @Override
    public void restoreState(List<Long> state) {
        for (Long s : state)
            offset = s;
    }
}

当Flink完全确认检查点与外界通信时,某些运算符可能需要这些信息,在这种情况下,
请参阅org.apache.flink.runtime.state.CheckpointListener接口

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,084评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,623评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,450评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,322评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,370评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,274评论 1 300
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,126评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,980评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,414评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,599评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,773评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,470评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,080评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,713评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,852评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,865评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,689评论 2 354

推荐阅读更多精彩内容