【Flink 精选】基于 state 缩放原理的作业弹性

flink 作业的弹性是基于 state 缩放,探讨 state 缩放和算子并发度的关系。


1.作业弹性与 state 缩放的概念

1.1 概念

flink 作业弹性是指作业的扩缩容,实质上就是算子并发度的缩容和扩容。state 缩放是指状态的缩容和扩容。下面我们主要讨论

1.2 场景

Flink 作业已经运行了一段时间,用户扩容作业的主要手段:增大算子并发度,提高作业性能和吞吐量,如下图所示。

image

2.state 的持久化和扩容分配

2.1 state 持久化

(1)state 持久化的背景

在实时计算场景中,数据流会源源不断进入 flink 系统,每条数据都会触发作业计算

问题1:如果某作业需要进行聚合 count 计算,每次计算是将历史的所有数据重新计算,还是每次计算是基于上一次的计算结果进行增量计算。

解答1:flink 的聚合 count 计算(状态)是基于上一次结果的增量计算

问题2:上一次计算结果的缓存数据保存在哪里?

解答2:如果缓存数据保存在内存中,则在节点故障恢复的时候,需要重新计算历史的所有数据。这是不可取的。因此,为了提高缓存的可靠性和性能,缓存数据需要进行本地持久化

(2)state 持久化的原理

以 RocksDB + HDFS 存储为例,state 存储有两个阶段:首先缓存数据存储到本地RocksDB,然后异步保存到分布式文件 HDFS。优点是既克服 HeapStateBackend 的内存大小和可靠性的问题,也避免了 FsStateBackend 的产生大量网络 IO 问题。

image

Flink 有4种 state 的存储方式

① 基于内存的HeapStateBackend: 在debug模式使用。

② 基于HDFS的FsStateBackend:分布式文件持久化,每次读写都产生网络IO,性能不好

③ 基于RocksDB的RocksDBStateBackend:本地文件+异步HDFS持久化。

2.2 state 扩容分配问题

Flink 是一个分布式有状态的流处理系统。Flink 作业的 DAG 图在逻辑上 StreamGraph 优化为 JobGraph,最终转化为物理执行的 ExecutionGraph 运行在 TaskManager。ExecutionGraph 的每个节点就是算子实例。每个算子实例都可以看作是一个独立的任务。

如下图所示,Flink 作业的 DAG 图在垂直方向有网络 IO,在水平方向的 stateful operator 算子之间没有网络通信。这种模型保证了每个算子实例维护一份自己的 state,并且保存在本地,不会导致算子实例之间的产生网络通信

image

Flink 有两种状态:operator state 和 keyed state。如果进行扩容(增大并发度),如何重新分配 state?例如,外部 MQ 有5个 partition,在 source 的并发由 1 扩容到 2,中间 stateful operation 的并发度由 2 扩容到 3。

image

3.operator state 扩容重新分配

选取kafka connector案例,如上图所示,kafka broker 的 partition 数量是5,source 的并发度从 2 扩容到 5 。如何恢复 state ?下面的源码是基于1.11版本。
FlinkKafkaConsumerBase.java源码分析。如果restoredState是从operator state恢复,即从savepoint或者checkpoint恢复,把restoredState的数据设置到subscribedPartitionsToStartOffsets。在设置的过程中,需要重新把partition分配到subtask,即使用KafkaTopicPartitionAssigner的assign方法重新计算该partition属于哪个subtask。

    /** The set of topic partitions that the source will read, with their initial offsets to start reading from. */

    private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
    /**
     * The offsets to restore to, if the consumer restores state from a checkpoint.
     *
     * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
     *
     * <p>Using a sorted map as the ordering is important when using restored state
     * to seed the partition discoverer.
     */
    private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;

    /** Accessor for state in the operator state backend. */
    private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;

    // 算子初始化配置 
    @Override
    public void open(Configuration configuration) throws Exception {
        // 省略 ...

        subscribedPartitionsToStartOffsets = new HashMap<>();
        final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
                // 如果是从operator state恢复, 即不为null
        if (restoredState != null) {
            for (KafkaTopicPartition partition : allPartitions) {
                if (!restoredState.containsKey(partition)) {
                    restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
                }
            }

            for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
                if (!restoredFromOldState) {
                    // seed the partition discoverer with the union state while filtering out
                    // restored partitions that should not be subscribed by this subtask
                    if (KafkaTopicPartitionAssigner.assign(
                        restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
                            == getRuntimeContext().getIndexOfThisSubtask()){
                        subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                    }
                } else {
                    // when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
                    // in this case, just use the restored state as the subscribed partitions
                    subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
                }
            }

            if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {
                subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {
                    if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {
                        LOG.warn(
                            "{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",
                            entry.getKey());
                        return true;
                    }
                    return false;
                });
            }

            LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
        }
    }

    // checkpoint 恢复缓存数据
    @Override
    public final void initializeState(FunctionInitializationContext context) throws Exception {
        // 省略 ...

        if (context.isRestored() && !restoredFromOldState) {
            restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

            // migrate from 1.2 state, if there is any
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
                restoredFromOldState = true;
                unionOffsetStates.add(kafkaOffset);
            }
            oldRoundRobinListState.clear();

            if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
                throw new IllegalArgumentException(
                    "Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
            }

            // populate actual holder for restored state
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }

            LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState);
        } else {
            LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
        }
    }

