A Complete Walkthrough of the Flink Job Submission Flow

Preface

In earlier posts we have already analyzed 一文搞定 Flink 消费消息的全流程, 写给大忙人看的 Flink Window原理, 一文搞定 Flink Checkpoint Barrier 全流程, and so on. Now it is time to go back to the very beginning: how a Flink job is actually submitted.

Main Content

Flink has two submission modes: local and remote (each corresponds to a different execution environment; see Flink Context到底是什么? for details). The two paths are largely the same, so we will walk through the local mode.
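Which of the two paths we take is decided by the kind of StreamExecutionEnvironment the program obtains. Here is a small illustrative snippet; the host name, port and jar path are placeholder values, not something taken from this article's setup:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentChoiceExample {
    public static void main(String[] args) throws Exception {
        // Resolves to a LocalStreamEnvironment when run from the IDE,
        // and to the cluster's environment when the job is submitted via the Flink CLI.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Explicitly local: the job runs in an embedded mini cluster inside this JVM.
        StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

        // Explicitly remote: the JobGraph is shipped to a standing cluster.
        // "jobmanager-host", 8081 and the jar path are placeholders.
        StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment(
            "jobmanager-host", 8081, "/path/to/user-job.jar");

        localEnv.fromElements("local", "demo").print();
        localEnv.execute("environment-choice-demo");
    }
}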

When we call env.execute in local mode, the call lands in LocalStreamEnvironment.execute.
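For reference, a minimal driver that ends up in this method could look like the following toy job (the elements and job name are arbitrary):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinimalJob {
    public static void main(String[] args) throws Exception {
        // Inside the IDE this resolves to a LocalStreamEnvironment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("to", "be", "or", "not", "to", "be")
           .print();

        // This call is what enters LocalStreamEnvironment.execute(jobName) below.
        env.execute("minimal-job");
    }
}

The execute method itself: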

/**
     * Executes the JobGraph on a mini cluster with a user-specified name.
     *
     * @param jobName
     *            name of the job
     * @return The result of the job execution, containing elapsed time and accumulators.
     */
    @Override
    // local-mode execution, entered from env.execute
    public JobExecutionResult execute(String jobName) throws Exception {
        // transform the streaming program into a JobGraph
        // build the StreamGraph
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);

        // build the JobGraph
        JobGraph jobGraph = streamGraph.getJobGraph();
        jobGraph.setAllowQueuedScheduling(true);

        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

        // add (and override) the settings with what the user defined
        configuration.addAll(this.configuration);

        if (!configuration.contains(RestOptions.BIND_PORT)) {
            configuration.setString(RestOptions.BIND_PORT, "0");
        }

        int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
            .build();

        if (LOG.isInfoEnabled()) {
            LOG.info("Running job on local embedded Flink mini cluster");
        }

        MiniCluster miniCluster = new MiniCluster(cfg);

        try {
            // start the cluster: starts the JobMaster, runs leader election, etc.
            miniCluster.start();
            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());

            // submit the job to the JobMaster
            return miniCluster.executeJobBlocking(jobGraph);
        }
        finally {
            transformations.clear();
            miniCluster.close();
        }
    }

This method builds the StreamGraph and the JobGraph; later on an ExecutionGraph is built as well. The relationship between these graphs is easiest to grasp from a single picture:


[Figure: StreamGraph / JobGraph / ExecutionGraph overview]
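If you want to look at the first two graphs from user code, the environment exposes them directly on the Flink version this article traces (roughly 1.7/1.8); the exact accessors moved around in later releases, so treat this as a version-specific sketch:

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class GraphInspection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c").print();

        // The same two steps that execute() performs:
        StreamGraph streamGraph = env.getStreamGraph();  // user program -> StreamGraph
        JobGraph jobGraph = streamGraph.getJobGraph();   // StreamGraph  -> JobGraph (operator chaining happens here)

        System.out.println(streamGraph.getStreamingPlanAsJSON());  // the streaming plan as JSON
        System.out.println(jobGraph.getJobID());
    }
}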

Now, what happens when miniCluster.start() is called:

// start cluster
    public void start() throws Exception {
        synchronized (lock) {
            checkState(!running, "MiniCluster is already running");

            ......
                ioExecutor = Executors.newFixedThreadPool(
                    Hardware.getNumberCPUCores(),
                    new ExecutorThreadFactory("mini-cluster-io"));
                // create the HA services
                haServices = createHighAvailabilityServices(configuration, ioExecutor);

                // start the BlobServer
                blobServer = new BlobServer(configuration, haServices.createBlobStore());
                blobServer.start();

                heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

                blobCacheService = new BlobCacheService(
                    configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
                );

                // task executor
                startTaskManagers();
......
                resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
                dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
                webMonitorLeaderRetrievalService.start(webMonitorLeaderRetriever);
            ......
    }

This creates the HA services and starts the blobCacheService, resourceManagerLeaderRetriever, dispatcherLeaderRetriever and webMonitorLeaderRetrievalService. Let's focus on startTaskManagers, which starts one TaskExecutor per configured TaskManager by calling startTaskExecutor:

@VisibleForTesting
    void startTaskExecutor() throws Exception {
        synchronized (lock) {
            final Configuration configuration = miniClusterConfiguration.getConfiguration();

            final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
                configuration,
                new ResourceID(UUID.randomUUID().toString()),
                taskManagerRpcServiceFactory.createRpcService(),
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                useLocalCommunication(),
                taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));

            taskExecutor.start();
            taskManagers.add(taskExecutor);
        }
    }

With that, the TaskExecutor (which is the TaskManager) has been created and started. It is ultimately responsible for submitTask, cancelTask, stopTask, running tasks, confirmCheckpoint, requestSlot, freeSlot, and so on.
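To keep those responsibilities in one place, here is a purely illustrative sketch of the kind of RPC surface the TaskExecutor exposes; the method names mirror the list above, but the parameter and return types are simplified placeholders, not the real TaskExecutorGateway signatures:

import java.util.concurrent.CompletableFuture;

// Illustrative only -- NOT the real TaskExecutorGateway.
interface SimplifiedTaskExecutorGateway {
    CompletableFuture<Void> submitTask(Object taskDeploymentDescriptor);  // deploy and run a task
    CompletableFuture<Void> cancelTask(String executionAttemptId);        // cancel a running task
    CompletableFuture<Void> stopTask(String executionAttemptId);          // stop a stoppable task
    CompletableFuture<Void> confirmCheckpoint(String executionAttemptId,  // notify a completed checkpoint
                                              long checkpointId,
                                              long checkpointTimestamp);
    CompletableFuture<Void> requestSlot(int slotIndex, String jobId);     // allocate a slot for a job
    CompletableFuture<Void> freeSlot(String allocationId);                // release a slot
}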

The necessary components are now up, so it is time to submit the JobGraph via miniCluster.executeJobBlocking(jobGraph).
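executeJobBlocking is essentially "submit the JobGraph asynchronously, then block until the job result comes back". A hedged sketch of that idea against MiniCluster's asynchronous API (assuming submitJob, requestJobResult and JobResult#toJobExecutionResult as they exist around Flink 1.7/1.8 -- this is not the actual body of the method):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;

public class SubmitAndWaitSketch {
    static JobExecutionResult submitAndWait(MiniCluster miniCluster, JobGraph jobGraph) throws Exception {
        // submit asynchronously and wait for the submission acknowledgement
        miniCluster.submitJob(jobGraph).get();
        // block until the job reaches a globally terminal state
        JobResult jobResult = miniCluster.requestJobResult(jobGraph.getJobID()).get();
        // turn the JobResult into a JobExecutionResult (rethrows the failure cause if the job failed)
        return jobResult.toJobExecutionResult(Thread.currentThread().getContextClassLoader());
    }
}

Tracing into the submission itself brings us to MiniCluster.submitJob: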

public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
        
        // we have to allow queued scheduling in Flip-6 mode because we need to request slots
        // from the ResourceManager
        jobGraph.setAllowQueuedScheduling(true);
        
        final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
        
        // cache jars and files
        final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
        
        final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
            .thenCombine(
                dispatcherGatewayFuture,
                // the actual submit happens here: it is handed over to the Dispatcher
                (Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
            .thenCompose(Function.identity());
        
        return acknowledgeCompletableFuture.thenApply(
            (Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
    }

So the JobGraph submission is ultimately delegated to the Dispatcher, and ends up here:

private CompletableFuture<Void> runJob(JobGraph jobGraph) {
        Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));

        // create the JobManagerRunner
        final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);

        jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);

        // start job manager
        return jobManagerRunnerFuture
            .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
            .thenApply(FunctionUtils.nullFn())
            .whenCompleteAsync(
                (ignored, throwable) -> {
                    if (throwable != null) {
                        jobManagerRunnerFutures.remove(jobGraph.getJobID());
                    }
                },
                getMainThreadExecutor());
    }

At this point a JobManagerRunner is created, and creating the JobManagerRunner also creates a JobMaster:

public JobMaster(
            RpcService rpcService,
            JobMasterConfiguration jobMasterConfiguration,
            ResourceID resourceId,
            JobGraph jobGraph,
            HighAvailabilityServices highAvailabilityService,
            SlotPoolFactory slotPoolFactory,
            SchedulerFactory schedulerFactory,
            JobManagerSharedServices jobManagerSharedServices,
            HeartbeatServices heartbeatServices,
            JobManagerJobMetricGroupFactory jobMetricGroupFactory,
            OnCompletionActions jobCompletionActions,
            FatalErrorHandler fatalErrorHandler,
            ClassLoader userCodeLoader) throws Exception {

        super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));

        this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
        this.resourceId = checkNotNull(resourceId);
        this.jobGraph = checkNotNull(jobGraph);
        this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
        this.highAvailabilityServices = checkNotNull(highAvailabilityService);
        this.blobWriter = jobManagerSharedServices.getBlobWriter();
        this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
        this.jobCompletionActions = checkNotNull(jobCompletionActions);
        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
        this.userCodeLoader = checkNotNull(userCodeLoader);
        this.heartbeatServices = checkNotNull(heartbeatServices);
        this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory);

        final String jobName = jobGraph.getName();
        final JobID jid = jobGraph.getJobID();

        log.info("Initializing job {} ({}).", jobName, jid);

        final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
                jobGraph.getSerializedExecutionConfig()
                        .deserializeValue(userCodeLoader)
                        .getRestartStrategy();

        // resolve and set the restart strategy
        this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
            jobManagerSharedServices.getRestartStrategyFactory(),
            jobGraph.isCheckpointingEnabled());

        ......
        // createAndRestoreExecutionGraph may restore the job from a checkpoint (savepoint)
        this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
        ......
    }
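The restart strategy resolved above comes from the user's ExecutionConfig, which is serialized into the JobGraph. For example, a job can set it like this (a small illustrative snippet):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Ends up in the serialized ExecutionConfig that the JobMaster deserializes above.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3,                  // number of restart attempts
            Time.seconds(10))); // delay between attempts

        env.fromElements(1, 2, 3).print();
        env.execute("restart-strategy-demo");
    }
}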

The key point is that creating the JobMaster also creates the ExecutionGraph. The JobManagerRunner is then started, which eventually starts the JobMaster:

private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
        log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
            jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());

        try {
            runningJobsRegistry.setJobRunning(jobGraph.getJobID());
        } catch (IOException e) {
            return FutureUtils.completedExceptionally(
                new FlinkException(
                    String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                    e));
        }

        final CompletableFuture<Acknowledge> startFuture;
        try {
            // start the JobMaster with the given leader session id (as its JobMasterId)
            startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
        } catch (Exception e) {
            return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
        }

        final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
        return startFuture.thenAcceptAsync(
            (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
            executor);
    }

Once the JobMaster has started, the job officially begins to execute:

public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
        // make sure we receive RPC and async calls
        start();
        // officially start executing the job
        return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
    }

Starting the actual job execution:

// start job execution
    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {

        validateRunsInMainThread();

        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

            return Acknowledge.get();
        }

        setNewFencingToken(newJobMasterId);

        startJobMasterServices();

        log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
        
        // reset and/or schedule the ExecutionGraph
        resetAndScheduleExecutionGraph();

        return Acknowledge.get();
    }

Next the ExecutionGraph is scheduled. Streaming jobs use the EAGER schedule mode, which deploys all tasks up front, while LAZY_FROM_SOURCES only deploys downstream tasks once their inputs are ready:

// schedule the executions of this job
    public void scheduleForExecution() throws JobException {

        assertRunningInJobMasterMainThread();

        final long currentGlobalModVersion = globalModVersion;
        
        // transition the job status from CREATED to RUNNING
        if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {

            final CompletableFuture<Void> newSchedulingFuture;

            switch (scheduleMode) {

                case LAZY_FROM_SOURCES:
                    newSchedulingFuture = scheduleLazy(slotProvider);
                    break;

                case EAGER:
                    // 300000 ms default
                    // start scheduling
                    newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
                    break;

                default:
                    throw new JobException("Schedule mode is invalid.");
            }

            if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
                schedulingFuture = newSchedulingFuture;
                newSchedulingFuture.whenComplete(
                    (Void ignored, Throwable throwable) -> {
                        if (throwable != null && !(throwable instanceof CancellationException)) {
                            // only fail if the scheduling future was not canceled
                            failGlobal(ExceptionUtils.stripCompletionException(throwable));
                        }
                    });
            } else {
                newSchedulingFuture.cancel(false);
            }
        }
        else {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }
    }

After scheduling, deployment begins:

/**
     * Deploys the execution to the previously assigned resource.
     *
     * @throws JobException if the execution cannot be deployed to the assigned resource
     */
    // deployed in a loop, from sources to sinks
    public void deploy() throws JobException {
        assertRunningInJobMasterMainThread();

        final LogicalSlot slot  = assignedResource;
        .....

            // TaskDeploymentDescriptor holds everything the task needs in order to run:
            // the serialized operators, the definitions of the input InputGates and output ResultPartitions,
            // how many subtasks this task is executed as, and so on.
            final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
                attemptId,
                slot,
                taskRestore,
                attemptNumber);

            // null taskRestore to let it be GC'ed
            taskRestore = null;

            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

            final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
                vertex.getExecutionGraph().getJobMasterMainThreadExecutor();


            // We run the submission in the future executor so that the serialization of large TDDs does not block
            // the main thread and sync back to the main thread once submission is completed.
            // submit the task (sources first)
            // For the TaskManager, executing a task means turning the received TaskDeploymentDescriptor
            // into a Task and running it.
            CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
                .thenCompose(Function.identity())
                .whenCompleteAsync(
                    (ack, failure) -> {
                        // only respond to the failure case
                        if (failure != null) {
                            if (failure instanceof TimeoutException) {
                                String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                                markFailed(new Exception(
                                    "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                        + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                            } else {
                                markFailed(failure);
                            }
                        }
                    },
                    jobMasterMainThreadExecutor);

        }
        catch (Throwable t) {
            markFailed(t);
            ExceptionUtils.rethrow(t);
        }
    }

During deployment, resources (slots) may be requested; after that the task is submitted to the TaskManager, and from there the task actually starts running.
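To make that last step concrete, here is a deliberately simplified toy model of what a TaskManager does with an incoming deployment descriptor: turn it into a task and run it on its own thread. Every class name below (ToyDeploymentDescriptor, ToyTaskManager, ToyDeploymentDemo) is made up for illustration; none of this is Flink API:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a TaskDeploymentDescriptor: just enough state for the sketch.
class ToyDeploymentDescriptor {
    final String taskName;
    final int subtaskIndex;
    final Runnable invokable;   // stands in for the deserialized user code

    ToyDeploymentDescriptor(String taskName, int subtaskIndex, Runnable invokable) {
        this.taskName = taskName;
        this.subtaskIndex = subtaskIndex;
        this.invokable = invokable;
    }
}

// Hypothetical "TaskManager": converts a descriptor into a running task, one thread per task.
class ToyTaskManager {
    private final ExecutorService taskThreads = Executors.newCachedThreadPool();

    // Conceptually what submitTask does: descriptor in, running task out.
    Future<?> submitTask(ToyDeploymentDescriptor tdd) {
        System.out.printf("Deploying %s (subtask %d)%n", tdd.taskName, tdd.subtaskIndex);
        return taskThreads.submit(tdd.invokable);
    }

    void shutdown() {
        taskThreads.shutdown();
    }
}

public class ToyDeploymentDemo {
    public static void main(String[] args) throws Exception {
        ToyTaskManager tm = new ToyTaskManager();
        Future<?> done = tm.submitTask(
            new ToyDeploymentDescriptor("Source: numbers", 0, () -> System.out.println("task running")));
        done.get();   // wait for the "task" to finish
        tm.shutdown();
    }
}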

Summary

For comparison, the submission flow in YARN mode:


[Figure: Flink job submission flow in YARN mode]