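This article is based on Flink 1.9. Executing a Flink job involves three graphs: StreamGraph, JobGraph, and ExecutionGraph. StreamGraph and JobGraph are generated on the client side, while ExecutionGraph is built in the JobMaster.
- StreamGraph is the most primitive execution graph, generated from the user code; it is a direct translation of the user's logic
- JobGraph is an optimized StreamGraph; for example, it decides which operators can be chained, which reduces network overhead
- ExecutionGraph is the execution graph used for job scheduling; it adds the notion of parallelism to the JobGraph
Building on "Flink Source Code Reading (2) --- JobGraph Generation", this article describes how the ExecutionGraph is generated.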
1. ExecutionJobVertex
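Each ExecutionJobVertex corresponds one-to-one to a JobVertex in the JobGraph and is uniquely identified within the graph by its JobVertexID.
Here are the member variables of the ExecutionJobVertex class: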
private final ExecutionGraph graph;
private final JobVertex jobVertex;
/**
 * The IDs of all operators contained in this execution job vertex.
 *
 * <p>The ID's are stored depth-first post-order; for the forking chain below the ID's would be stored as [D, E, B, C, A].
 *  A - B - D
 *   \    \
 *    C    E
 * This is the same order that operators are stored in the {@code StreamTask}.
 */
private final List<OperatorID> operatorIDs;
/**
* The alternative IDs of all operators contained in this execution job vertex.
*
* <p>The ID's are in the same order as {@link ExecutionJobVertex#operatorIDs}.
*/
private final List<OperatorID> userDefinedOperatorIds;
private final ExecutionVertex[] taskVertices;
private final IntermediateResult[] producedDataSets;
private final List<IntermediateResult> inputs;
private final int parallelism;
private final SlotSharingGroup slotSharingGroup;
private final CoLocationGroup coLocationGroup;
private final InputSplit[] inputSplits;
private final boolean maxParallelismConfigured;
private int maxParallelism;
As the source shows, an ExecutionJobVertex is converted from a JobVertex and holds an array of ExecutionVertex objects and an array of IntermediateResult objects.
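The expansion from one job-level vertex into its parallel subtasks can be sketched as follows. This is a simplified model, not Flink's code: `ExpansionSketch`, `Subtask`, and `expand` are hypothetical names that stand in for ExecutionJobVertex filling its `taskVertices` array.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model; these are not Flink classes.
class ExpansionSketch {

    // Stand-in for ExecutionVertex: one parallel subtask of a job-level vertex.
    static final class Subtask {
        final String vertexName;
        final int subTaskIndex;

        Subtask(String vertexName, int subTaskIndex) {
            this.vertexName = vertexName;
            this.subTaskIndex = subTaskIndex;
        }
    }

    // Stand-in for the expansion step: one subtask per unit of parallelism.
    static List<Subtask> expand(String vertexName, int parallelism) {
        List<Subtask> subtasks = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask(vertexName, i));
        }
        return subtasks;
    }

    public static void main(String[] args) {
        for (Subtask s : expand("map", 3)) {
            System.out.println(s.vertexName + " subtask " + s.subTaskIndex);
        }
    }
}
```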
2. ExecutionVertex
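Each ExecutionJobVertex has as many ExecutionVertex instances as its parallelism. An ExecutionVertex represents one parallel subtask and is identified by its ExecutionJobVertex plus the subtask index.
Here are the member variables of the ExecutionVertex class: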
private final ExecutionJobVertex jobVertex;
private final Map<IntermediateResultPartitionID, IntermediateResultPartition> resultPartitions;
private final ExecutionEdge[][] inputEdges;
private final int subTaskIndex;
private final ExecutionVertexID executionVertexId;
private final EvictingBoundedList<ArchivedExecution> priorExecutions;
private final Time timeout;
/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations. */
private final String taskNameWithSubtask;
private volatile CoLocationConstraint locationConstraint;
/** The current or latest execution attempt of this vertex's task. */
private volatile Execution currentExecution; // this field must never be null
private final ArrayList<InputSplit> inputSplits;
As the source shows:
- An ExecutionVertex holds its ExecutionJobVertex and a subTaskIndex
- It is uniquely identified by an ExecutionVertexID
- An ExecutionVertex holds one current Execution
- An ExecutionVertex holds a resultPartitions map
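As a small illustration of the cached `taskNameWithSubtask` field, the sketch below builds a name in the "myTask (2/7)" format quoted in the field comment. It assumes the displayed index is the zero-based subTaskIndex plus one; the class and method names are hypothetical.

```java
// Hypothetical helper; it only mirrors the "myTask (2/7)" format from the field comment.
class TaskNameSketch {

    // Assumption: subTaskIndex is zero-based and shown one-based in the name.
    static String taskNameWithSubtask(String taskName, int subTaskIndex, int parallelism) {
        return String.format("%s (%d/%d)", taskName, subTaskIndex + 1, parallelism);
    }

    public static void main(String[] args) {
        System.out.println(taskNameWithSubtask("myTask", 1, 7));
    }
}
```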
3. Execution
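An Execution is one execution attempt of an ExecutionVertex. An ExecutionVertex may have to run several times, for example during recovery or recomputation, and each run gets its own Execution. Every Execution is uniquely identified by an ExecutionAttemptID.
Here are the member variables of the Execution class: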
/** The executor which is used to execute futures. */
private final Executor executor;
/** The execution vertex whose task this execution executes. */
private final ExecutionVertex vertex;
/** The unique ID marking the specific execution instant of the task. */
private final ExecutionAttemptID attemptId;
/** Gets the global modification version of the execution graph when this execution was created.
* This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
* to resolve conflicts between concurrent modification by global and local failover actions. */
private final long globalModVersion;
/** The timestamps when state transitions occurred, indexed by {@link ExecutionState#ordinal()}. */
private final long[] stateTimestamps;
private final int attemptNumber;
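The relationship between a vertex and its attempts can be sketched as below. This is a hypothetical model: `Attempt` stands in for Execution, `Vertex` for ExecutionVertex, and a random UUID stands in for ExecutionAttemptID. It assumes that a retry archives the current attempt and creates a new one with the attempt number increased by one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Simplified, hypothetical model of a vertex with several execution attempts.
class AttemptSketch {

    // Stand-in for Execution: one attempt with its own unique ID.
    static final class Attempt {
        final UUID attemptId = UUID.randomUUID();
        final int attemptNumber;

        Attempt(int attemptNumber) {
            this.attemptNumber = attemptNumber;
        }
    }

    // Stand-in for ExecutionVertex: exactly one current attempt plus a history.
    static final class Vertex {
        final List<Attempt> priorAttempts = new ArrayList<>();
        Attempt current = new Attempt(0); // never null, like currentExecution

        // Archive the current attempt and start a fresh one.
        Attempt resetForNewAttempt() {
            priorAttempts.add(current);
            current = new Attempt(current.attemptNumber + 1);
            return current;
        }
    }

    public static void main(String[] args) {
        Vertex vertex = new Vertex();
        vertex.resetForNewAttempt();
        System.out.println("attempt " + vertex.current.attemptNumber
            + ", prior attempts: " + vertex.priorAttempts.size());
    }
}
```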
4. IntermediateResult
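The JobGraph uses IntermediateDataSet to represent the output of a JobVertex; a JobVertex may have zero or more outputs. The ExecutionGraph counterpart is IntermediateResult, which is uniquely identified by an IntermediateDataSetID.
Here are the member variables of the IntermediateResult class: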
private final IntermediateDataSetID id;
private final ExecutionJobVertex producer;
private final IntermediateResultPartition[] partitions;
/**
* Maps intermediate result partition IDs to a partition index. This is
* used for ID lookups of intermediate results. I didn't dare to change the
* partition connect logic in other places that is tightly coupled to the
* partitions being held as an array.
*/
private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>();
// parallelism of the producing ExecutionJobVertex
private final int numParallelProducers;
private final AtomicInteger numberOfRunningProducers;
private int partitionsAssigned;
private int numConsumers;
private final int connectionIndex;
- An ExecutionJobVertex may produce zero or more IntermediateResults, and each parallel subtask (ExecutionVertex) may produce zero or more IntermediateResultPartitions.
5. IntermediateResultPartition
An IntermediateResult can contain zero or more IntermediateResultPartitions; an IntermediateResultPartition is one output partition of an ExecutionVertex.
Here are the member variables of the IntermediateResultPartition class:
private final IntermediateResult totalResult;
private final ExecutionVertex producer;
private final int partitionNumber;
private final IntermediateResultPartitionID partitionId;
private List<List<ExecutionEdge>> consumers;
As the source shows, the producer of an IntermediateResultPartition is an ExecutionVertex, and its consumers are one or more ExecutionEdges.
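How a result keeps one partition per producer subtask, plus an ID-to-index lookup in the spirit of `partitionLookupHelper`, can be sketched like this. The class is a hypothetical simplification: plain strings stand in for IntermediateResultPartitionID, and the method names are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of a result and its partitions.
class ResultSketch {
    private final String[] partitionIds;                       // index = producer subtask index
    private final Map<String, Integer> partitionLookup = new HashMap<>();
    private int partitionsAssigned;

    ResultSketch(int numParallelProducers) {
        this.partitionIds = new String[numParallelProducers];
    }

    // Register the partition produced by the given subtask; each slot may be set once.
    void setPartition(int subTaskIndex, String partitionId) {
        if (partitionIds[subTaskIndex] != null) {
            throw new IllegalStateException("Partition " + subTaskIndex + " already assigned");
        }
        partitionIds[subTaskIndex] = partitionId;
        partitionLookup.put(partitionId, subTaskIndex);
        partitionsAssigned++;
    }

    // Look up the array index of a partition by its ID.
    int indexOf(String partitionId) {
        Integer index = partitionLookup.get(partitionId);
        if (index == null) {
            throw new IllegalArgumentException("Unknown partition " + partitionId);
        }
        return index;
    }

    int getPartitionsAssigned() {
        return partitionsAssigned;
    }

    public static void main(String[] args) {
        ResultSketch result = new ResultSketch(2);
        result.setPartition(0, "p-0");
        result.setPartition(1, "p-1");
        System.out.println("index of p-1: " + result.indexOf("p-1"));
    }
}
```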
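6. ExecutionGraph generation
The entry point is the ExecutionGraphBuilder#buildGraph() method.
Its main steps are:
6.1 Construct the ExecutionGraph, which mainly initializes jobInformation, restartStrategy, slotProvider, and so on.
6.2 Initialize each JobVertex on the master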
// initialize the vertices that have a master initialization hook
// file output formats create directories here, input formats create splits
final long initMasterStart = System.nanoTime();
log.info("Running initialization on master for job {} ({}).", jobName, jobId);
for (JobVertex vertex : jobGraph.getVertices()) {
    String executableClass = vertex.getInvokableClassName();
    if (executableClass == null || executableClass.isEmpty()) {
        throw new JobSubmissionException(jobId,
            "The vertex " + vertex.getID() + " (" + vertex.getName() + ") has no invokable class.");
    }
    try {
        vertex.initializeOnMaster(classLoader);
    }
    catch (Throwable t) {
        throw new JobExecutionException(jobId,
            "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(), t);
    }
}
log.info("Successfully ran initialization on master in {} ms.",
    (System.nanoTime() - initMasterStart) / 1_000_000);
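- Set up the input format and create the input splits
- Set up the output format and call its initializeGlobal method.
Two notes on initializeGlobal:
- initializeGlobal is called in the JM before the distributed program starts executing
- initializeGlobal initializes the output directory in the distributed file system according to the specified write mode
6.3 Topologically sort all JobVertex instances in the JobGraph, starting from the sources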
// topologically sort the job vertices and attach the graph to the existing one
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
if (log.isDebugEnabled()) {
    log.debug("Adding {} vertices from job graph {} ({}).", sortedTopology.size(), jobName, jobId);
}
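To make "topologically sorted from the sources" concrete, here is a minimal sketch using Kahn's algorithm on a graph of vertex names. It is not the implementation behind getVerticesSortedTopologicallyFromSources(); the class name, method name, and adjacency-map representation are assumptions chosen for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Kahn's algorithm on a hypothetical adjacency map (vertex -> downstream vertices).
class TopoSortSketch {

    static List<String> sortFromSources(Map<String, List<String>> successors) {
        // Count incoming edges for every vertex.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String vertex : successors.keySet()) {
            inDegree.putIfAbsent(vertex, 0);
        }
        for (List<String> targets : successors.values()) {
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }

        // Sources are the vertices without any input.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        // Emit a vertex once all of its predecessors have been emitted.
        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            sorted.add(vertex);
            for (String target : successors.getOrDefault(vertex, Collections.emptyList())) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (sorted.size() != inDegree.size()) {
            throw new IllegalStateException("The graph contains a cycle");
        }
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("source", Collections.singletonList("map"));
        graph.put("map", Collections.singletonList("sink"));
        graph.put("sink", Collections.emptyList());
        System.out.println(sortFromSources(graph));
    }
}
```

Sorting first means that when a vertex is attached, every vertex producing its inputs has already been attached.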
6.4 Attach the JobVertex list sorted in 6.3 to the ExecutionGraph
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

    assertRunningInJobMasterMainThread();

    LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
            "vertices and {} intermediate results.",
        topologiallySorted.size(),
        tasks.size(),
        intermediateResults.size());

    final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();

    for (JobVertex jobVertex : topologiallySorted) {

        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        // create the execution job vertex and attach it to the graph
        ExecutionJobVertex ejv = new ExecutionJobVertex(
            this,
            jobVertex,
            1,
            maxPriorAttemptsHistoryLength,
            rpcTimeout,
            globalModVersion,
            createTimestamp);

        ejv.connectToPredecessors(this.intermediateResults);

        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                jobVertex.getID(), ejv, previousTask));
        }

        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
            if (previousDataSet != null) {
                throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                    res.getId(), res, previousDataSet));
            }
        }

        this.verticesInCreationOrder.add(ejv);
        this.numVerticesTotal += ejv.getParallelism();
        newExecJobVertices.add(ejv);
    }

    failoverStrategy.notifyNewVertices(newExecJobVertices);

    schedulingTopology = new ExecutionGraphToSchedulingTopologyAdapter(this);
    partitionReleaseStrategy = partitionReleaseStrategyFactory.createInstance(
        schedulingTopology,
        new DefaultFailoverTopology(this));
}
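What this method does:
- Creates one ExecutionJobVertex for each JobVertex
- Adds each ExecutionJobVertex to the tasks map, keyed by JobVertexID
- Adds every IntermediateResult produced by the ExecutionJobVertex to the intermediateResults map, keyed by IntermediateDataSetID
- Accumulates the parallelism of every ExecutionJobVertex into the ExecutionGraph member variable numVerticesTotal. This is the total number of subtasks, and therefore an upper bound on the number of slots requested later (slot sharing can reduce it)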
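The bookkeeping above boils down to a duplicate-safe map insert plus a running sum. A minimal sketch, with a hypothetical `AttachSketch` class in which a vertex is reduced to an ID string and its parallelism:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model of the bookkeeping done while attaching vertices.
class AttachSketch {
    final Map<String, Integer> tasks = new LinkedHashMap<>(); // vertex ID -> parallelism
    int numVerticesTotal;

    void attach(String vertexId, int parallelism) {
        // putIfAbsent returns the existing value if the key is already present,
        // which makes duplicate IDs easy to detect.
        Integer previous = tasks.putIfAbsent(vertexId, parallelism);
        if (previous != null) {
            throw new IllegalStateException("Encountered two job vertices with ID " + vertexId);
        }
        numVerticesTotal += parallelism;
    }

    public static void main(String[] args) {
        AttachSketch graph = new AttachSketch();
        graph.attach("source", 2);
        graph.attach("map", 4);
        System.out.println("total subtasks: " + graph.numVerticesTotal);
    }
}
```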
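6.5 Configure checkpointing
- Create a CompletedCheckpointStore instance for the job; this mainly sets the checkpoint path
- Create a CheckpointIDCounter instance for the job; this mainly sets the checkpointIdCounterPath
- Create a CheckpointFailureManager, which registers callbacks to run when a checkpoint fails, such as failing the job
- Create a CheckpointCoordinator instance; its member variables are listed below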
/** The job whose checkpoint this coordinator coordinates. */
private final JobID job;
/** Default checkpoint properties. **/
private final CheckpointProperties checkpointProperties;
/** The executor used for asynchronous calls, like potentially blocking I/O. */
private final Executor executor;
/** Tasks who need to be sent a message when a checkpoint is started. */
private final ExecutionVertex[] tasksToTrigger;
/** Tasks who need to acknowledge a checkpoint before it succeeds. */
private final ExecutionVertex[] tasksToWaitFor;
/** Tasks who need to be sent a message when a checkpoint is confirmed. */
private final ExecutionVertex[] tasksToCommitTo;
/** Map from checkpoint ID to the pending checkpoint. */
private final Map<Long, PendingCheckpoint> pendingCheckpoints;
/** Completed checkpoints. Implementations can be blocking. Make sure calls to methods
* accessing this don't block the job manager actor and run asynchronously. */
private final CompletedCheckpointStore completedCheckpointStore;
/** The root checkpoint state backend, which is responsible for initializing the
* checkpoint, storing the metadata, and cleaning up the checkpoint. */
private final CheckpointStorageCoordinatorView checkpointStorage;
/** A list of recent checkpoint IDs, to identify late messages (vs invalid ones). */
private final ArrayDeque<Long> recentPendingCheckpoints;
/** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
* need to be ascending across job managers. */
private final CheckpointIDCounter checkpointIdCounter;
/** The base checkpoint interval. Actual trigger time may be affected by the
* max concurrent checkpoints and minimum-pause values */
private final long baseInterval;
/** The max time (in ms) that a checkpoint may take. */
private final long checkpointTimeout;
/** The min time(in ns) to delay after a checkpoint could be triggered. Allows to
* enforce minimum processing time between checkpoint attempts */
private final long minPauseBetweenCheckpointsNanos;
/** The maximum number of checkpoints that may be in progress at the same time. */
private final int maxConcurrentCheckpointAttempts;
/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints. */
private final ScheduledThreadPoolExecutor timer;
/** The master checkpoint hooks executed by this checkpoint coordinator. */
private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks;
/** Actor that receives status updates from the execution graph this coordinator works for. */
private JobStatusListener jobStatusListener;
/** The number of consecutive failed trigger attempts. */
private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger(0);
/** A handle to the current periodic trigger, to cancel it when necessary. */
private ScheduledFuture<?> currentPeriodicTrigger;
/** The timestamp (via {@link System#nanoTime()}) when the last checkpoint completed. */
private long lastCheckpointCompletionNanos;
/** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
* Non-volatile, because only accessed in synchronized scope */
private boolean periodicScheduling;
/** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
* accessed in synchronized scope */
private boolean triggerRequestQueued;
/** Flag marking the coordinator as shut down (not accepting any messages any more). */
private volatile boolean shutdown;
/** Optional tracker for checkpoint statistics. */
@Nullable
private CheckpointStatsTracker statsTracker;
/** A factory for SharedStateRegistry objects. */
private final SharedStateRegistryFactory sharedStateRegistryFactory;
/** Registry that tracks state which is shared across (incremental) checkpoints. */
private SharedStateRegistry sharedStateRegistry;
private boolean isPreferCheckpointForRecovery;
private final CheckpointFailureManager failureManager;
- It holds the checkpoint interval, all tasks of the job, the checkpointIdCounter, and so on
- Start the checkpoint ID counter: checkpointIDCounter.start();
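The role of the counter, handing out strictly ascending checkpoint IDs after it has been started, can be sketched with an in-memory counter. `CounterSketch` is hypothetical: the starting value of 1 and the `started` check are assumptions made for illustration, not the behavior of any particular CheckpointIDCounter implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical in-memory counter that hands out ascending checkpoint IDs.
class CounterSketch {
    private final AtomicLong nextId = new AtomicLong(1); // assumed starting value
    private volatile boolean started;

    void start() {
        started = true;
    }

    // Returns the current ID and advances the counter atomically.
    long getAndIncrement() {
        if (!started) {
            throw new IllegalStateException("Counter has not been started");
        }
        return nextId.getAndIncrement();
    }

    public static void main(String[] args) {
        CounterSketch counter = new CounterSketch();
        counter.start();
        long first = counter.getAndIncrement();
        long second = counter.getAndIncrement();
        System.out.println(first + " < " + second);
    }
}
```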
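6.6 Register metrics for the ExecutionGraph
Summary
ExecutionGraph is the core data structure for executing a job in a distributed fashion; it adds the notion of parallelism on top of the JobGraph.
The nodes involved in the ExecutionGraph are summarized below:
- Each JobVertex in the JobGraph corresponds one-to-one to an ExecutionJobVertex in the ExecutionGraph.
- Each ExecutionJobVertex has parallelism ExecutionVertex instances.
- Each JobVertex may have n (n >= 0) IntermediateDataSets. In the ExecutionJobVertex, each IntermediateDataSet corresponds to one IntermediateResult, and each IntermediateResult has parallelism producers, which correspond to parallelism IntermediateResultPartitions.
- Each ExecutionJobVertex is connected to the IntermediateResults of its upstream vertices; connecting an ExecutionVertex to the partitions of such an IntermediateResult produces ExecutionEdges.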
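The last point can be illustrated for the simplest wiring, where every consumer subtask reads every upstream partition. The sketch below assumes this all-to-all pattern only; other patterns wire fewer edges. `EdgeSketch` and `connectAllToAll` are hypothetical names, and an edge is reduced to the index of the partition it reads from.

```java
// Simplified, hypothetical model: edges[consumer][i] is the index of the
// upstream partition that the consumer's i-th input edge reads from.
class EdgeSketch {

    static int[][] connectAllToAll(int producerParallelism, int consumerParallelism) {
        int[][] edges = new int[consumerParallelism][producerParallelism];
        for (int consumer = 0; consumer < consumerParallelism; consumer++) {
            for (int partition = 0; partition < producerParallelism; partition++) {
                edges[consumer][partition] = partition; // one edge per (consumer, partition) pair
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        int[][] edges = connectAllToAll(2, 3);
        System.out.println("consumers: " + edges.length + ", edges per consumer: " + edges[0].length);
    }
}
```

Under this assumption, a producer with parallelism 2 and a consumer with parallelism 3 yield 2 x 3 = 6 edges in total.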