Flink 源码:StreamTask 介绍及初始化过程详解

本文仅为笔者平日学习记录之用,侵删
原文:https://mp.weixin.qq.com/s/GuA9o09EEue66fEpGgoGaQ

本文是 Flink 源码解析系列,通过阅读本文你能 get 到以下点:

  • StreamTask 类的基本功能及其职责
  • StreamTask 初始化详细流程
  • StateBackend 与 keyedStateBackend 以及 operatorStateBackend 之间的关系

这里先留一个思考题:如下代码所示,开发 Flink Job 时 dataStream keyBy 后连续跟 map、filter、flatMap 三个算子,请问这三个自定义的 Function 内都可以使用 Flink 的 ValueState 吗?

dataStream.keyby(_._1)
  .map(new MyMapFunction())
  .filter(new MyFilterFunction())
  .flatMap(new MyFlatMapFunction())

一、 StreamTask 介绍

Flink 中数据的整个处理流程都是围绕 StreamTask 来做的,所以先介绍一下 StreamTask 这个类。StreamTask 类的 doc 如下所示:

StreamTask 类 doc

注释的大概意思是:StreamTask 是所有 Streaming Task 的基类,是由 TM 部署并执行的本地处理单元。每个 StreamTask 运行一个或多个 Chain 在一起的 StreamOperator,这些 Operator 将会在一个线程内同步执行。常见的 case:map、flatmap、filter 三个算子连续的算子。

通俗的讲,StreamTask 就是对应一个 subtask 实例。如下图 Job 的 ExecutionGraph 所示,Source 算子和 map 算子 Chain 在一起,组成一个 OperatorChain,所以这两个算子运行在一个 subtask 里,同时这两个算子的并行度为 2,所以在对应两个 subtask。图中后续的算子也是类似,图中任务如果运行起来,就会对应 5 个 subtask,也就是对应 5 个 StreamTask。

Job 的 ExecutionGraph

从资源角度讲,每个 TaskManager 内部有多个 slot,每个 slot 内部运行着一个 subtask,也就是说每个 slot 内部运行着一个 StreamTask。

看完这个案例,再回顾一遍源码中注释,应该比较容易理解了:

  • StreamTask 是由 TM 部署并执行的本地处理单元
  • 每个 StreamTask 运行一个或多个 Chain 在一块的 StreamOperator,即:Source 算子和 map 算子就是 Chain 在一起的 Operator
  • 这些 Operator 将会在一个线程内同步的执行。即:线程中 Source 算子和 map 算子不能同时执行。

二、StreamTask 职责简介

如源码注释所示,StreamTask 的生命周期如下所示:

StreamTask 的生命周期

简单概括分为三个阶段:初始化、run、close。

初始化阶段包括:Operator 的配置、task 特定的初始化、初始化算子的 State、open-operators。

做 Flink 开发的同学应该都知道:自定义一个 Function 时可以实现 RichFunction,实现 open 方法,然后 Job 启动时,就会调用 open 方法做一些初始化操作。
open-operators 指的是 StreamTask 在初始化阶段,会调用所有实现了 RichFunction 算子 的 open 方法。

run 阶段:主要就是数据处理了。

close 阶段:做一些关闭操作,例如调用算子的 close 方法等,并做一些清理工作。

三、StreamTask 的初始化

StreamTask 的整个流程都在 invoke 方法中,直接从 invoke 方法开始分析。invoke 方法就是上面介绍的三个阶段:初始化、run、close。初始化阶段做了很多事情,有些直接略过了(例如:创建线程池等),当然初始化阶段重要的操作会深入分析。

invoke 中初始化相关的代码做了部分删减,如下所示:

asyncOperationsThreadPool = Executors.newCachedThreadPool(new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));

// 创建 StateBackend, 优先从 app 的设置中去加载,再去 config 中去加载,
// 都没有配置,则创建默认的 MemoryStateBackend
stateBackend = createStateBackend();
checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();

