Flink 源码之异步state API

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

前言

Flink 2.0的重要特性之一是存算分离。存算分离场景下使用传统同步方式操作状态后端,性能下降十分显著。为了应对这个问题,FLIP-424提出了异步化的state API执行模型。具有如下特性:

  • 异步执行:所有state操作异步化。但是对于key相同的state操作必须要保证操作的顺序,避免出现脏读幻读的情况。
  • 攒批执行:尽可能的推迟request执行。当可立即执行的request攒够一个批量大小(batchSize)的时候一起执行。这种机制可充分利用状态后端数据库的批量执行优化特性,提高性能。

下面以最为常用的ValueState为例展开分析异步化的实现方式。

ValueState

asyncValue方法用于异步获取ValueState的值。该方法位于AbstractValueState中:

@Override
public final StateFuture<V> asyncValue() {
    return handleRequest(StateRequestType.VALUE_GET, null);
}

显然,该异步方法没有立即返回state的值,而是创建了一个类型为VALUE_GET的请求。

handleRequest方法返回的是一个StateFuture对象。其实现类为StateFutureImpl。这个类的行为用法和JDK的CompletableFuture非常类似,提供了大量的thenXxx方法用来声明异步回调。

handleRequest方法位于AbstractKeyedState,代码如下:

protected final <IN, OUT> StateFuture<OUT> handleRequest(
        StateRequestType stateRequestType, IN payload) {
    return stateRequestHandler.handleRequest(this, stateRequestType, payload);
}

上面stateRequestHandler的实现类为AsyncExecutionController。接下来对它的handleRequest方法展开分析。

AsyncExecutionController

AsyncExecutionController负责各个operator发来的异步状态请求。它能够确保持有相同key的异步状态请求按顺序执行,确保结果最终一致,不影响业务逻辑。

当遇到过多待处理数据(inflight),它会阻塞input端,从而实现反压(back pressure)。

前面提到的handleRequest代码如下所示。它构造出state request,检查当前inflight record数量是否超过了最大允许值。然后判断是否有相同的key被占用来决定该request可被立即执行还是阻塞等待。最后检查可被立即执行的request数量是否超过了batch size,如果超过了,立即执行一批。

@Override
public <IN, OUT> InternalStateFuture<OUT> handleRequest(
        @Nullable State state, StateRequestType type, @Nullable IN payload) {
    // Step 1: build state future & assign context.
    // 包装context到InternalStateFuture中。需要根据生命周期触发一些操作,例如更新引用计数,更新引用计数的逻辑后面分析
    InternalStateFuture<OUT> stateFuture = stateFutureFactory.create(currentContext);
    StateRequest<K, ?, IN, OUT> request =
            new StateRequest<>(state, type, payload, stateFuture, currentContext);

    // Step 2: try to seize the capacity, if the current in-flight records exceeds the limit,
    // block the current state request from entering until some buffered requests are processed.
    // 检查inflight record数量是否超过了最大允许值,如果是,尝试触发计算直到inflight record数量小于最大允许值
    // 期间会阻塞该state request
    seizeCapacity();

    // Step 3: try to occupy the key and place it into right buffer.
    // 判断currentContext中持有的key是否正在被处理。如果没有request加入到activeBuffer中
    // 如果有,加入request到blockingBuffer中
    if (tryOccupyKey(currentContext)) {
        // active buffer队列中的request可立即执行
        insertActiveBuffer(request);
    } else {
        // blocking buffer中的requet不可执行(存在key相同的request在active buffer中)
        // 需要active buffer中相同key的request执行完毕之后,把request从blocking buffer移动到active buffer中,才有执行的资格
        insertBlockingBuffer(request);
    }
    // Step 4: trigger the (active) buffer if needed.
    // 如果activeQueue中request数量超过batchSize
    // 执行一批activeRequest
    triggerIfNeeded(false);
    return stateFuture;
}

下面展开分析handleRequest中的步骤。

StateFutureFactorycreate方法包装contextInternalStateFuture中,这里InternalStateFuture的实现类为ContextStateFutureImpl

public <OUT> InternalStateFuture<OUT> create(RecordContext<K> context) {
    return new ContextStateFutureImpl<>(
            (runnable) ->
                    callbackRunner.submit(
                            () -> {
                                asyncExecutionController.setCurrentContext(context);
                                runnable.run();
                            }),
            exceptionHandler,
            context);
}

setCurrentContext方法通知状态后端切换上下文(切换key和keygroup):

