Flink Source Code Reading (3) --- Generating the ExecutionGraph

This article is based on Flink 1.9. Running a Flink job involves three graphs: the StreamGraph, the JobGraph, and the ExecutionGraph. The StreamGraph and JobGraph are generated on the client side, while the ExecutionGraph is built in the JobMaster.

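  • The StreamGraph is the most primitive execution graph, generated directly from user code; it is a literal translation of the user's logic
  • The JobGraph is an optimized StreamGraph; for example, it decides which operators can be chained together, which reduces network overhead
  • The ExecutionGraph is the execution graph used for job scheduling; it adds the notion of parallelism to the JobGraph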

Building on Flink Source Code Reading (2) --- Generating the JobGraph, this article walks through how the ExecutionGraph is generated.

1. ExecutionJobVertex

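Each ExecutionJobVertex corresponds one-to-one to a JobVertex in the JobGraph and is uniquely identified within the graph by a JobVertexID.
Here are the member variables of the ExecutionJobVertex class: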

    private final ExecutionGraph graph;

    private final JobVertex jobVertex;

    /**
     * The IDs of all operators contained in this execution job vertex.
     *
     * <p>The ID's are stored depth-first post-order; for the forking chain below the ID's would be stored as [D, E, B, C, A].
     *  A - B - D
     *   \    \
     *    C    E
     * This is the same order that operators are stored in the {@code StreamTask}.
     */
    private final List<OperatorID> operatorIDs;

    /**
     * The alternative IDs of all operators contained in this execution job vertex.
     *
     * <p>The ID's are in the same order as {@link ExecutionJobVertex#operatorIDs}.
     */
    private final List<OperatorID> userDefinedOperatorIds;

    private final ExecutionVertex[] taskVertices;

    private final IntermediateResult[] producedDataSets;

    private final List<IntermediateResult> inputs;

    private final int parallelism;

    private final SlotSharingGroup slotSharingGroup;

    private final CoLocationGroup coLocationGroup;

    private final InputSplit[] inputSplits;

    private final boolean maxParallelismConfigured;

    private int maxParallelism;

As the source shows, an ExecutionJobVertex is derived from a JobVertex and holds an array of ExecutionVertex (taskVertices) and an array of IntermediateResult (producedDataSets).
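
To make the relationship concrete, here is a minimal toy model of how one job vertex fans out into parallel subtasks and produced results. The classes ExecutionJobVertexSketch, SubtaskSketch, and ResultSketch are hypothetical stand-ins invented for illustration; they are not Flink's real classes and omit almost all of their fields.

```java
/** Toy model only: these classes are hypothetical stand-ins, not Flink's real ones. */
final class ExecutionJobVertexSketch {

    /** Stand-in for ExecutionVertex: one parallel subtask. */
    static final class SubtaskSketch {
        final int subTaskIndex;

        SubtaskSketch(int subTaskIndex) {
            this.subTaskIndex = subTaskIndex;
        }
    }

    /** Stand-in for IntermediateResult: it remembers how many subtasks produce it. */
    static final class ResultSketch {
        final int numParallelProducers;

        ResultSketch(int numParallelProducers) {
            this.numParallelProducers = numParallelProducers;
        }
    }

    final String name;
    final int parallelism;
    final SubtaskSketch[] taskVertices;
    final ResultSketch[] producedDataSets;

    ExecutionJobVertexSketch(String name, int parallelism, int numProducedDataSets) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        this.name = name;
        this.parallelism = parallelism;

        // One result per data set produced by the job vertex.
        this.producedDataSets = new ResultSketch[numProducedDataSets];
        for (int i = 0; i < numProducedDataSets; i++) {
            producedDataSets[i] = new ResultSketch(parallelism);
        }

        // One subtask per unit of parallelism.
        this.taskVertices = new SubtaskSketch[parallelism];
        for (int i = 0; i < parallelism; i++) {
            taskVertices[i] = new SubtaskSketch(i);
        }
    }

    public static void main(String[] args) {
        ExecutionJobVertexSketch map = new ExecutionJobVertexSketch("Map", 4, 1);
        System.out.println(map.name + ": " + map.taskVertices.length + " subtasks, "
                + map.producedDataSets.length + " produced result(s)");
    }
}
```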

2. ExecutionVertex

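Each ExecutionJobVertex has as many ExecutionVertex instances as its parallelism. An ExecutionVertex represents one parallel subtask and can be identified by its ExecutionJobVertex plus the subtask index.
Here are the member variables of the ExecutionVertex class: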

    private final ExecutionJobVertex jobVertex;

    private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;

    private final ExecutionEdge[][] inputEdges;

    private final int subTaskIndex;

    private final ExecutionVertexID executionVertexId;

    private final EvictingBoundedList<ArchivedExecution> priorExecutions;

    private final Time timeout;

    /** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */
    private final String taskNameWithSubtask;

    private volatile CoLocationConstraint locationConstraint;

    /** The current or latest execution attempt of this vertex's task. */
    private volatile Execution currentExecution;    // this field must never be null

    private final ArrayList<InputSplit> inputSplits;

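As the source shows:

  • An ExecutionVertex holds a reference to its ExecutionJobVertex and a subTaskIndex
  • It is uniquely identified by an ExecutionVertexID
  • It holds one current Execution (currentExecution); earlier attempts are kept in priorExecutions
  • It holds a resultPartitions map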
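
The Javadoc on taskNameWithSubtask gives the format "myTask (2/7)". A small sketch of how such a name can be derived from the task name, the zero-based subTaskIndex, and the parallelism follows. The class and method names are hypothetical, and the zero-based index is an assumption of this sketch.

```java
/** Hypothetical helper, not part of Flink: builds a name in the "myTask (2/7)" format. */
final class SubtaskNameSketch {

    static String taskNameWithSubtask(String taskName, int subTaskIndex, int parallelism) {
        if (subTaskIndex < 0 || subTaskIndex >= parallelism) {
            throw new IllegalArgumentException("subTaskIndex must be in [0, parallelism)");
        }
        // The index is zero-based internally but shown one-based to the user.
        return taskName + " (" + (subTaskIndex + 1) + "/" + parallelism + ")";
    }

    public static void main(String[] args) {
        System.out.println(taskNameWithSubtask("myTask", 1, 7)); // prints "myTask (2/7)"
    }
}
```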

3. Execution

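An Execution is one execution attempt of an ExecutionVertex. An ExecutionVertex may need to run several times, for example for recovery or recomputation, in which case it has several Executions. Each Execution is uniquely identified by an ExecutionAttemptID.
Here are the member variables of the Execution class: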

    /** The executor which is used to execute futures. */
    private final Executor executor;

    /** The execution vertex whose task this execution executes. */
    private final ExecutionVertex vertex;

    /** The unique ID marking the specific execution instant of the task. */
    private final ExecutionAttemptID attemptId;

    /** Gets the global modification version of the execution graph when this execution was created.
     * This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
     * to resolve conflicts between concurrent modification by global and local failover actions. */
    private final long globalModVersion;

    /** The timestamps when state transitions occurred, indexed by {@link ExecutionState#ordinal()}. */
    private final long[] stateTimestamps;

    private final int attemptNumber;
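
The relationship between a vertex and its attempts can be sketched as follows: the vertex always has a current attempt, and starting a new attempt archives the old one and increments the attempt number. This is a simplified illustration under stated assumptions. Attempt and ExecutionAttemptSketch are hypothetical classes, a UUID string stands in for ExecutionAttemptID, and a plain unbounded list stands in for the bounded priorExecutions list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical sketch of a vertex that keeps one current attempt plus a history. */
final class ExecutionAttemptSketch {

    /** Stand-in for Execution: a unique ID plus a running attempt number. */
    static final class Attempt {
        final String attemptId = UUID.randomUUID().toString();
        final int attemptNumber;

        Attempt(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }
    }

    private final List<Attempt> priorExecutions = new ArrayList<>();
    private Attempt currentExecution = new Attempt(0); // never null

    Attempt getCurrentExecution() {
        return currentExecution;
    }

    int getNumberOfPriorExecutions() {
        return priorExecutions.size();
    }

    /** Archives the current attempt and creates the next one (e.g. after a failure). */
    Attempt resetForNewExecution() {
        priorExecutions.add(currentExecution);
        currentExecution = new Attempt(currentExecution.attemptNumber + 1);
        return currentExecution;
    }

    public static void main(String[] args) {
        ExecutionAttemptSketch vertex = new ExecutionAttemptSketch();
        Attempt retry = vertex.resetForNewExecution();
        System.out.println("attempt #" + retry.attemptNumber + ", id " + retry.attemptId);
    }
}
```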

4. IntermediateResult

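The JobGraph uses IntermediateDataSet to represent the output of a JobVertex; a JobVertex may have zero or more outputs. Its counterpart in the ExecutionGraph is IntermediateResult, which is uniquely identified by an IntermediateDataSetID.
Here are the member variables of the IntermediateResult class: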

    private final IntermediateDataSetID id;

    private final ExecutionJobVertex producer;

    private final IntermediateResultPartition[] partitions;

    /**
     * Maps intermediate result partition IDs to a partition index. This is
     * used for ID lookups of intermediate results. I didn't dare to change the
     * partition connect logic in other places that is tightly coupled to the
     * partitions being held as an array.
     */
    private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>();

    // Parallelism of the producer ExecutionJobVertex
    private final int numParallelProducers;

    private final AtomicInteger numberOfRunningProducers;

    private int partitionsAssigned;

    private int numConsumers;

    private final int connectionIndex;

  • An ExecutionJobVertex may have zero or more IntermediateResults, and each parallel subtask (ExecutionVertex) may have zero or more IntermediateResultPartitions.

5. IntermediateResultPartition

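An IntermediateResult is made up of IntermediateResultPartitions, one per parallel producer subtask; an IntermediateResultPartition is one output partition of an ExecutionVertex.
Here are the member variables of the IntermediateResultPartition class: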

    private final IntermediateResult totalResult;

    private final ExecutionVertex producer;

    private final int partitionNumber;

    private final IntermediateResultPartitionID partitionId;

    private List<List<ExecutionEdge>> consumers;

As the source shows, the producer of an IntermediateResultPartition is an ExecutionVertex, and its consumers are one or more ExecutionEdges.
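
As an illustration of how partitions end up with consumer edges, the sketch below wires producers to consumers under an all-to-all pattern, where every consumer subtask reads from every partition. This is only one possible distribution pattern (pointwise connections are wired differently), and the Edge class and connectAllToAll method here are hypothetical, simplified stand-ins rather than Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: all-to-all wiring between result partitions and consumer subtasks. */
final class AllToAllEdgesSketch {

    /** Stand-in for ExecutionEdge: connects one partition to one consumer subtask. */
    static final class Edge {
        final int sourcePartition;
        final int targetSubtask;

        Edge(int sourcePartition, int targetSubtask) {
            this.sourcePartition = sourcePartition;
            this.targetSubtask = targetSubtask;
        }
    }

    /** Returns, for each partition index, the list of edges that consume that partition. */
    static List<List<Edge>> connectAllToAll(int producerParallelism, int consumerParallelism) {
        List<List<Edge>> consumersPerPartition = new ArrayList<>();
        for (int p = 0; p < producerParallelism; p++) {
            consumersPerPartition.add(new ArrayList<>());
        }
        for (int t = 0; t < consumerParallelism; t++) {
            for (int p = 0; p < producerParallelism; p++) {
                // Every consumer subtask gets one edge from every partition.
                consumersPerPartition.get(p).add(new Edge(p, t));
            }
        }
        return consumersPerPartition;
    }

    public static void main(String[] args) {
        List<List<Edge>> consumers = connectAllToAll(2, 3);
        System.out.println("partitions: " + consumers.size()
                + ", edges per partition: " + consumers.get(0).size());
    }
}
```

With producer parallelism P and consumer parallelism C, this pattern creates P × C edges in total.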

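6. Generating the ExecutionGraph

The entry point is the ExecutionGraphBuilder#buildGraph() method.
Its main steps are:
6.1 Create the ExecutionGraph, which mainly initializes jobInformation, restartStrategy, slotProvider, and so on.
6.2 Initialize each JobVertex on the master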

        // initialize the vertices that have a master initialization hook
        // file output formats create directories here, input formats create splits

        final long initMasterStart = System.nanoTime();
        log.info("Running initialization on master for job {} ({}).", jobName, jobId);

        for (JobVertex vertex : jobGraph.getVertices()) {
            String executableClass = vertex.getInvokableClassName();
            if (executableClass == null || executableClass.isEmpty()) {
                throw new JobSubmissionException(jobId,
                        "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
            }

            try {
                vertex.initializeOnMaster(classLoader);
            }
            catch (Throwable t) {
                    throw new JobExecutionException(jobId,
                            "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
            }
        }

        log.info("Successfully ran initialization on master in {} ms.",
                (System.nanoTime() - initMasterStart) / 1_000_000);
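
For vertices that carry input or output formats, initializeOnMaster does the following:

  • Configures the input format and creates the input splits
  • Configures the output format and calls the initializeGlobal method.
    Two notes on initializeGlobal:
       - initializeGlobal is called on the JM before the distributed program starts executing
       - initializeGlobal initializes the output directory in the distributed file system according to the specified write mode

6.3 Topologically sort all JobVertex instances in the JobGraph, starting from the sources
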
        // topologically sort the job vertices and attach the graph to the existing one
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
        if (log.isDebugEnabled()) {
            log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
        }
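
For readers unfamiliar with topological sorting, the sketch below orders a small DAG so that every vertex appears after all of its upstream vertices. It uses Kahn's algorithm on vertex names, which is an assumption made for illustration; it is not claimed to be the traversal that getVerticesSortedTopologicallyFromSources() uses internally, and TopologicalSortSketch is a hypothetical class.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: Kahn's algorithm over a DAG given as vertex -> downstream vertices. */
final class TopologicalSortSketch {

    static List<String> sortFromSources(Map<String, List<String>> downstream) {
        // Count incoming edges for every vertex that appears in the graph.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : downstream.entrySet()) {
            inDegree.putIfAbsent(entry.getKey(), 0);
            for (String target : entry.getValue()) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }

        // Sources are the vertices without any input.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            sorted.add(vertex);
            for (String target : downstream.getOrDefault(vertex, Collections.emptyList())) {
                // A vertex becomes ready once all of its inputs have been emitted.
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (sorted.size() != inDegree.size()) {
            throw new IllegalStateException("The graph contains a cycle");
        }
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("source", Collections.singletonList("map"));
        graph.put("map", Collections.singletonList("sink"));
        System.out.println(sortFromSources(graph));
    }
}
```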

6.4 Attach the sorted JobVertex list from 6.3 to the ExecutionGraph

    public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

        assertRunningInJobMasterMainThread();

        LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
                "vertices and {} intermediate results.",
            topologiallySorted.size(),
            tasks.size(),
            intermediateResults.size());

        final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
        final long createTimestamp = System.currentTimeMillis();

        for (JobVertex jobVertex : topologiallySorted) {

            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                this.isStoppable = false;
            }

            // create the execution job vertex and attach it to the graph
            ExecutionJobVertex ejv = new ExecutionJobVertex(
                    this,
                    jobVertex,
                    1,
                    maxPriorAttemptsHistoryLength,
                    rpcTimeout,
                    globalModVersion,
                    createTimestamp);

            ejv.connectToPredecessors(this.intermediateResults);

            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
            if (previousTask != null) {
                throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                    jobVertex.getID(), ejv, previousTask));
            }

            for (IntermediateResult res : ejv.getProducedDataSets()) {
                IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
                if (previousDataSet != null) {
                    throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                        res.getId(), res, previousDataSet));
                }
            }

            this.verticesInCreationOrder.add(ejv);
            this.numVerticesTotal += ejv.getParallelism();
            newExecJobVertices.add(ejv);
        }

        failoverStrategy.notifyNewVertices(newExecJobVertices);

        schedulingTopology = new ExecutionGraphToSchedulingTopologyAdapter(this);
        partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(
            schedulingTopology,
            new DefaultFailoverTopology(this));
    }

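What this method mainly does:

  • Creates one ExecutionJobVertex for each JobVertex
  • Puts each new ExecutionJobVertex into the tasks map, keyed by JobVertexID
  • Puts each IntermediateResult produced by the ExecutionJobVertex into the intermediateResults map, keyed by IntermediateDataSetID
  • Accumulates the parallelism of every ExecutionJobVertex into the member variable numVerticesTotal, which is therefore the total number of ExecutionVertex instances (subtasks). Without slot sharing this is also the number of slots requested later; with slot sharing, fewer slots may be enough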
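
The bookkeeping described above can be condensed into a few lines: putIfAbsent both registers the vertex and detects a duplicate ID, and the parallelism is added to a running total. The sketch below is a hypothetical simplification that stores only the parallelism per vertex ID (as strings) instead of real ExecutionJobVertex objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the registration logic in attachJobGraph; not Flink's real class. */
final class AttachJobGraphSketch {

    // Vertex ID -> parallelism; stands in for the tasks map keyed by JobVertexID.
    private final Map<String, Integer> tasks = new LinkedHashMap<>();
    private int numVerticesTotal;

    void attach(String jobVertexId, int parallelism) {
        // putIfAbsent returns the existing value if the ID was already registered.
        Integer previous = tasks.putIfAbsent(jobVertexId, parallelism);
        if (previous != null) {
            throw new IllegalStateException("Encountered two job vertices with ID " + jobVertexId);
        }
        numVerticesTotal += parallelism;
    }

    int getNumVerticesTotal() {
        return numVerticesTotal;
    }

    public static void main(String[] args) {
        AttachJobGraphSketch graph = new AttachJobGraphSketch();
        graph.attach("source", 2);
        graph.attach("map", 4);
        graph.attach("sink", 1);
        System.out.println("total subtasks: " + graph.getNumVerticesTotal()); // prints "total subtasks: 7"
    }
}
```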

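6.5 Configure checkpointing

  • Create a CompletedCheckpointStore instance for the job; the main work is setting the checkpoint path
  • Create a CheckpointIDCounter instance for the job; the main work is setting checkpointIdCounterPath
  • Create a CheckpointFailureManager, which registers a callback for checkpoint failures, for example failing the job
  • Create a CheckpointCoordinator instance; its member variables are shown here:
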
    /** The job whose checkpoint this coordinator coordinates. */
    private final JobID job;

    /** Default checkpoint properties. **/
    private final CheckpointProperties checkpointProperties;

    /** The executor used for asynchronous calls, like potentially blocking I/O. */
    private final Executor executor;

    /** Tasks who need to be sent a message when a checkpoint is started. */
    private final ExecutionVertex[] tasksToTrigger;

    /** Tasks who need to acknowledge a checkpoint before it succeeds. */
    private final ExecutionVertex[] tasksToWaitFor;

    /** Tasks who need to be sent a message when a checkpoint is confirmed. */
    private final ExecutionVertex[] tasksToCommitTo;

    /** Map from checkpoint ID to the pending checkpoint. */
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;

    /** Completed checkpoints. Implementations can be blocking. Make sure calls to methods
     * accessing this don't block the job manager actor and run asynchronously. */
    private final CompletedCheckpointStore completedCheckpointStore;

    /** The root checkpoint state backend, which is responsible for initializing the
     * checkpoint, storing the metadata, and cleaning up the checkpoint. */
    private final CheckpointStorageCoordinatorView checkpointStorage;

    /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones). */
    private final ArrayDeque<Long> recentPendingCheckpoints;

    /** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
     * need to be ascending across job managers. */
    private final CheckpointIDCounter checkpointIdCounter;

    /** The base checkpoint interval. Actual trigger time may be affected by the
     * max concurrent checkpoints and minimum-pause values */
    private final long baseInterval;

    /** The max time (in ms) that a checkpoint may take. */
    private final long checkpointTimeout;

    /** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
     * enforce minimum processing time between checkpoint attempts */
    private final long minPauseBetweenCheckpointsNanos;

    /** The maximum number of checkpoints that may be in progress at the same time. */
    private final int maxConcurrentCheckpointAttempts;

    /** The timer that handles the checkpoint timeouts and triggers periodic checkpoints. */
    private final ScheduledThreadPoolExecutor timer;

    /** The master checkpoint hooks executed by this checkpoint coordinator. */
    private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks;

    /** Actor that receives status updates from the execution graph this coordinator works for. */
    private JobStatusListener jobStatusListener;

    /** The number of consecutive failed trigger attempts. */
    private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);

    /** A handle to the current periodic trigger, to cancel it when necessary. */
    private ScheduledFuture<?> currentPeriodicTrigger;

    /** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
    private long lastCheckpointCompletionNanos;

    /** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
     * Non-volatile, because only accessed in synchronized scope */
    private boolean periodicScheduling;

    /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
     * accessed in synchronized scope */
    private boolean triggerRequestQueued;

    /** Flag marking the coordinator as shut down (not accepting any messages any more). */
    private volatile boolean shutdown;

    /** Optional tracker for checkpoint statistics. */
    @Nullable
    private CheckpointStatsTracker statsTracker;

    /** A factory for SharedStateRegistry objects. */
    private final SharedStateRegistryFactory sharedStateRegistryFactory;

    /** Registry that tracks state which is shared across (incremental) checkpoints. */
    private SharedStateRegistry sharedStateRegistry;

    private boolean isPreferCheckpointForRecovery;

    private final CheckpointFailureManager failureManager;

  • It holds the checkpoint interval, all tasks of the job, the checkpointIdCounter, and other information
  • The checkpointIdCounter starts counting: checkpointIDCounter.start();
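
The Javadoc on checkpointIdCounter says the counter must hand out ascending IDs. A minimal in-memory sketch of such a counter, assuming an AtomicLong that starts at 1, looks like this. CheckpointIdCounterSketch is a hypothetical class; the method names getAndIncrement and setCount mirror the CheckpointIDCounter interface, but the starting value and the lack of any high-availability storage are assumptions of this sketch.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory counter that hands out strictly ascending checkpoint IDs. */
final class CheckpointIdCounterSketch {

    // Assumption of this sketch: IDs start at 1.
    private final AtomicLong counter = new AtomicLong(1);

    /** Returns the current ID and advances the counter atomically. */
    long getAndIncrement() {
        return counter.getAndIncrement();
    }

    /** Resets the counter, e.g. to continue after the ID of a restored checkpoint. */
    void setCount(long newCount) {
        counter.set(newCount);
    }

    public static void main(String[] args) {
        CheckpointIdCounterSketch ids = new CheckpointIdCounterSketch();
        System.out.println(ids.getAndIncrement()); // prints 1
        System.out.println(ids.getAndIncrement()); // prints 2
    }
}
```

An in-memory counter like this loses its state when the process dies, which is why a path-based counter is configured when IDs must stay ascending across job manager failures.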

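6.6 Register metrics for the ExecutionGraph

Summary

The ExecutionGraph is the core data structure for executing a job in a distributed fashion; it adds the notion of parallelism on top of the JobGraph.
The nodes involved in the ExecutionGraph are summarized as follows:

  • Each JobVertex in the JobGraph corresponds one-to-one to an ExecutionJobVertex in the ExecutionGraph.
  • Each ExecutionJobVertex has parallelism ExecutionVertex instances.
  • Each JobVertex may have n (n >= 0) IntermediateDataSets. In the ExecutionJobVertex, each IntermediateDataSet corresponds to one IntermediateResult; every IntermediateResult has parallelism producers and therefore parallelism IntermediateResultPartitions.
  • Each ExecutionJobVertex is connected to its upstream IntermediateResults; each connection between an ExecutionVertex and a partition of such an IntermediateResult is an ExecutionEdge.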