// task 特定的初始化,例如 当前 StreamTask 有 input 的情况下,会初始化 inputProcessor
init();

synchronized (lock) {
 // 循环遍历,对该 task 所有 Operator 进行状态初始化,
 // 包括初始化 StateBackend ,并调用 udf 的 initializeState 方法
 initializeState();
 openAllOperators();
}

初始化部分代码较多,下面主要介绍几部分:

  • createStateBackend
  • init
  • initializeState();
  • openAllOperators();

1. createStateBackend

见名之意,该方法就是创建 StateBackend,Flink 目前支持三种 StateBackend:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。该方法决定了当前 Job 具体要创建哪种 StateBackend。

源码如下:

// (1) the application defined state backend has precedence
// 代码中创建了 StateBackend,则按照代码中配置来
if (fromApplication != null) {
 // see if this is supposed to pick up additional configuration parameters
 if (fromApplication instanceof ConfigurableStateBackend) {
  // needs to pick up configuration
  if (logger != null) {
   logger.info("Configuring application-defined state backend with job/cluster config");
  }

  backend = ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);
 }
 else {
  // keep as is!
  backend = fromApplication;
 }
}
else {
 // (2) check if the config defines a state backend
 // 代码中没有配置,按照 配置文件来:即按照 flink-conf.yaml 文件中的配置来
 final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);
 if (fromConfig != null) {
  backend = fromConfig;
 }
 else {
  // (3) use the default
  // 代码中没有配置,配置文件也没有配置,则创建默认的 MemoryStateBackend
  backend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
 }
}

整体流程比较简单:

  • 代码中创建了 StateBackend,则按照代码中配置来
  • 代码中没有配置,按照 配置文件来:即按照 flink-conf.yaml 文件中的配置来
  • 代码中没有配置,配置文件也没有配置,则创建默认的 MemoryStateBackend

2. init

init 运行 task 特定的初始化,例如当前 StreamTask 有 input 的情况下,会初始化 inputProcessor 读取并处理数据。关于 inputProcessor 会在 run 部分重点介绍,这里先略过。

3. initializeState

initializeState 方法源码:

// 循环遍历,对该 task 所有 Operator 进行状态初始化,
// 包括初始化 StateBackend ,并调用 udf 的 initializeState 方法
private void initializeState() throws Exception {

 StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

 for (StreamOperator<?> operator : allOperators) {
  if (null != operator) {
   operator.initializeState();
  }
 }
}

源码逻辑比较简单:直接调用当前 StreamTask 的 operatorChain 中所有 StreamOperator 的 initializeState 方法。假如当前 operatorChain 包含了 MapFunction、FilterFunction,两个算子将会被封装在 StreamMap 和 StreamFilter 中,那么此时就会调用这两个算子所对应的 StreamOperator 的 initializeState 方法,根据继承,最后调用的是 AbstractStreamOperator 的无参 initializeState() 方法。

这里专门强调无参 initializeState() 方法,是因为 AbstractStreamOperator 中还有一个有参的 initializeState(StateInitializationContext context) 方法,不要混淆。

注:有参的 initializeState 方法参数类型较长,下文将缩写为 initializeState(context) ;无参的 initializeState 方法继续用 initializeState() 表示。

AbstractStreamOperator 类的 initializeState() 方法介绍

initializeState 方法的简洁版源码如下:

public final void initializeState() throws Exception {
 final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

  // 创建 StreamTaskStateInitializerImpl
 final StreamTaskStateInitializer streamTaskStateManager =
  Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());

 // 使用 StreamTaskStateInitializerImpl 初始化
  // 各种 operatorStateBackend 和 keyedStateBackend,
 // 并从 Checkpoint 处恢复 State
 final StreamOperatorStateContext context =
  streamTaskStateManager.streamOperatorStateContext(XXX);

 try {
    // new Context 
  StateInitializationContext initializationContext = new 
      StateInitializationContextImpl(XXX);

  /**
   * 重点关注 AbstractUdfStreamOperator,它重写了 initializeState(context) 方法,
   * 去真正调用 各个 udf 的 initializeState 方法,
   */
  initializeState(initializationContext);
 } finally {
    XXX
 }
}

