Flink Client端作业处理流程

有Flink开发经验的用户应该知道,在flink-conf.yaml文件进行作业参数配置后,通过如下指令以Perjob模式,将作业提交到Yarn集群运行。

flink 1.12版本,命令行提交指令:
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

早期版本,命令行提交指令:
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar 

本文将对命令行后的代码逻辑进行讲解,让用户了解在Client端,Flink如何解析命令行参数、构建jobGraph、并提交到远程Yarn集群执行。

运行架构图

当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。

image.png
image.png
  • Client:提交Job的客户端,可以是运行在任何机器上,只要与JobManager环境连通即可。提交Job后,Client可以结束进程,也可以不结束并等待结果返回。在命令行中通过--detached参数指定。
  • JobManager:主要负责调度Job并协调Task做Checkpoint。从Client处接收到Job和JAR包等资源后,会生成优化后的执行计划,并以Task的单元调度到各个TaskManager去执行。
  • TaskManager:启动时就设置好了槽位数(Slot),每个 slot 能启动一个Task线程。从 JobManager 处接收需要部署的Task,部署启动后,与自己的上游建立 Netty 连接,接收数据并处理。

由此可见,Flink在Client端,基于用户作业构建出执行计划JobGraph,并将该JobGraph交由JobManager进
行调度执行。

Yarn 部署模式

基于Yarn,Flink支持三种模式的作业执行,即Application Mode、Per-Job Cluster Mode、Session Mode。

  • Per-Job Cluster Mode:为每一个Flink作业单独启动一个Flink集群,作业停止后对应的Flink集群也停止。程序部署前需要从Client端上传Flink程序包及用户程序。生产中常用该模式。
  • Session Mode:预先启动一个常驻的Flink集群,多个Flink作业公用一个集群。
  • Application Mode:Per-Job Cluster集群模式的升级版,旨在减少客户端传输文件的网络开销。Flink程序包及用户程序均可存储在Hdfs,从JobManager端构建出JobGraph。

CliFrontend处理逻辑

flink run指令执行后,CliFrontend#Main方法处理具体的提交流程。在提交过程中主要围绕一下几个阶段进行,即构建最终使用的Configuration配置项、基于用户程序构建JobGraph、根据提交模式构建出ClusterDescriptor、由ClusterDescriptor部署生成的JobGraph。

构建配置信息

提交动作执行时,首先,基于flink-conf.yaml文件构建出全局默认的Configuration配置,接着,通过接收的命令指令覆盖默认的配置项。同时,也接收和程序执行相关的参数。

  1. find the configuration directory:

查找顺序:

  1. 查找环境变量中的FLINK_CONF_DIR参数值。
  2. 判断../conf下是否存在。
  3. 判断./conf下是否存在。
  4. load the global configuration:
    1. 加载configuration directory下的flink-conf.yaml文件,作为默认的configuration。
  5. load the custom command lines: 构建CustomCommandLine用来接收命令行参数,其中一个为Active。
    1. add GenericCLI: 新版本指令接收器, 接收-t -D参数。
    2. add FlinkYarnSessionCli。 向YarnSession集群提交作业时,接收的参数。
    3. add DefaultCLI。 向Standalone集群提交作业时,接收的参数。
  6. validate and get active commandLine:根据参数信息,筛选出active commandLine。

通过使用新版本命令行 -t yarn-per-job,确定active commandLine为GenericCLI。

  1. build ProgramOptions: 解析命令行中与程序相关的参数,填充给ProgramOptions。
  2. get effective Configuration: 通过调用active CommandLine的toConfiguration方法,构建出最终有效的
    Configuration。

构建JobGraph

在构建配置信息过程中,生成了programOptions包含程序执行使用的参数信息,生成effectiveConfiguration包含作业执行使用的配置信息。通过programOptions及effectiveConfiguration生成PackagedProgram,代表生成JobGraph使用的外部配置,例如JarFile、mainClas以及为加载用户程序构建的自定义类加载器userCodeClassLoader。

  1. getPackagedProgram(programOptions, effectiveConfiguration)
  PackagedProgram buildProgram(final ProgramOptions runOptions, final Configuration configuration)
            throws FileNotFoundException, ProgramInvocationException, CliArgsException {
        runOptions.validate();

        String[] programArgs = runOptions.getProgramArgs();
        String jarFilePath = runOptions.getJarFilePath();
        List<URL> classpaths = runOptions.getClasspaths();

        // Get assembler class
        String entryPointClass = runOptions.getEntryPointClassName();
        File jarFile = jarFilePath != null ? getJarFile(jarFilePath) : null;

        return PackagedProgram.newBuilder()
                .setJarFile(jarFile)
                .setUserClassPaths(classpaths)
                .setEntryPointClassName(entryPointClass)
                .setConfiguration(configuration)
                .setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())
                .setArguments(programArgs)
                .build();
    }
  1. executeProgram。执行用户程序,生成Jobgraph,同时进行提交操作。

    在早期的flink版本中,Jobgraph的构建只由PackagedProgramUtils#createJobGraph来完成,提交动作由CliFronted进程从外部来完成。新版本中,CliFronted进程负责外部提交环境的准备,JobGraph的生成、提交均由StreamExecutionEnvironment#execute方法触发。
    CliFronted进程准备外部提交环境相关逻辑:

