有Flink开发经验的用户应该知道,在flink-conf.yaml文件进行作业参数配置后,通过如下指令以Perjob模式,将作业提交到Yarn集群运行。
flink 1.12版本,命令行提交指令:
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
早期版本,命令行提交指令:
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
本文将对命令行后的代码逻辑进行讲解,让用户了解在Client端,Flink如何解析命令行参数、构建jobGraph、并提交到远程Yarn集群执行。
运行架构图
当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。

- Client:提交Job的客户端,可以是运行在任何机器上,只要与JobManager环境连通即可。提交Job后,Client可以结束进程,也可以不结束并等待结果返回。在命令行中通过--detached参数指定。
- JobManager:主要负责调度Job并协调Task做Checkpoint。从Client处接收到Job和JAR包等资源后,会生成优化后的执行计划,并以Task的单元调度到各个TaskManager去执行。
- TaskManager:启动时就设置好了槽位数(Slot),每个 slot 能启动一个Task线程。从 JobManager 处接收需要部署的Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。
由此可见,Flink在Client端,基于用户作业构建出执行计划JobGraph,并将该JobGraph交由JobManager进
行调度执行。
Yarn 部署模式
基于Yarn,Flink支持三种模式的作业执行,即Application Mode、Per-Job Cluster Mode、Session Mode。
- Per-Job Cluster Mode:为每一个Flink作业单独启动一个Flink集群,作业停止后对应的Flink集群也停止。程序部署前需要从Client端上传Flink程序包及用户程序。生产中常用该模式。
- Session Mode:预先启动一个常驻的Flink集群,多个Flink作业公用一个集群。
- Application Mode:Per-Job Cluster集群模式的升级版,旨在减少客户端传输文件的网络开销。Flink程序包及用户程序均可存储在Hdfs,从JobManager端构建出JobGraph。
CliFrontend处理逻辑
flink run指令执行后,CliFrontend#Main方法处理具体的提交流程。在提交过程中主要围绕一下几个阶段进行,即构建最终使用的Configuration配置项、基于用户程序构建JobGraph、根据提交模式构建出ClusterDescriptor、由ClusterDescriptor部署生成的JobGraph。
构建配置信息
提交动作执行时,首先,基于flink-conf.yaml文件构建出全局默认的Configuration配置,接着,通过接收的命令指令覆盖默认的配置项。同时,也接收和程序执行相关的参数。
- find the configuration directory:
查找顺序:
- 查找环境变量中的FLINK_CONF_DIR参数值。
- 判断../conf下是否存在。
- 判断./conf下是否存在。
- load the global configuration:
- 加载configuration directory下的flink-conf.yaml文件,作为默认的configuration。
- load the custom command lines: 构建CustomCommandLine用来接收命令行参数,其中一个为Active。
- add GenericCLI: 新版本指令接收器, 接收-t -D参数。
- add FlinkYarnSessionCli。 向YarnSession集群提交作业时,接收的参数。
- add DefaultCLI。 向Standalone集群提交作业时,接收的参数。
- validate and get active commandLine:根据参数信息,筛选出active commandLine。
通过使用新版本命令行 -t yarn-per-job,确定active commandLine为GenericCLI。
- build ProgramOptions: 解析命令行中与程序相关的参数,填充给ProgramOptions。
- get effective Configuration: 通过调用active CommandLine的toConfiguration方法,构建出最终有效的
Configuration。
构建JobGraph
在构建配置信息过程中,生成了programOptions包含程序执行使用的参数信息,生成effectiveConfiguration包含作业执行使用的配置信息。通过programOptions及effectiveConfiguration生成PackagedProgram,代表生成JobGraph使用的外部配置,例如JarFile、mainClas以及为加载用户程序构建的自定义类加载器userCodeClassLoader。
- getPackagedProgram(programOptions, effectiveConfiguration)
PackagedProgram buildProgram(final ProgramOptions runOptions, final Configuration configuration)
throws FileNotFoundException, ProgramInvocationException, CliArgsException {
runOptions.validate();
String[] programArgs = runOptions.getProgramArgs();
String jarFilePath = runOptions.getJarFilePath();
List<URL> classpaths = runOptions.getClasspaths();
// Get assembler class
String entryPointClass = runOptions.getEntryPointClassName();
File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;
return PackagedProgram.newBuilder()
.setJarFile(jarFile)
.setUserClassPaths(classpaths)
.setEntryPointClassName(entryPointClass)
.setConfiguration(configuration)
.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())
.setArguments(programArgs)
.build();
}
-
executeProgram。执行用户程序,生成Jobgraph,同时进行提交操作。
在早期的flink版本中,Jobgraph的构建只由PackagedProgramUtils#createJobGraph来完成,提交动作由CliFronted进程从外部来完成。新版本中,CliFronted进程负责外部提交环境的准备,JobGraph的生成、提交均由StreamExecutionEnvironment#execute方法触发。
CliFronted进程准备外部提交环境相关逻辑:
// execute Program
protected void executeProgram(.....) {
ClientUtils.executeProgram(
## *** 根据执行环境通过SPI加载PipelineExecutor(当前环境YarnJobClusterExecutor),
## *** 并最终创建ClusterDescriptor来进行作业提交。
new DefaultExecutorServiceLoader(),
configuration,
program,
false,
false
);
}
public static void executeProgram(....) {
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
// 1. 准备作业执行依赖的外部执行环境
ContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
try {
//2. 通过反射触发用户程序的执行
program.invokeInteractiveModeForExecution();
} finally {
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
}
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
用户程序触发构建JobGraph及提交流程:
- 通过SPI加载PipelineExecutorFactory。
- 根据PipelineExecutorFactory创建出PipelineExecutor。
- 由PipelineExecutor从StreamGraph中生成JobGraph,并提交远程集群。
public JobExecutionResult execute(String jobName) throws Exception {
return execute(getStreamGraph(jobName));
}
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
1. SPI加载PipelineExecutorFactory
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
2. 创建PipelineExecutor: getExecutor(configuration)
3. 完成创建jobGraph并提交: execute
CompletableFuture<JobClient> jobClientFuture =
executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration, userClassloader);
.....
}
AbstractJobClusterExecutor#execute:
public CompletableFuture<JobClient> execute(...)
throws Exception {
1. 从StreamGraph中提取JobGraph.
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
2. 由构建出的ClusterDescriptor 提交JobGraph.
try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clusterClientFactory.createClusterDescriptor(configuration)) {
final ExecutionConfigAccessor configAccessor =
ExecutionConfigAccessor.fromConfiguration(configuration);
final ClusterSpecification clusterSpecification =
clusterClientFactory.getClusterSpecification(configuration);
final ClusterClientProvider<ClusterID> clusterClientProvider =
clusterDescriptor.deployJobCluster(
clusterSpecification, jobGraph, configAccessor.getDetachedMode());
LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
return CompletableFuture.completedFuture(
new ClusterClientJobClientAdapter<>(
clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
}
}
构建ClusterDescriptor
在上一阶段构建JobGraph过程中,也同时创建了ClusterDescriptor,ClusterDescriptor是对作业要部署到Yarn、Mesos、k8s集群的描述,包含了要提交集群的相关信息。
以YarnClusterDescriptor为例,查看其生成过程:
- 通过DefaultExecutorServiceLoader#getExecutorFactory,创建YarnJobClusterExecutorFactory。
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
checkNotNull(configuration);
1. 通过SPI加载当前环境下PipelineExecutorFactory的实现
final ServiceLoader<PipelineExecutorFactory> loader =
ServiceLoader.load(PipelineExecutorFactory.class);
final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
final Iterator<PipelineExecutorFactory> factories = loader.iterator();
while (factories.hasNext()) {
try {
final PipelineExecutorFactory factory = factories.next();
if (factory != null && factory.isCompatibleWith(configuration)) {
compatibleFactories.add(factory);
}
} catch (Throwable e) {
if (e.getCause() instanceof NoClassDefFoundError) {
LOG.info("Could not load factory due to missing dependencies.");
} else {
throw e;
}
}
}
return compatibleFactories.get(0);
}
2. ClusterExecutorFactory#getExecutor创建YarnJobClusterExecutor。YarnJobClusterExecutor的createClusterDescriptor创建出YarnClusterDescriptor。
YarnJobClusterExecutorFactory
@Override
public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
try {
return new YarnJobClusterExecutor();
} catch (NoClassDefFoundError e) {
throw new IllegalStateException(YarnDeploymentTarget.ERROR_MESSAGE);
}
}
3. YarnClusterClientFactory#createClusterDescriptor 构建逻辑。
- 从本地查找log4j.propertis文件路径,并填充到$internal.yarn.log-config-file。
- 创建指定集群的YarnClient,构建出YarnClusterDescriptor。
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
// 1. 查找日志文件,并填充到$internal.yarn.log-config-file参数
final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
return getClusterDescriptor(configuration);
}
private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
// 2. 创建YarnClient
final YarnClient yarnClient = YarnClient.createYarnClient();
final YarnConfiguration yarnConfiguration =
Utils.getYarnAndHadoopConfiguration(configuration);
// 3. 初始化要链接的集群配置
yarnClient.init(yarnConfiguration);
yarnClient.start();
// 4. 创建YarnClusterDescriptor
return new YarnClusterDescriptor(
configuration,
yarnConfiguration,
yarnClient,
YarnClientYarnClusterInformationRetriever.create(yarnClient),
false);
}
接下来查看Flink如何获取hadoop conf,从而将作业提交到指定集群?
- 从flink.yarn.开头的配置项中读取。
- 从环境变量中的HADOOP_HOME文件夹查找。
- 从Flink configuration中的fs.hdfs.hdfsdefault参数项查找,已废弃。
- 从环境变量中的HADOOP_CONF_DIR文件夹下查找。
- 使用flink Configuration中的以flink.hadoop 为前缀的配置项。
Utils.getYarnAndHadoopConfiguration(configuration);
public static YarnConfiguration getYarnAndHadoopConfiguration(..) {
// 1. 从flink.yarn开头的配置项。
YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
// 2.
yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));
return yarnConfig;
}
public static Configuration getHadoopConfiguration(
org.apache.flink.configuration.Configuration flinkConfiguration) {
Configuration result = new HdfsConfiguration();
boolean foundHadoopConfiguration = false
// Approach 1: HADOOP_HOME environment variables
String[] possibleHadoopConfPaths = new String[2];
final String hadoopHome = System.getenv("HADOOP_HOME");
if (hadoopHome != null) {
possibleHadoopConfPaths[0] = hadoopHome + "/conf";
possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
}
for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
if (possibleHadoopConfPath != null) {
foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
}
}
// Approach 2: Flink configuration (deprecated)
final String hdfsDefaultPath =
flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
if (hdfsDefaultPath != null) {
result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
LOG.debug(
"Using hdfs-default configuration-file path from Flink config: {}",
hdfsDefaultPath);
foundHadoopConfiguration = true;
}
final String hdfsSitePath =
flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
if (hdfsSitePath != null) {
result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
LOG.debug(
"Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
foundHadoopConfiguration = true;
}
final String hadoopConfigPath =
flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
if (hadoopConfigPath != null) {
LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
foundHadoopConfiguration =
addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
}
// Approach 3: HADOOP_CONF_DIR environment variable
String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
if (hadoopConfDir != null) {
LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
foundHadoopConfiguration =
addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
}
// Approach 4: Flink configuration
// add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
for (String key : flinkConfiguration.keySet()) {
for (String prefix : FLINK_CONFIG_PREFIXES) {
if (key.startsWith(prefix)) {
String newKey = key.substring(prefix.length());
String value = flinkConfiguration.getString(key, null);
result.set(newKey, value);
LOG.debug(
"Adding Flink config entry for {} as {}={} to Hadoop config",
key,
newKey,
value);
foundHadoopConfiguration = true;
}
}
}
return result;
}
JobGraph提交部署
通过ClusterDescriptor#deployJobCluster将JobGraph提交到远程集群。YarnClusterDescriptor提交流程就是创建YarnApp使用的几部。具体可以对比 Hadoop: Writing YARN Applications。
### 相关参考
Flink 原理与实现:架构和拓扑概览)