initializeState 方法主要完成两个工作:

  • 1、初始化 KeyedStateBackend 和 OperatorStateBackend,并从 Checkpoint 处恢复 State
  • 2、如果封装了 udf,则调用 udf 的 initializeState 方法(前提是 userFunction 实现了 CheckpointedFunction 接口)

源码流程:创建 StreamTaskStateInitializer。StreamTaskStateInitializer 只有一个实现类:StreamTaskStateInitializerImpl,所以会创建 StreamTaskStateInitializerImpl。创建时,将之前初始化好的 StateBackend 传递给 StreamTaskStateInitializerImpl,然后调用 streamOperatorStateContext 方法初始化 KeyedStateBackend 和 OperatorStateBackend。

下面重点关注 StreamTaskStateInitializerImpl 类的 streamOperatorStateContext 方法,源码如下所示:

public StreamOperatorStateContext streamOperatorStateContext(XXX){
// -------------- 初始化 Keyed State Backend --------------
 keyedStatedBackend = keyedStatedBackend(XXX);

 // -------------- 初始化 Operator State Backend --------------
 operatorStateBackend = operatorStateBackend(XXX);

 // -------------- Raw State 相关操作 --------------
 rawKeyedStateInputs = rawKeyedStateInputs(XXX);
 streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

 rawOperatorStateInputs = rawOperatorStateInputs(XXX);
 streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

 // -------------- Internal Timer Service Manager --------------
 timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);

 // -------------- Preparing return value --------------

 return new StreamOperatorStateContextImpl(
  prioritizedOperatorSubtaskStates.isRestored(),
  operatorStateBackend,
  keyedStatedBackend,
  timeServiceManager,
  rawOperatorStateInputs,
  rawKeyedStateInputs);
}

首先会初始化 Keyed State Backend 和 Operator State Backend,Flink 还支持 Raw 类型的 State,基本用不到,除非 Flink 内的 Managed State 不能满足作业的需求。重点关注 Keyed State Backend 和 Operator State Backend 的初始化。keyedStatedBackend 方法用于初始化 keyedStatedBackend,operatorStateBackend 方法用于初始化 operatorStateBackend。

StateBackend 与 keyedStateBackend 以及 operatorStateBackend 之间的关系

介绍初始化源码之前,先介绍一下 StateBackend 与 keyedStateBackend 以及 operatorStateBackend 之间的关系。我们都知道 Flink 中支持两种类型的 State,即:KeyedState 和 OperatorState;Flink 目前支持三种状态后端存储,即:Memory、Fs 和 RocksDB。所以 StateBackend 有三种实现即:MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。每种 StateBackend 都要支持 KeyedState 和 OperatorState,所以每种 StateBackend 要负责创建出自己相应的 keyedStateBackend 以及 operatorStateBackend。具体 KeyedState 与存储系统如何交互是由 keyedStateBackend 完成的,具体 OperatorState 与存储系统如何交互是由 operatorStateBackend。例如 RocksDBStateBackend 会创建出 RocksDBKeyedStateBackend,每个 RocksDBKeyedStateBackend 会持有 RocksDB 数据库实例,然后 Flink 引擎就可以与 RocksDB 进行交互了。

简言之:根据用户配置创建出不同类型的 StateBackend,然后不同的 StateBackend 再创建出对应的 keyedStateBackend 以及 operatorStateBackend,keyedStateBackend 和 operatorStateBackend 会真正的存储状态数据。

每种 StateBackend 到底会创建出哪种 keyedStateBackend 和哪种 operatorStateBackend 呢?这里引用 Flink 社区分享的图:

StateBackend 与 keyedStateBackend 以及 operatorStateBackend 的映射关系