public void setCurrentContext(RecordContext<K> switchingContext) {
    currentContext = switchingContext;
    if (switchContextListener != null) {
        switchContextListener.switchContext(switchingContext);
    }
}

接下来是seizeCapacity方法。如果当前key没有被占用。触发计算减少inflight record数量直到小于maxInFlightRecordNum

private void seizeCapacity() {
    // 1. Check if the record is already in buffer. If yes, this indicates that it is a state
    // request resulting from a callback statement, otherwise, it signifies the initial state
    // request for a newly entered record.
    // 检查这个record是否已经在buffer中
    // 如果是,无操作直接返回
    if (currentContext.isKeyOccupied()) {
        return;
    }
    RecordContext<K> storedContext = currentContext;
    // 2. If the state request is for a newly entered record, the in-flight record number should
    // be less than the max in-flight record number.
    // Note: the currentContext may be updated by {@code StateFutureFactory#build}.
    // 如果inflight(activeBuffer和blockingBuffer中的数量)数量超过最大允许的inflight数量maxInFlightRecordNum
    // 尝试触发计算,直到InFlightRecord小于maxInFlightRecordNum
    drainInflightRecords(maxInFlightRecordNum);
    // 3. Ensure the currentContext is restored.
    // 设置当前的context
    setCurrentContext(storedContext);
    // 待处理(activeBuffer和blockingBuffer中的数量)加1enqu
    inFlightRecordNum.incrementAndGet();
}

tryOccupyKey判断recordContext携带的是不是同一个key。不是同一个key的请求放入activeBuffer队列,是的话放入到阻塞队列中。

boolean tryOccupyKey(RecordContext<K> recordContext) {
    // 当前record的key是否被占用
    boolean occupied = recordContext.isKeyOccupied();
    // 如果没有被占用
    // 尝试占用这个key成功
    if (!occupied
            && keyAccountingUnit.occupy(recordContext.getRecord(), recordContext.getKey())) {
        // 标记recordContext所持的key状态为occupied
        recordContext.setKeyOccupied();
        // 标记occupied
        occupied = true;
    }
    return occupied;
}

最后是triggerIfNeeded方法。检查activeQueue中的request数量是否超过了batchSize。如果超过了,批量执行他们。对应上了Flink 2.0的新特性攒批执行,对性能提升有帮助。

public void triggerIfNeeded(boolean force) {
    // 如果不是强制触发
    // 并且activeQueue中的request数量小于batchSize的话,不满足执行条件
    // 直接返回,什么也不做
    if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
        return;
    }

    // 从activeQueue中弹出batchSize个request到stateRequestContainer中
    Optional<StateRequestContainer> toRun =
            stateRequestsBuffer.popActive(
                    batchSize, () -> stateExecutor.createStateRequestContainer());
    if (!toRun.isPresent() || toRun.get().isEmpty()) {
        return;
    }
    // 批量执行这些request
    stateExecutor.executeBatchRequests(toRun.get());
    // sequence number增加1,用来区分不同的trigger
    stateRequestsBuffer.advanceSeq();
}

批量执行requests的逻辑位于stateExecutor中。Flink 2.0新引入了ForStDB,该DB的executor为ForStStateExecutorForStStateExecutor通过ForStStateRequestClassifier(即前面createStateRequestContainer生成的对象)将stateRequest转换为ForStDB request,赋予了操作ForStDB的能力。这部分涉及较多的是数据库的操作,此处不再详细分析。

StateRequestBuffer

StateRequestBuffer负责缓存state request,最为重要的是其中维护的是两个队列:

  • activeQueue:当buffer满的时候,在此队列中的state request都会执行。
  • blockingQueue:需要等待activeQueue中涉及到相同key的state request执行完毕之后,才能够执行。blockingQueue中的request在activeQueue中相同key的request执行完毕之后,会移动到activeQueue中。
// 将request装入activeQueue中
void enqueueToActive(StateRequest<K, ?, ?, ?> request) {
    if (request.getRequestType() == StateRequestType.SYNC_POINT) {
        request.getFuture().complete(null);
    } else {
        activeQueue.add(request);
        if (bufferTimeout > 0 && seqAndTimeout == null) {
            seqAndTimeout =
                    Tuple2.of(currentSeq.get(), System.currentTimeMillis() + bufferTimeout);
        }
    }
}

// 将request装入blockingQueue中
void enqueueToBlocking(StateRequest<K, ?, ?, ?> request) {
    blockingQueue
            .computeIfAbsent(request.getRecordContext().getKey(), k -> new LinkedList<>())
            .add(request);
    blockingQueueSize++;
}

