Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
前言
Flink 2.0的重要特性之一是存算分离。存算分离场景下使用传统同步方式操作状态后端,性能下降十分显著。为了应对这个问题,FLIP-424提出了异步化的state API执行模型。具有如下特性:
- 异步执行:所有state操作异步化。但是对于key相同的state操作必须要保证操作的顺序,避免出现脏读幻读的情况。
- 攒批执行:尽可能的推迟request执行。当可立即执行的request攒够一个批量大小(batchSize)的时候一起执行。这种机制可充分利用状态后端数据库的批量执行优化特性,提高性能。
下面以最为常用的ValueState为例展开分析异步化的实现方式。
ValueState
asyncValue
方法用于异步获取ValueState的值。该方法位于AbstractValueState
中:
@Override
public final StateFuture<V> asyncValue() {
return handleRequest(StateRequestType.VALUE_GET, null);
}
显然,该异步方法没有立即返回state的值,而是创建了一个类型为VALUE_GET
的请求。
handleRequest
方法返回的是一个StateFuture
对象。其实现类为StateFutureImpl
。这个类的行为用法和JDK的CompletableFuture
非常类似,提供了大量的thenXxx
方法用来声明异步回调。
handleRequest
方法位于AbstractKeyedState
,代码如下:
protected final <IN, OUT> StateFuture<OUT> handleRequest(
StateRequestType stateRequestType, IN payload) {
return stateRequestHandler.handleRequest(this, stateRequestType, payload);
}
上面stateRequestHandler
的实现类为AsyncExecutionController
。接下来对它的handleRequest
方法展开分析。
AsyncExecutionController
AsyncExecutionController
负责各个operator发来的异步状态请求。它能够确保持有相同key的异步状态请求按顺序执行,确保结果最终一致,不影响业务逻辑。
当遇到过多待处理数据(inflight),它会阻塞input端,从而实现反压(back pressure)。
前面提到的handleRequest
代码如下所示。它构造出state request,检查当前inflight record数量是否超过了最大允许值。然后判断是否有相同的key被占用来决定该request可被立即执行还是阻塞等待。最后检查可被立即执行的request数量是否超过了batch size,如果超过了,立即执行一批。
@Override
public <IN, OUT> InternalStateFuture<OUT> handleRequest(
@Nullable State state, StateRequestType type, @Nullable IN payload) {
// Step 1: build state future & assign context.
// 包装context到InternalStateFuture中。需要根据生命周期触发一些操作,例如更新引用计数,更新引用计数的逻辑后面分析
InternalStateFuture<OUT> stateFuture = stateFutureFactory.create(currentContext);
StateRequest<K, ?, IN, OUT> request =
new StateRequest<>(state, type, payload, stateFuture, currentContext);
// Step 2: try to seize the capacity, if the current in-flight records exceeds the limit,
// block the current state request from entering until some buffered requests are processed.
// 检查inflight record数量是否超过了最大允许值,如果是,尝试触发计算直到inflight record数量小于最大允许值
// 期间会阻塞该state request
seizeCapacity();
// Step 3: try to occupy the key and place it into right buffer.
// 判断currentContext中持有的key是否正在被处理。如果没有request加入到activeBuffer中
// 如果有,加入request到blockingBuffer中
if (tryOccupyKey(currentContext)) {
// active buffer队列中的request可立即执行
insertActiveBuffer(request);
} else {
// blocking buffer中的requet不可执行(存在key相同的request在active buffer中)
// 需要active buffer中相同key的request执行完毕之后,把request从blocking buffer移动到active buffer中,才有执行的资格
insertBlockingBuffer(request);
}
// Step 4: trigger the (active) buffer if needed.
// 如果activeQueue中request数量超过batchSize
// 执行一批activeRequest
triggerIfNeeded(false);
return stateFuture;
}
下面展开分析handleRequest
中的步骤。
StateFutureFactory
的create
方法包装context
到InternalStateFuture
中,这里InternalStateFuture
的实现类为ContextStateFutureImpl
。
public <OUT> InternalStateFuture<OUT> create(RecordContext<K> context) {
return new ContextStateFutureImpl<>(
(runnable) ->
callbackRunner.submit(
() -> {
asyncExecutionController.setCurrentContext(context);
runnable.run();
}),
exceptionHandler,
context);
}
setCurrentContext
方法通知状态后端切换上下文(切换key和keygroup):
public void setCurrentContext(RecordContext<K> switchingContext) {
currentContext = switchingContext;
if (switchContextListener != null) {
switchContextListener.switchContext(switchingContext);
}
}
接下来是seizeCapacity
方法。如果当前key没有被占用。触发计算减少inflight record数量直到小于maxInFlightRecordNum
。
private void seizeCapacity() {
// 1. Check if the record is already in buffer. If yes, this indicates that it is a state
// request resulting from a callback statement, otherwise, it signifies the initial state
// request for a newly entered record.
// 检查这个record是否已经在buffer中
// 如果是,无操作直接返回
if (currentContext.isKeyOccupied()) {
return;
}
RecordContext<K> storedContext = currentContext;
// 2. If the state request is for a newly entered record, the in-flight record number should
// be less than the max in-flight record number.
// Note: the currentContext may be updated by {@code StateFutureFactory#build}.
// 如果inflight(activeBuffer和blockingBuffer中的数量)数量超过最大允许的inflight数量maxInFlightRecordNum
// 尝试触发计算,直到InFlightRecord小于maxInFlightRecordNum
drainInflightRecords(maxInFlightRecordNum);
// 3. Ensure the currentContext is restored.
// 设置当前的context
setCurrentContext(storedContext);
// 待处理(activeBuffer和blockingBuffer中的数量)加1enqu
inFlightRecordNum.incrementAndGet();
}
tryOccupyKey
判断recordContext携带的是不是同一个key。不是同一个key的请求放入activeBuffer队列,是的话放入到阻塞队列中。
boolean tryOccupyKey(RecordContext<K> recordContext) {
// 当前record的key是否被占用
boolean occupied = recordContext.isKeyOccupied();
// 如果没有被占用
// 尝试占用这个key成功
if (!occupied
&& keyAccountingUnit.occupy(recordContext.getRecord(), recordContext.getKey())) {
// 标记recordContext所持的key状态为occupied
recordContext.setKeyOccupied();
// 标记occupied
occupied = true;
}
return occupied;
}
最后是triggerIfNeeded
方法。检查activeQueue中的request数量是否超过了batchSize。如果超过了,批量执行他们。对应上了Flink 2.0的新特性攒批执行,对性能提升有帮助。
public void triggerIfNeeded(boolean force) {
// 如果不是强制触发
// 并且activeQueue中的request数量小于batchSize的话,不满足执行条件
// 直接返回,什么也不做
if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
return;
}
// 从activeQueue中弹出batchSize个request到stateRequestContainer中
Optional<StateRequestContainer> toRun =
stateRequestsBuffer.popActive(
batchSize, () -> stateExecutor.createStateRequestContainer());
if (!toRun.isPresent() || toRun.get().isEmpty()) {
return;
}
// 批量执行这些request
stateExecutor.executeBatchRequests(toRun.get());
// sequence number增加1,用来区分不同的trigger
stateRequestsBuffer.advanceSeq();
}
批量执行requests的逻辑位于stateExecutor
中。Flink 2.0新引入了ForStDB,该DB的executor为ForStStateExecutor
。ForStStateExecutor
通过ForStStateRequestClassifier
(即前面createStateRequestContainer
生成的对象)将stateRequest转换为ForStDB request,赋予了操作ForStDB的能力。这部分涉及较多的是数据库的操作,此处不再详细分析。
StateRequestBuffer
StateRequestBuffer
负责缓存state request,最为重要的是其中维护的是两个队列:
- activeQueue:当buffer满的时候,在此队列中的state request都会执行。
- blockingQueue:需要等待activeQueue中涉及到相同key的state request执行完毕之后,才能够执行。blockingQueue中的request在activeQueue中相同key的request执行完毕之后,会移动到activeQueue中。
// 将request装入activeQueue中
void enqueueToActive(StateRequest<K, ?, ?, ?> request) {
if (request.getRequestType() == StateRequestType.SYNC_POINT) {
request.getFuture().complete(null);
} else {
activeQueue.add(request);
if (bufferTimeout > 0 && seqAndTimeout == null) {
seqAndTimeout =
Tuple2.of(currentSeq.get(), System.currentTimeMillis() + bufferTimeout);
}
}
}
// 将request装入blockingQueue中
void enqueueToBlocking(StateRequest<K, ?, ?, ?> request) {
blockingQueue
.computeIfAbsent(request.getRecordContext().getKey(), k -> new LinkedList<>())
.add(request);
blockingQueueSize++;
}
Key的reference count如何维护
前面提到在handleRequest
的时候。RecordContext会被包装在InternalStateFuture
中。大家可能会问为什么要如此包装。查看代码发现该接口的实现类为ContextStateFutureImpl
。该类的Javadoc中提到它负责维护reference count(引用计数)。
引用计数的作用是判断当前key没有被引用的时候,执行RecordContext
的清理逻辑。
ContextStateFutureImpl
类代码和分析如下。
public class ContextStateFutureImpl<T> extends StateFutureImpl<T> {
private final RecordContext<?> recordContext;
ContextStateFutureImpl(
CallbackRunner callbackRunner,
AsyncFrameworkExceptionHandler exceptionHandler,
RecordContext<?> recordContext) {
super(callbackRunner, exceptionHandler);
this.recordContext = recordContext;
// When state request submitted, ref count +1, as described in FLIP-425:
// To cover the statements without a callback, in addition to the reference count marked
// in Fig.5, each state request itself is also protected by a paired reference count.
// 在state request提交的时候,引用计数加1
recordContext.retain();
}
@Override
public <A> StateFutureImpl<A> makeNewStateFuture() {
return new ContextStateFutureImpl<>(callbackRunner, exceptionHandler, recordContext);
}
@Override
// 注册callback的时候,引用计数加1
public void callbackRegistered() {
// When a callback registered, as shown in Fig.5 of FLIP-425, at the point of 3 and 5, the
// ref count +1.
recordContext.retain();
}
@Override
// 执行完成的时候,引用计数减1
public void postComplete(boolean inCallbackRunner) {
// When a state request completes, ref count -1, as described in FLIP-425:
// To cover the statements without a callback, in addition to the reference count marked
// in Fig.5, each state request itself is also protected by a paired reference count.
if (inCallbackRunner) {
// 这里会检查引用计数减1之后如果是0,会执行RecordContext中的disposeContext方法
recordContext.release(Runnable::run);
} else {
recordContext.release(
runnable -> {
try {
callbackRunner.submit(runnable::run);
} catch (Exception e) {
exceptionHandler.handleException(
"Caught exception when post complete StateFuture.", e);
}
});
}
}
@Override
// callback调用结束的时候,引用计数减1
public void callbackFinished() {
// When a callback ends, as shown in Fig.5 of FLIP-425, at the
// point of 2,4 and 6, the ref count -1.
recordContext.release(Runnable::run);
}
}
ContextStateFutureImpl
继承自StateFutureImpl
(文章开头时提及)。在thenXxx
方法异步回调开始执行时调用callbackRegistered
增加引用计数,callback调用结束时调用callbackFinished
减少引用计数。在使用complete
方法的时候调用postComplete
方法释放引用计数。如果引用计数为0,执行清理方法。
以上是维护RecordContext
对应key的引用计数的原理。
buildContext
方法在operator接收到一个record的时候(processElement
方法)调用。该方法创建出RecordContext
。
public RecordContext<K> buildContext(Object record, K key, boolean inheritEpoch) {
if (record == null) {
return new RecordContext<>(
RecordContext.EMPTY_RECORD,
key,
// reference count达到0的时候,如何处理这个context的逻辑
this::disposeContext,
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
inheritEpoch
? epochManager.onEpoch(currentContext.getEpoch())
: epochManager.onRecord());
}
return new RecordContext<>(
record,
key,
this::disposeContext,
KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism),
inheritEpoch
? epochManager.onEpoch(currentContext.getEpoch())
: epochManager.onRecord());
}
通过上面的代码可以找到RecordContext的清理逻辑位于disposeContext
。它在reference count为0的时候执行。
void disposeContext(RecordContext<K> toDispose) {
epochManager.completeOneRecord(toDispose.getEpoch());
// 释放key的占用
keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
// inflight record数量减1
inFlightRecordNum.decrementAndGet();
// 尝试从blocking queue中拿出一个key相同的RecordContext
RecordContext<K> nextRecordCtx =
stateRequestsBuffer.tryActivateOneByKey(toDispose.getKey());
// 如果它存在,占用这个key
if (nextRecordCtx != null) {
Preconditions.checkState(tryOccupyKey(nextRecordCtx));
}
}