图中可以看出,Memory 和 Fs 会创建出 HeapKeyedStateBackend,RocksDB 会创建出 RocksDBKeyedStateBackend。无论哪种 StateBackend,都会使用 DefaultOperatorStateBackend。这里也验证了一点:RocksDB 数据库中只会存储 KeyedState,不会存储 OperatorState。因为 RocksDBStateBackend 对应的 OperatorState 的存储也是基于内存的。读到这里,读者应该理解 StateBackend 与 keyedStateBackend 以及 operatorStateBackend 之间的关系了。

初始化 keyedStateBackend 流程

下面重点关注初始化 keyedStateBackend 的流程,keyedStatedBackend 方法源码如下所示:

protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(XXX){
 // 如果不是 KeyedStream 直接就返回,即:不创建 keyedStatedBackend
 if (keySerializer == null) {
  return null;
 }
  // 计算当前 subtask 负责的 KeyGroupRange
 final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
  taskInfo.getMaxNumberOfParallelSubtasks(),
  taskInfo.getNumberOfParallelSubtasks(),
  taskInfo.getIndexOfThisSubtask());

 BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> 
    backendRestorer =
  new BackendRestorerProcedure<>(
   // 这里是函数式接口,并不是去 create KeyedStateBackend
   (stateHandles) -> stateBackend.createKeyedStateBackend(XXX),
   backendCloseableRegistry,
   logDescription);

 try {
  // 这里去 create StateBackend 并恢复状态文件
  return backendRestorer.createAndRestore(
   // 获取 StateHandle 的集合
   prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
 } finally {
  if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {
   IOUtils.closeQuietly(cancelStreamRegistryForRestore);
  }
 }
}

可以看到方法第一行:if (keySerializer == null) {return null;} 表示如果不是 KeyedStream 直接就返回,即:不创建 keyedStatedBackend。计算当前 subtask 负责的 KeyGroupRange,然后创建 BackendRestorerProcedure 类型的 backendRestorer,且将 (stateHandles) -> stateBackend.createKeyedStateBackend(XXX) 传递给 BackendRestorerProcedure 构造器的 instanceSupplier,instanceSupplier 是一个函数式接口,用于创建 KeyedStateBackend。

backendRestorer.createAndRestore 方法会循环调用 attemptCreateAndRestore 恢复一个个 State, attemptCreateAndRestore 方法中调用 instanceSupplier(函数式接口)真正的创建 keyedStatedBackend。

instanceSupplier 函数式接口的工作:调用相应 StateBackend 的 createKeyedStateBackend 方法创建 AbstractKeyedStateBackend。如果 stateBackend 是 RocksDBStateBackend,就会创建出 RocksDBKeyedStateBackend。如果是 Memory 或 Fs 则会创建出 HeapKeyedStateBackend。在创建完 KeyedStateBackend 的过程中,会从 Checkpoint 中恢复状态到 Flink 引擎。

注:具体 KeyedStateBackend 恢复状态的流程比较复杂,每种 StateBackend 的恢复流程都不同,同时还牵扯到从 dfs 中拉取状态数据用于恢复,所以后续会有单独的博客介绍恢复流程。

初始化 operatorStateBackend 流程

operatorStateBackend 方法用于初始化 operatorStateBackend。operatorStateBackend 初始化流程与 keyedStateBackend 比较类似,区别在于最后调用的是 stateBackend.createOperatorStateBackend()。

三种 StateBackend 的 createOperatorStateBackend 方法非常相似,源码如下:

public OperatorStateBackend createOperatorStateBackend(
 Environment env,
 String operatorIdentifier,
 @Nonnull Collection<OperatorStateHandle> stateHandles,
 CloseableRegistry cancelStreamRegistry) throws Exception {

 return new DefaultOperatorStateBackendBuilder(
  env.getUserClassLoader(),
  env.getExecutionConfig(),
  isUsingAsynchronousSnapshots(),
  stateHandles,
  cancelStreamRegistry).build();
}

