一文搞定 Flink Job 的运行过程

背景

之前我们知道了Flink 是如何生成 StreamGraph 以及 如何生成 job如何生成Task,现在我们通过 Flink Shell 将他们串起来,这样我们就学习了从写代码开始到 Flink 运行 task 的整个过程是怎么样的。

正文

我们经常通过 Flink Shell 提交代码,如 flink run -p 2 -m yarn-cluster -ynm test -c test ./test-1.0-SNAPSHOT.jar "file" "./test.properties"&通过 flink shell 我们可以知道 org.apache.flink.client.cli.CliFrontend 为整个 Flink Job 的入口类

/**
     * Submits the job based on the arguments.
     */
    public static void main(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration
        final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines(
            configuration,
            configurationDirectory);

        try {
            final CliFrontend cli = new CliFrontend(
                configuration,
                customCommandLines);

            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            int retCode = SecurityUtils.getInstalledContext()
                    .runSecured(() -> cli.parseParameters(args));
            System.exit(retCode);
        }
        catch (Throwable t) {
            final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
            System.exit(31);
        }
    }

main 很简单,主要就两步,发现并加载配置文件,加载并解析命令。在解析命令的过程当中,如果传入的命令是 run,则可以一直追踪到 executeProgram 方法

protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
        logAndSysout("Starting execution of program");
        final JobSubmissionResult result = client.run(program, parallelism);
        ......
    }

通过 client run 方法来执行,最终调用我们传入的主方法( 通过 -c 参数),然后就开始执行用户代码了,首先会构建 StreamGraph ,最终调用 StreamContextEnvironment execute(String jobName) 方法

@Override
    public JobExecutionResult execute(String jobName) throws Exception {
        
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

        StreamGraph streamGraph = this.getStreamGraph();
        streamGraph.setJobName(jobName);

        transformations.clear();

        // execute the programs 存在 -d 时
        if (ctx instanceof DetachedEnvironment) {
            LOG.warn("Job was executed in detached mode, the results will be available on completion.");
            ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
            return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
        } else {
            return ctx
                .getClient()
                .run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
                .getJobExecutionResult();
        }
    }

然后

public JobSubmissionResult run(FlinkPlan compiledPlan,
            List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
            throws ProgramInvocationException {
        // 构建 jobGraph
        JobGraph job = getJobGraph(flinkConfig, compiledPlan, libraries, classpaths, savepointSettings);
        //将 job 提交至 cluster 上
        return submitJob(job, classLoader);
    }

主要就是构建 jobGraph ,关于构建 jobGraph 的细节可以参考 如何构建 job ,构建成功之后就开始提交 job 了。我们以 MiniCluster 为例

@Override
    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        final CompletableFuture<JobSubmissionResult> jobSubmissionResultFuture = submitJob(jobGraph);
        ......
    }
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
        
        // we have to allow queued scheduling in Flip-6 mode because we need to request slots
        // from the ResourceManager
        jobGraph.setAllowQueuedScheduling(true);
        
        final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
        
        // cache jars and files
        final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
        
        final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
            .thenCombine(
                dispatcherGatewayFuture,
                // 这里真正 submit 操作,交给了 dispatcher 去执行
                (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
            .thenCompose(Function.identity());
        
        return acknowledgeCompletableFuture.thenApply(
            (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
    }

接下来就到了 job 正式运行的时候了

private CompletableFuture<Void> runJob(JobGraph jobGraph) {
        Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

        //创建 jobManagerRunner 同时也会创建 jobMaster,在创建 JobMaster 的时候构建了 ExecutionGraph
        final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

        jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

        // start jobManagerRunner 同时也启动了 jobMaster 等一系列 service,然后就开始调度 executionGraph,execution.deploy task.start
        return jobManagerRunnerFuture
            .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
            .thenApply(FunctionUtils.nullFn())
            .whenCompleteAsync(
                (ignored, throwable) -> {
                    if (throwable != null) {
                        jobManagerRunnerFutures.remove(jobGraph.getJobID());
                    }
                },
                getMainThreadExecutor());
    }

这部分内容与 如何构建Job 是一致的,省略若干,具体可以参考 如何构建 job ,需要强调一点就是当 执行到 ExecutionGraph 的 scheduleForExecution方法时

// 调度 execution
    public void scheduleForExecution() throws JobException {

        assertRunningInJobMasterMainThread();

        final long currentGlobalModVersion = globalModVersion;

        // 会启动 startCheckpointScheduler
        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
            ......
    }

会启动 CheckpointScheduler 从而开始出发 checkpoint。
接下来就开始部署,可以参考 如何构建 job如何生成Task
至此为止,从写代码到代码的计算执行,整个过程我们都已经学习清楚了。

总结

在这里插入图片描述
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容