// execute Program
protected void executeProgram(.....) {
  ClientUtils.executeProgram( 
    ## *** 根据执行环境通过SPI加载PipelineExecutor(当前环境YarnJobClusterExecutor),
    ## *** 并最终创建ClusterDescriptor来进行作业提交。
    new DefaultExecutorServiceLoader(), 
    configuration, 
    program, 
    false, 
    false
  );
}

 public static void executeProgram(....) {
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);
        // 1. 准备作业执行依赖的外部执行环境
        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        try {
            //2. 通过反射触发用户程序的执行
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}  

用户程序触发构建JobGraph及提交流程:

  1. 通过SPI加载PipelineExecutorFactory。
  2. 根据PipelineExecutorFactory创建出PipelineExecutor。
  3. 由PipelineExecutor从StreamGraph中生成JobGraph,并提交远程集群。
 public JobExecutionResult execute(String jobName) throws Exception {
      return execute(getStreamGraph(jobName));
 }
 
 public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
      1. SPI加载PipelineExecutorFactory
    final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);
        2. 创建PipelineExecutor:   getExecutor(configuration)
    3. 完成创建jobGraph并提交:     execute
    CompletableFuture<JobClient> jobClientFuture =
            executorFactory
                    .getExecutor(configuration)
                    .execute(streamGraph, configuration, userClassloader);
            .....
}
AbstractJobClusterExecutor#execute:
public CompletableFuture<JobClient> execute(...)
        throws Exception {
    1. 从StreamGraph中提取JobGraph.
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
      
    2. 由构建出的ClusterDescriptor 提交JobGraph.
    try (final ClusterDescriptor<ClusterID> clusterDescriptor =
            clusterClientFactory.createClusterDescriptor(configuration)) {
        final ExecutionConfigAccessor configAccessor =
                ExecutionConfigAccessor.fromConfiguration(configuration);

        final ClusterSpecification clusterSpecification =
                clusterClientFactory.getClusterSpecification(configuration);

        final ClusterClientProvider<ClusterID> clusterClientProvider =
                clusterDescriptor.deployJobCluster(
                        clusterSpecification, jobGraph, configAccessor.getDetachedMode());
        LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

        return CompletableFuture.completedFuture(
                new ClusterClientJobClientAdapter<>(
                        clusterClientProvider, jobGraph.getJobID(), userCodeClassloader));
    }
}

构建ClusterDescriptor

在上一阶段构建JobGraph过程中,也同时创建了ClusterDescriptor,ClusterDescriptor是对作业要部署到Yarn、Mesos、k8s集群的描述,包含了要提交集群的相关信息。
以YarnClusterDescriptor为例,查看其生成过程:

  1. 通过DefaultExecutorServiceLoader#getExecutorFactory,创建YarnJobClusterExecutorFactory。
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
    checkNotNull(configuration);
    1. 通过SPI加载当前环境下PipelineExecutorFactory的实现
    final ServiceLoader<PipelineExecutorFactory> loader =
            ServiceLoader.load(PipelineExecutorFactory.class);

    final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
    final Iterator<PipelineExecutorFactory> factories = loader.iterator();
    while (factories.hasNext()) {
        try {
            final PipelineExecutorFactory factory = factories.next();
            if (factory != null && factory.isCompatibleWith(configuration)) {
                compatibleFactories.add(factory);
            }
        } catch (Throwable e) {
            if (e.getCause() instanceof NoClassDefFoundError) {
                LOG.info("Could not load factory due to missing dependencies.");
            } else {
                throw e;
            }
        }
    }

    return compatibleFactories.get(0);
}