这里无论是何种 StateBackend,都会创建出 DefaultOperatorStateBackend。也就验证了一点:RocksDB 只支持 KeyedState,OperatorState 都是按照 Heap 的方案。

具体 new DefaultOperatorStateBackend 的过程由建造器 DefaultOperatorStateBackendBuilder 完成,build 的功能是创建出 OperatorStateBackend,并从 Checkpoint 中将 State 恢复到 Flink 引擎端。整体流程比较复杂,这里不阐述会在后面博客中单独介绍。

如果当前是 udf,则调用 udf 的 initializeState 方法

接下来重点又回到了 AbstractStreamOperator 类的 initializeState() 方法中,根据创建好的 operatorStateBackend 和 keyedStateStore 构造 Context。然后调用 initializeState(Context) 方法,之前说过要区分有参和无参的 initializeState 方法,现在执行到了有参的 initializeState(Context) 方法。

前面介绍过所有自定义的 UDF 都被包装起来,例如 MapFunction 都被 StreamMap 类包装起来,且这些 UDF 的包装类都继承自 AbstractUdfStreamOperator,AbstractStreamOperator 类的 initializeState(Context) 方法没有任何实现,这里重点关注 AbstractUdfStreamOperator 重写的 initializeState(Context) 方法。

AbstractUdfStreamOperator 类的 initializeState 方法源码:

@Override
public void initializeState(StateInitializationContext context) throws Exception {
 // super 表示 AbstractStreamOperator 类
 super.initializeState(context);
 // 调用 udf 的 initializeState 方法
 StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

super 表示 AbstractStreamOperator 类,即调用 AbstractStreamOperator 类的 initializeState(context) 空方法,重点在于工具类 StreamingFunctionUtils 的 restoreFunctionState(context, userFunction) 方法,restoreFunctionState 方法内会对包装的 udf 进行解包装,然后执行 tryRestoreFunction 方法。

tryRestoreFunction 方法部分源码如下所示:

private static boolean tryRestoreFunction(
  StateInitializationContext context,
  Function userFunction) throws Exception {
 // 调用 udf 的  initializeState 方法,
  // 前提是 userFunction instanceof CheckpointedFunction
 if (userFunction instanceof CheckpointedFunction) {
  ((CheckpointedFunction) userFunction).initializeState(context);
  return true;
 }
}

tryRestoreFunction 会对 userFunction 进行判断,如果实现了 CheckpointedFunction 接口,就调用 userFunction 的 initializeState(context) 对状态进行初始化。使用过 CheckpointedFunction 接口的同学应该清楚:自定义的 Function 可以实现 CheckpointedFunction 接口,重写 initializeState 方法,做一些状态的初始化操作。例如在 initializeState 方法申请创建 OperatorState。

udf 使用 initializeState 的经典案例就是 FlinkKafkaConsumerBase 类,FlinkKafkaConsumerBase 类实现了 CheckpointedFunction 接口,在 initializeState 方法中定义了 OperatorState 类型的 ListState,将 Flink 消费 Kafka 的 offset 信息维护在 ListState 中。每次启动任务时,都会从 ListState 中恢复之前的 offset,并从 offset 处继续消费。

initializeState 小结

initializeState 过程比较复杂,总的来说就两个事情:

  • 1、 创建相应的 keyedStateBackend 和 OperatorStateBackend,并从 Checkpoint 处恢复 State(具体恢复流程后续讲述)
  • 2、 如果 udf 实现了 CheckpointedFunction 接口,则调用 udf 的 initializeState 方法

4. openAllOperators

此时回到了 StreamTask 初始化流程的下一步:openAllOperators。openAllOperators 方法比较简单,源码如下所示:

private void openAllOperators() throws Exception {
 for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
  if (operator != null) {
   operator.open();
  }
 }
}