接着分析subscribedPartitionsToStartOffsets属性,类型是Map<KafkaTopicPartition, Long>,KafkaTopicPartition包含topic和partition,Long表示offset。

public final class KafkaTopicPartition implements Serializable {

    /** THIS SERIAL VERSION UID MUST NOT CHANGE, BECAUSE IT WOULD BREAK
     * READING OLD SERIALIZED INSTANCES FROM SAVEPOINTS. */
    private static final long serialVersionUID = 722083576322742325L;

    // ------------------------------------------------------------------------

    private final String topic;
    private final int partition;
    private final int cachedHash;

    public KafkaTopicPartition(String topic, int partition) {
        this.topic = requireNonNull(topic);
        this.partition = partition;
        this.cachedHash = 31 * topic.hashCode() + partition;
    }
}

最后我们看看KafkaTopicPartitionAssigner是如何为subtask分配partition。首先使用topic取(hash* 31) & 0x7FFFFFFF是为了保证其结果是一个质数,然后再(startIndex + partition.getPartition()) % numParallelSubtasks是为了同一个topic的partition尽量连续分配给同一个subtask。
operator state扩容.JPG

4.keyed state 扩容重新分配

思路:为了减少网络IO即本地化缓存数据,算子实例subtask需要读取连续的key,从而就有了KeyGroup的设计。这里需要解决两个问题:①如何把KeyGroup分配给subtask? ②每条数据即key如何确定KeyGroup?

4.1 KeyGroup

public class KeyGroupRange implements KeyGroupsList, Serializable {
        ...
        ...
        private final int startKeyGroup;
        private final int endKeyGroup;
        ...
        ...
}

KeyGroupRange两个重要的属性就是 startKeyGroup和endKeyGroup,定义了startKeyGroup和endKeyGroup属性后,即subtask的KeyGroup的范围(连续数量)。

4.2 分配数据(key)给KeyGroup

分析KeyGroupRangeAssignment源码,assignToKeyGroup方法采用取mod的方式,将key划分到指定的KeyGroup中。

    /**
     * Assigns the given key to a key-group index.
     *
     * @param key the key to assign
     * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
     * @return the key-group to which the given key is assigned
     */
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        Preconditions.checkNotNull(key, "Assigned key must not be null!");
        return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism);
    }

    /**
     * Assigns the given key to a key-group index.
     *
     * @param keyHash the hash of the key to assign
     * @param maxParallelism the maximum supported parallelism, aka the number of key-groups.
     * @return the key-group to which the given key is assigned
     */
    public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) {
        return MathUtils.murmurHash(keyHash) % maxParallelism;
    }

分配key到指定的KeyGroup的逻辑是利用key的hashCode和maxParallelism进行取余操作来分配的。如下图所示,parallelism=2,maxParallelism=10,key与KeyGroup的对应关系。
key和KeyGroup的关系.JPG

4.3 分配KeyGroup给subtask

