Flink1.10任务提交流程分析(一)

本文仅为笔者平日学习记录之用,侵删
原文:https://mp.weixin.qq.com/s/gPx6q1pZRAuQuJslLER4Mg

Flink任务常见的提交方式通过flink run命令方式提交,如果我们想自己通过API方式实现任务提交,那么就需要了解flink run执行过程,本篇主要透过源码分析其提交流程。(注:基于1.10.1分析)

提交入口

查看bin/flink脚本可以看到提交入口类为:org.apache.flink.client.cli.CliFrontend,传入的参数就是flink 命令后面的参数,查看main方法:

public static void main(final String[] args) {
   EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
   // 1. $FLINK_HOME/conf
   final String configurationDirectory = getConfigurationDirectoryFromEnv();
   // 2. 加载flink-conf.yaml
   final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
   // 3. 初始化所有的提交模式的参数解析器
   final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
      configuration,
      configurationDirectory);
   try {
     //初始化执行入口
      final CliFrontend cli = new CliFrontend(
         configuration,
         customCommandLines);
      SecurityUtils.install(new SecurityConfiguration(cli.configuration));
      int retCode = SecurityUtils.getInstalledContext()

        //parseParameters 会根据不同的类型:run、info、list、modify等执行不同的流程
            .runSecured(() -> cli.parseParameters(args));
      System.exit(retCode);
   }
   catch (Throwable t) {
      final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
      LOG.error("Fatal error while running command line interface.", strippedThrowable);
      strippedThrowable.printStackTrace();
      System.exit(31);
   }
}

CustomCommandLine 表示的是一个命令行的参数解析的接口,其实现有FlinkYarnSessionCli、DefaultCLI,FlinkYarnSessionCli解析per-job或者session模式参数,DefaultCLI解析standalone模式参数。程序会根据传入的参数选项选择合适的参数解析器,通过其isActive方法其匹配,然后调用applyCommandLineOptionsToConfiguration解析参数。

RUN流程

protected void run(String[] args) throws Exception {
   LOG.info("Running 'run' command.");
   //savepoint恢复参数
   final Options commandOptions = CliFrontendParser.getRunCommandOptions();
   //将参数封装在CommandLine中
   final CommandLine commandLine = getCommandLine(commandOptions, args, true);
   //实例一个ProgramOptions对象,包含了jar路径、用户程序入口类、用户程序参数、classpath等
   final ProgramOptions programOptions = new ProgramOptions(commandLine);
   // 帮助命令
   if (commandLine.hasOption(HELP_OPTION.getOpt())) {
      CliFrontendParser.printHelpForRun(customCommandLines);
      return;
   }

   if (!programOptions.isPython()) {
      // Java program should be specified a JAR file
      if (programOptions.getJarFilePath() == null) {
         throw new CliArgsException("Java program should be specified a JAR file.");
      }
   }
  //代表程序,包含jar、参数等信息
   final PackagedProgram program;
   try {
      LOG.info("Building program from JAR file");
      program = buildProgram(programOptions);
   }
   catch (FileNotFoundException e) {
      throw new CliArgsException("Could not build the program from JAR file.", e);
   }
  //程序所需要jar信息,主要是用户jar包
   final List<URL> jobJars = program.getJobJarAndDependencies();
  //获取有效的配置信息,在这里会根据不同的参数解析器获取有效的配置信息
   final Configuration effectiveConfiguration =
         getEffectiveConfiguration(commandLine, programOptions, jobJars);

   LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

   try {
      executeProgram(effectiveConfiguration, program);
   } finally {
      program.deleteExtractedLibraries();
   }
}

在getEffectiveConfiguration方法中,会根据参数选择不同的参数解析器,例如在per-job模式会使用 -m yarn-cluster,那么就会选择FlinkYarnSessionCli参数解析器,在这个过程中有一个重要的参数配置:execution.target,目标执行器,决定后面什么类型的执行器提交任务:yarn-session、yarn-per-job、remote,这个参数的配置也是通过不同的提交模式来配置的。

执行Program流程

executeProgram 方法直接调用ClientUtils.executeProgram方法:

public static void executeProgram(
      PipelineExecutorServiceLoader executorServiceLoader,
      Configuration configuration,
      PackagedProgram program) throws ProgramInvocationException {
   checkNotNull(executorServiceLoader);
   final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
   final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
   try {
      Thread.currentThread().setContextClassLoader(userCodeClassLoader);

      LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));
     //用户创建程序执行的上下文
      ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
            executorServiceLoader,
            configuration,
            userCodeClassLoader);
     //会将factory赋给ExecutionEnvironment中变量
      ContextEnvironment.setAsContext(factory);

      try {
        //调用程序main方法
         program.invokeInteractiveModeForExecution();
      } finally {
         ContextEnvironment.unsetContext();
      }
   } finally {
      Thread.currentThread().setContextClassLoader(contextClassLoader);
   }
}