openAllOperators 方法会调用 OperatorChain 中所有 StreamOperator 的 open 方法,通过继承关系,最后调用的仍然是 AbstractUdfStreamOperator 类的 open 方法。AbstractUdfStreamOperator 类的 open 方法源码如下所示:

// AbstractUdfStreamOperator 类的 open 方法
@Override
public void open() throws Exception {
 super.open();
 FunctionUtils.openFunction(userFunction, new Configuration());
}

// FunctionUtils 类的 openFunction 方法
public static void openFunction(Function function
                                , Configuration parameters) {
 if (function instanceof RichFunction) {
  RichFunction richFunction = (RichFunction) function;
  richFunction.open(parameters);
 }
}

AbstractUdfStreamOperator 类的 open 方法调用 FunctionUtils 类的 openFunction 方法,openFunction 方法中会判断当前 userFunction 是否实现了 RichFunction 接口,如果实现了 RichFunction 接口,则调用 userFunction 的 open 方法。

openAllOperators 小结

openAllOperators 的流程比较简单,就是判断 userFunction 是否实现了 RichFunction 接口,在 Flink 中实现了 RichFunction 表示富函数,可以定义 open 和 close 相关的逻辑,在算子初始化或者关闭时会被调用。

四、 思考题

如下代码所示,开发 Flink Job 时 dataStream keyBy 后连续跟 map、filter、flatMap 三个算子,请问这三个自定义的 Function 内都可以使用 Flink 的 ValueState 吗?

dataStream.keyby(_._1)
  .map(new MyMapFunction())
  .filter(new MyFilterFunction())
  .flatMap(new MyFlatMapFunction())

先说答案吧:在 MyMapFunction 中可以使用 ValueState,在 MyFilterFunction 和 MyFlatMapFunction 中不能使用 ValueState。如果在 MyFilterFunction 和 MyFlatMapFunction 中定义 ValueState 或 MapState,都会报错,会显示 keyedStateBackend 为 null。为什么呢?

首先 Flink 中只有 KeyedState 才支持 ValueState 和 MapState,OperatorState 不支持 ValueState 和 MapState。只要对 KeyedStream 的操作才能使用 KeyedState,KeyedState 表示相同的 key 共享同一个 State,普通的 DataStream 中没有 key 的概念不能使用 KeyedState。

DataStream 的 keyBy 方法源码如下所示,由源码 DataStream 可以看到,DataStream 的 keyBy 方法会返回 KeyedStream,KeyedStream 是 DataStream 的子类,KeyedStream 经过 map 转换后又会变成 DataStream。所以上图中只有 MyMapFunction 是基于 KeyedStream 操作的,MyFilterFunction 和 MyFlatMapFunction 都是基于 DataStream 操作的,没有 key 的概念,因此不能使用 KeyedState,即不能使用 ValueState。

// DataStream 的 keyBy 方法源码
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
 Preconditions.checkNotNull(key);
 return new KeyedStream<>(this, clean(key));
}

MyFilterFunction 和 MyFlatMapFunction 中定义 ValueState 或 MapState 时,为什么会报出 keyedStateBackend 为 null 呢?

回顾一下创建 keyedStateBackend 的流程,第一步就是 if (keySerializer == null) {return null;},如果不是 KeyedStream 直接就返回,即:不创建 keyedStatedBackend。所以出现了上述现象。

五、 总结

本文介绍了 StreamTask 类的基本功能,StreamTask 映射到 ExecutionGraph 中对应的是一个 subtask,每个 StreamTask 运行一个或多个 Chain 在一起的 StreamOperator,这些 Operator 将会在一个线程内同步执行。随后介绍了 StreamTask 的生命周期,主要包括了初始化、run、close 三个流程。后半部分重点描述了 StreamTask 初始化的过程,主要是:createStateBackend、init、initializeState()、openAllOperators() 四个过程。

后续会给大家详细介绍 initializeState 部分如何从 Checkpoint 中恢复 State,也会详细介绍 run 流程到底是如何处理一条条数据的。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容