本文内容是基于Flink 1.9来讲解。在执行Flink任务的时候,会涉及到三个Graph,分别是StreamGraph,JobGraph,ExecutionGraph。其中StreamGraph和JobGraph是在client端生成的,ExecutionGraph是在JobMaster中执行的。
- StreamGraph是根据用户代码生成的最原始执行图,也就是直接翻译用户逻辑得到的图
- JobGraph是对StreamGraph进行优化,比如设置哪些算子可以chain,减少网络开销
- ExecutionGraph是用于作业调度的执行图,对JobGraph加了并行度的概念
本篇文章在Flink源码阅读(一)--- StreamGraph 的生成 基础上,介绍下JobGraph的生成
1. JobVertex
在StreamGraph中,每个operator对应一个StreamNode。在JobGraph中,JobVertex对应的是可chain起来的operator list,把一些operator chain起来,可以较少网络以及序列化和反序列化的开销,大部分情况下可以提高作业性能。
2. JobEdge
在StreamGraph中,StreamNode之间的连接关系使用StreamEdge表示。在JobGraph中,JobVertex之间的连接关系使用JobEdge表示。
- JobEdge中存储了target JobVertex信息,没有source JobVertex信息
- JobEdge中存储了source IntermediateDataSet以及source IntermediateDataSetID信息,IntermediateDataSet是edge的输入数据集。
3. JobGraph生成入口StreamingJobGraphGenerator#createJobGraph()方法
private JobGraph createJobGraph() {
// make sure that all vertices start immediately
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
setChaining(hashes, legacyHashes, chainedOperatorHashes);
setPhysicalEdges();
setSlotSharingAndCoLocation();
configureCheckpointing();
JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
// set the ExecutionConfig last when it has been finalized
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}
return jobGraph;
}
主要做的工作如下
1.1 为streamGraph的StreamNode生成hash值(如果用户为operator指定了uid,就使用用户自定义的,否则自动生成。这个uid不管是用户指定还是自动生成,必须保证Job全局唯一性),这个值以后作为JobVertexID来唯一标识节点
1.2 设置task chain关系,并且把可chain的operator创建一个新的JobVertex添加到JobGraph中
1.2.1 是否可chain,代码逻辑在StreamingJobGraphGenerator.isChainable方法中
- 下游算子的输入只有一个
- 下游算子不为空
- 上游算子不为空
- 上游和下游算子的slotSharingGroup相同
- 下游算子的ChainingStrategy为ALWAYS
- 上游算子的ChainingStrategy为HEAD或者ALWAYS
- 上下游算子中间的edge的Partitioner是ForwardPartitioner
- 上下游算子的并发相同
- streamGraph配置是可以chain的
1.3 设置PhysicalEdges
1.4 设置SlotSharingAndCoLocation,就是把streamGraph StreamNode的SlotSharingGroup属性设置到JobVertex中
1.5 配置checkpoint。主要就是设置triggerVertices(所有的输入节点),commitVertices(所有的节点),ackVertices(所有的节点)
1.6 设置执行配置信息,比如默认并发度,失败时retry次数,retry delay等
通过createJobGraph方法,就完成了StreamGraph到JobGraph的转换。
4. 小结
JobGraph主要是把StreamGraph中,可以chain起来的operator进行合并,这样可以减小网络以及序列化和反序列化的开销。