分析KeyGroupRangeAssignment的computeKeyGroupRangeForOperatorIndex方法,每个算子实例subtask都可以分配到连续的KeyGroup范围,并且KeyGroup范围式[0,maxParallelism]。

    /**
     * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
     * parallelism.
     *
     * <p>IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
     * to go beyond this boundary, this method must perform arithmetic on long values.
     *
     * @param maxParallelism Maximal parallelism that the job was initially created with.
     * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
     * @param operatorIndex  Id of a key-group. 0 <= keyGroupID < maxParallelism.
     * @return the computed key-group range for the operator.
     */
    public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
        int maxParallelism,
        int parallelism,
        int operatorIndex) {

        checkParallelismPreconditions(parallelism);
        checkParallelismPreconditions(maxParallelism);

        Preconditions.checkArgument(maxParallelism >= parallelism,
            "Maximum parallelism must not be smaller than parallelism.");

        int start = ((operatorIndex * maxParallelism + parallelism - 1) / parallelism);
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new KeyGroupRange(start, end);
    }

如下图所示,Stateful Operation算子实例的maxParallelism=10,即10个KeyGroup。算子实例的并发度从 2 扩容到 3 的分配情况如下图。

keyed state扩容.JPG

上图分析可知,大部分state是本地的,如Task0只有KG-4被分出去,其他的还是保持在本地。如果作业的maxParallelism变化了,那么会直接影响到KeyGroup的数量和key的分配,也会打乱所有的KeyGroup的分配。因此,作业的maxParallelism要充分扩容场景,否则如果修改maxParallelism,会直接影响到缓存数据的恢复。

5.合理设置 maxParallelism

5.1 最大并发度的概念

maxParallelism 表示当前算子设置的 maxParallelism,而不是 Flink 任务的并行度。maxParallelism 为 实质是 KeyGroup 的总数

当设置算子的并行度大于 maxParallelism 时,有些并行度就分配不到 KeyGroup,此时 Flink 任务是无法>从 Checkpoint 处恢复的。
maxParallelism的有效值在1到Short.MAX_VALUE之间。

    /**
     * Sets the maximum parallelism for the task.
     *
     * @param maxParallelism The maximum parallelism to be set. must be between 1 and Short.MAX_VALUE.
     */
    public void setMaxParallelism(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

如果不设置 maxParallelism,根据并发度 parallelism 计算默认的最大并发度,算子并行度 * 1.5 后,向上取整到 2 的 n 次幂,同时保证计算的结果在最小值和最大值之间。最小值 DEFAULT_LOWER_BOUND_MAX_PARALLELISM 是 2 的 7 次方 = 128,最大值 UPPER_BOUND_MAX_PARALLELISM 是 2 的 15 次方 = 32768,即 flink 自动生成的 maxParallelism 在于 128 和 32768 之间。

    /**
     * Computes a default maximum parallelism from the operator parallelism. This is used in case the user has not
     * explicitly configured a maximum parallelism to still allow a certain degree of scale-up.
     *
     * @param operatorParallelism the operator parallelism as basis for computation.
     * @return the computed default maximum parallelism.
     */
    public static int computeDefaultMaxParallelism(int operatorParallelism) {

        checkParallelismPreconditions(operatorParallelism);

        return Math.min(
                Math.max(
                        MathUtils.roundUpToPowerOfTwo(operatorParallelism + (operatorParallelism / 2)),
                        DEFAULT_LOWER_BOUND_MAX_PARALLELISM),
                UPPER_BOUND_MAX_PARALLELISM);
    }

5.2 实战

一般情况,新的 flink 作业的业务数据量较小,初期设置的并行度也会很小,可能没有给每个 作业或者算子设置 maxParallelism。根据默认并发度的计算规则,flink 自动生成的 maxParallelism 是 128。然而,后期随着业务数据量暴涨,当 作业 的并发度大于 128 的时候,发现作业无法从 checkpoint 或者 savepoint 中恢复,即 "并发度不能上调"。如果有些 flink 作业是带状态的,就会有很大的问题。

因此,应该结合业务场景主动为每个 flink 作业设置合理的 maxParallelism,防止上述问题。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,192评论 6 511
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,858评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,517评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,148评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,162评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,905评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,537评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,439评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,956评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,083评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,218评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,899评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,565评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,093评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,201评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,539评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,215评论 2 358