Key的reference count如何维护

前面提到在handleRequest的时候。RecordContext会被包装在InternalStateFuture中。大家可能会问为什么要如此包装。查看代码发现该接口的实现类为ContextStateFutureImpl。该类的Javadoc中提到它负责维护reference count(引用计数)。

引用计数的作用是判断当前key没有被引用的时候,执行RecordContext的清理逻辑。

ContextStateFutureImpl类代码和分析如下。

public class ContextStateFutureImpl<T> extends StateFutureImpl<T> {

    private final RecordContext<?> recordContext;

    ContextStateFutureImpl(
            CallbackRunner callbackRunner,
            AsyncFrameworkExceptionHandler exceptionHandler,
            RecordContext<?> recordContext) {
        super(callbackRunner, exceptionHandler);
        this.recordContext = recordContext;
        // When state request submitted, ref count +1, as described in FLIP-425:
        // To cover the statements without a callback, in addition to the reference count marked
        // in Fig.5, each state request itself is also protected by a paired reference count.
        // 在state request提交的时候,引用计数加1
        recordContext.retain();
    }

    @Override
    public <A> StateFutureImpl<A> makeNewStateFuture() {
        return new ContextStateFutureImpl<>(callbackRunner, exceptionHandler, recordContext);
    }

    @Override
    // 注册callback的时候,引用计数加1
    public void callbackRegistered() {
        // When a callback registered, as shown in Fig.5 of FLIP-425, at the point of 3 and 5, the
        // ref count +1.
        recordContext.retain();
    }

    @Override
    // 执行完成的时候,引用计数减1
    public void postComplete(boolean inCallbackRunner) {
        // When a state request completes, ref count -1, as described in FLIP-425:
        // To cover the statements without a callback, in addition to the reference count marked
        // in Fig.5, each state request itself is also protected by a paired reference count.
        if (inCallbackRunner) {
            // 这里会检查引用计数减1之后如果是0,会执行RecordContext中的disposeContext方法
            recordContext.release(Runnable::run);
        } else {
            recordContext.release(
                    runnable -> {
                        try {
                            callbackRunner.submit(runnable::run);
                        } catch (Exception e) {
                            exceptionHandler.handleException(
                                    "Caught exception when post complete StateFuture.", e);
                        }
                    });
        }
    }

    @Override
    // callback调用结束的时候,引用计数减1
    public void callbackFinished() {
        // When a callback ends, as shown in Fig.5 of FLIP-425, at the
        // point of 2,4 and 6, the ref count -1.
        recordContext.release(Runnable::run);
    }
}

ContextStateFutureImpl继承自StateFutureImpl(文章开头时提及)。在thenXxx方法异步回调开始执行时调用callbackRegistered增加引用计数,callback调用结束时调用callbackFinished减少引用计数。在使用complete方法的时候调用postComplete方法释放引用计数。如果引用计数为0,执行清理方法。

以上是维护RecordContext对应key的引用计数的原理。

buildContext方法在operator接收到一个record的时候(processElement方法)调用。该方法创建出RecordContext

public RecordContext<K> buildContext(Object record, K key, boolean inheritEpoch) {
    if (record == null) {
        return new RecordContext<>(
                RecordContext.EMPTY_RECORD,
                key,
                // reference count达到0的时候,如何处理这个context的逻辑
                this::disposeContext,
                KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
                inheritEpoch
                        ? epochManager.onEpoch(currentContext.getEpoch())
                        : epochManager.onRecord());
    }
    return new RecordContext<>(
            record,
            key,
            this::disposeContext,
            KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
            inheritEpoch
                    ? epochManager.onEpoch(currentContext.getEpoch())
                    : epochManager.onRecord());
}

通过上面的代码可以找到RecordContext的清理逻辑位于disposeContext。它在reference count为0的时候执行。

void disposeContext(RecordContext<K> toDispose) {
    epochManager.completeOneRecord(toDispose.getEpoch());
    // 释放key的占用
    keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
    // inflight record数量减1
    inFlightRecordNum.decrementAndGet();
    // 尝试从blocking queue中拿出一个key相同的RecordContext
    RecordContext<K> nextRecordCtx =
            stateRequestsBuffer.tryActivateOneByKey(toDispose.getKey());
    // 如果它存在,占用这个key
    if (nextRecordCtx != null) {
        Preconditions.checkState(tryOccupyKey(nextRecordCtx));
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容