2. ClusterExecutorFactory#getExecutor创建YarnJobClusterExecutor。YarnJobClusterExecutor的createClusterDescriptor创建出YarnClusterDescriptor。

   YarnJobClusterExecutorFactory
   @Override
    public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
        try {
            return new YarnJobClusterExecutor();
        } catch (NoClassDefFoundError e) {
            throw new IllegalStateException(YarnDeploymentTarget.ERROR_MESSAGE);
        }
    }

3. YarnClusterClientFactory#createClusterDescriptor 构建逻辑。

  1. 从本地查找log4j.propertis文件路径,并填充到$internal.yarn.log-config-file。
  2. 创建指定集群的YarnClient,构建出YarnClusterDescriptor。
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
  // 1. 查找日志文件,并填充到$internal.yarn.log-config-file参数
  final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
  YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
  return getClusterDescriptor(configuration);
}

private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
    // 2. 创建YarnClient
    final YarnClient yarnClient = YarnClient.createYarnClient();
    final YarnConfiguration yarnConfiguration =
            Utils.getYarnAndHadoopConfiguration(configuration);
    // 3. 初始化要链接的集群配置
    yarnClient.init(yarnConfiguration);
    yarnClient.start();
    // 4. 创建YarnClusterDescriptor
    return new YarnClusterDescriptor(
            configuration,
            yarnConfiguration,
            yarnClient,
            YarnClientYarnClusterInformationRetriever.create(yarnClient),
            false);
}

接下来查看Flink如何获取hadoop conf,从而将作业提交到指定集群?

  1. 从flink.yarn.开头的配置项中读取。
  2. 从环境变量中的HADOOP_HOME文件夹查找。
  3. 从Flink configuration中的fs.hdfs.hdfsdefault参数项查找,已废弃。
  4. 从环境变量中的HADOOP_CONF_DIR文件夹下查找。
  5. 使用flink Configuration中的以flink.hadoop 为前缀的配置项。
Utils.getYarnAndHadoopConfiguration(configuration);

public static YarnConfiguration getYarnAndHadoopConfiguration(..) {
     // 1. 从flink.yarn开头的配置项。
     YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
   // 2.
     yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));
    return yarnConfig;
}

public static Configuration getHadoopConfiguration(
        org.apache.flink.configuration.Configuration flinkConfiguration) {
    Configuration result = new HdfsConfiguration();
    boolean foundHadoopConfiguration = false

    // Approach 1: HADOOP_HOME environment variables
    String[] possibleHadoopConfPaths = new String[2];
    final String hadoopHome = System.getenv("HADOOP_HOME");
    if (hadoopHome != null) {
        possibleHadoopConfPaths[0] = hadoopHome + "/conf";
        possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
    }
    for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
        if (possibleHadoopConfPath != null) {
            foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
        }
    }

    // Approach 2: Flink configuration (deprecated)
    final String hdfsDefaultPath =
            flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
    if (hdfsDefaultPath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        LOG.debug(
                "Using hdfs-default configuration-file path from Flink config: {}",
                hdfsDefaultPath);
        foundHadoopConfiguration = true;
    }

    final String hdfsSitePath =
            flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
    if (hdfsSitePath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        LOG.debug(
                "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
        foundHadoopConfiguration = true;
    }

    final String hadoopConfigPath =
            flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
    if (hadoopConfigPath != null) {
        LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
    }

    // Approach 3: HADOOP_CONF_DIR environment variable
    String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
    if (hadoopConfDir != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
    }

    // Approach 4: Flink configuration
    // add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
    for (String key : flinkConfiguration.keySet()) {
        for (String prefix : FLINK_CONFIG_PREFIXES) {
            if (key.startsWith(prefix)) {
                String newKey = key.substring(prefix.length());
                String value = flinkConfiguration.getString(key, null);
                result.set(newKey, value);
                LOG.debug(
                        "Adding Flink config entry for {} as {}={} to Hadoop config",
                        key,
                        newKey,
                        value);
                foundHadoopConfiguration = true;
            }
        }
    }

    return result;
}

JobGraph提交部署

通过ClusterDescriptor#deployJobCluster将JobGraph提交到远程集群。YarnClusterDescriptor提交流程就是创建YarnApp使用的几部。具体可以对比 Hadoop: Writing YARN Applications

### 相关参考
Flink 原理与实现:架构和拓扑概览)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,928评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,748评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,282评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,065评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,101评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,855评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,521评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,414评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,931评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,053评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,191评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,873评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,529评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,074评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,188评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,491评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,173评论 2 357

推荐阅读更多精彩内容