PipelineExecutorServiceLoader 用户Executor执行器的选择,参考Flink1.10基于工厂模式的任务提交与SPI机制;ContextEnvironmentFactory用于创建程序执行的上下文ExecutionEnvironment,可以理解为其封装了程序与外界之间的交互方式,例如per-job模式还是standalone模式、需要的资源大小等等,同时也会根据其类型创建不同StreamExecutionEnvironment(看下文详解)。对于客户端提交方式创建的是ContextEnvironment类型的ExecutionEnvironment。

Main提交流程

program.invokeInteractiveModeForExecution方法用户调用用户程序的main方法,在main方法中会调用StreamExecutionEnvironment.getExecutionEnvironment 获取合适的StreamExecutionEnvironment:

//StreamExecutionEnvironment.java
public static StreamExecutionEnvironment getExecutionEnvironment() {

  //threadLocalContextEnvironmentFactory、contextEnvironmentFactory默认都为空,所以会调用createStreamExecutionEnvironment方法
   return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
      .map(StreamExecutionEnvironmentFactory::createExecutionEnvironment)
      .orElseGet(StreamExecutionEnvironment::createStreamExecutionEnvironment);
}

private static StreamExecutionEnvironment createStreamExecutionEnvironment() {

   ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
   if (env instanceof ContextEnvironment) {
      return new StreamContextEnvironment((ContextEnvironment) env);
   } else if (env instanceof OptimizerPlanEnvironment) {
      return new StreamPlanEnvironment(env);
   } else {
      return createLocalEnvironment();
   }
}
//ExecutionEnvironment.java
public static ExecutionEnvironment getExecutionEnvironment() {
   return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
      .map(ExecutionEnvironmentFactory::createExecutionEnvironment)
       //在本地local的模式,创建LocalEnvironment
      .orElseGet(ExecutionEnvironment::createLocalEnvironment);
}

在ClientUtils.executeProgram 中分析到,会通过ContextEnvironment. setAsContext( factory)给threadLocalContextEnvironment Factory与contextEnvironmentFactory赋值,那么调用ContextEnvironmentFactory. createExecutionEnvironment 得到一个ContextEnvironment。最终StreamExecutionEnvironment. getExecutionEnvironment 得到一个内部封装了ContextEnvironment 对象的StreamExecutionEnvironment对象。

Execute流程

待main方法执行用户代码流程之后会调用StreamExecutionEnvironment.execute方法,接着会调用executeAsync(StreamGraph)方法:

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
   checkNotNull(streamGraph, "StreamGraph cannot be null.");
   checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");
   //根绝提交模式选择匹配的factory
   final PipelineExecutorFactory executorFactory =
      executorServiceLoader.getExecutorFactory(configuration);

   checkNotNull(
      executorFactory,
      "Cannot find compatible factory for specified execution.target (=%s)",
      configuration.get(DeploymentOptions.TARGET));
  //选择合适的executor提交任务
   CompletableFuture<? extends JobClient> jobClientFuture = executorFactory
      .getExecutor(configuration)
      .execute(streamGraph, configuration);

   try {
      JobClient jobClient = jobClientFuture.get();
      jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
      return jobClient;
   } catch (Throwable t) {
      jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, t));
      ExceptionUtils.rethrow(t);

      // make javac happy, this code path will not be reached
      return null;
   }
}

这里就是上一篇讲到的根据SPI机制加载出所有PipelineExecutorFactory,然后选择匹配的factory,匹配的条件就是符合上文提到的execution.target参数的factory,对于yarn-per-job就是YarnJobClusterExecutorFactory,最终会获取到YarnJobClusterExecutor类型的Executor去向yarn提交作业。

总结

本文主要分析了flink run的开始到提交到集群前的流程,我认为可以简化为三步:

  • 选择合适的参数解析器解析命令参数(CustomCommandLine);

  • 选择合适的执行上下文环境(StreamExecutionEnvironment)

  • 选择合适的任务提交器(PipelineExecutor)

下一篇将会以yarn-per-job提交模式为例分析其具体提交过程。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,002评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,777评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,341评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,085评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,110评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,868评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,528评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,422评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,938评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,067评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,199评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,877评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,540评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,079评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,192评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,514评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,190评论 2 357