Flink源码 - 生成JobGraph
什么是jobGraph
JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
在jobgraph中有一个组成“元素”:JobVertex:jobVertex用于产生intermediateDataset,并通过jobEdge串联不同的jobVertex同时也是将operator chain的"关键点"。 jobVertex是从job层面对task进行抽象。
在获取到StreamGraph后,继续分析,如何通过StreamGraph来生成JobGraph,StreamGraph和JobGraph都是在Client端生成的,我们可以通过debug断点观察Grpah的生成过程
JobGraph的入口在StreamingJobGraphGenerator.createJobGraph()方法中
//创建JobGraph
public static JobGraph createJobGraph(StreamGraph streamGraph) {
return createJobGraph(streamGraph, null);
}
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
//StreamingJobGraphGenerator 传入StreamGraph,以及jobId,初始化StreamingJobGraphGenerator
//并创建JobGraph对象
//createJobGraph 具体JobGraph生成的逻辑
return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}
进入到StreamingJobGraphGenerator方法中,这个方法初始化了一些需要的参数,并通过,JobID和JobName创建了一个JobGraph,
private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
this.streamGraph = streamGraph;
this.defaultStreamGraphHasher = new StreamGraphHasherV2();
this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());
this.jobVertices = new HashMap<>();
// 已经构建的JobVertex的id集合
this.builtVertices = new HashSet<>();
// 保存chain信息,部署时用来构建 OperatorChain,startNodeId -> (currentNodeId -> StreamConfig)
this.chainedConfigs = new HashMap<>();
// 所有节点的配置信息,id -> StreamConfig
this.vertexConfigs = new HashMap<>();
// 保存每个节点的名字,id -> chainedName
this.chainedNames = new HashMap<>();
this.chainedMinResources = new HashMap<>();
this.chainedPreferredResources = new HashMap<>();
this.chainedInputOutputFormats = new HashMap<>();
// 物理边集合(排除了chain内部的边), 按创建顺序排序
this.physicalEdgesInOrder = new ArrayList<>();
jobGraph = new JobGraph(jobID, streamGraph.getJobName());
}
在调用createJobGraph
这个方法时候开始构建JobGraph图,进入到createJobGraph
方法中
在这里可以分为6 步
- 广度遍历StreamGraph,为每个SteramNode生成hash值,这个算法能够保证如果拓补图没有发生改变,每次生成的hash值都是一样的,同时会生成用户指定hash值(即map.setUidHash("hash")的值),
这里是根据StreamGraph的配置生成的,给每个StreamNode生成一个长度为16的字节数组的hash值,这个散列值用于生成JobGraph的JobVertex的id,在checkpoint恢复的时候会根据jobVertexId为依据,所以在重启的任务的时候,对于对于相同的任务,其各JobVertexID能够保持不变,而StreamGraph中各个StreamNode的ID,就是其包含的StreamTransformation的ID,而StreamTransformation的ID是在对数据流中的数据进行转换的过程中,通过一个静态的累加器生成的,比如有多个数据源时,每个数据源添加的顺序不一致,则有可能导致相同数据处理逻辑的任务,就会对应于不同的ID,所以为了得到确定的ID,在进行JobVertexID的产生时,需要以一种确定的方式来确定其值,要么是通过用户为每个ID直接指定对应的一个散列值,要么参考StreamGraph中的一些特征,为每个JobVertex产生一个确定的ID。
如果用户指定了hash值,则会基于用户的指定的值生成一个长度为16的字节数组,如果用户没有指定,则会由flink生成一个长度为16的字节数组- 构建任务链,属于Flink对我们的任务进行调优,在这里会遍历所有的source节点开始构建chain,当条件满足的时候会将两个operator放到一个线程并行执行,这也可以减少网络的传输,并且在chain中的operator之间传输数据也不需要进行序列化和反序列化,可以提交任务执行效率
- 将每个JobVertex的入边集合也序列化到该JobVertex的StreamConfig中(出边在构建chain的时候已经写入conf中).
4.为每个JobVertex指定 slot sharing group,以及CoLocationGroup
5.设置托管内存分数(权重比)
6.配置检查点,保存点恢复模式以及添加用户分布式缓存的文件
private JobGraph createJobGraph() {
// 创建之前进行预检查
preValidate();
// make sure that all vertices start immediately
//设置调度模式, -- todo 默认是所有job都启动
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
/**
* todo defaultStreamGraphHasher 使用的是StreamGraphHasherV2类,
* legacyStreamGraphHashers使用的是StreamGraphUserHashHasher类
* 两个类都实现了StreamGraphHasher接口,在使用userHash的时候,是调用
* 我们在算子后面调用.setUidHash("hash")传入的值生成的
*/
// todo 广度优先遍历遍历StreamGraph,并且为每个SteamNode生成散列值, 这里的散列值产生算法,可以保证如
// 果提交的拓扑没有改变,则每次生成的散列值都是一样的。一个StreamNode的ID对应一个散列值。
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
// 为了保持兼容性创建的hash, 这里是user 传入的setUidHash
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
//主要从source构建任务链
setChaining(hashes, legacyHashes);
// 将每个JobVertex的入边集合也序列化到该JobVertex的StreamConfig中
// (出边集合已经在setChaining的时候写入了)
setPhysicalEdges();
//设置slot共享和coLocation。同一个coLocationGroup的task需要在同一个slot中运行
// CoLocationGroup =设置标识共存组的键。具有相同共定位键的操作符将由调度器将它们对应的子任务放到相同的槽中
setSlotSharingAndCoLocation();
// 设置托管内存分数(权重比)
setManagedMemoryFraction(
Collections.unmodifiableMap(jobVertices),
Collections.unmodifiableMap(vertexConfigs),
Collections.unmodifiableMap(chainedConfigs),
id -> streamGraph.getStreamNode(id).getMinResources(),
id -> streamGraph.getStreamNode(id).getManagedMemoryWeight());
// 配置检查点
configureCheckpointing();
// 设置保存点恢复设置
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
// 添加用户分布式缓存
JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
// set the ExecutionConfig last when it has been finalized
try {
//设置 job execution 配置
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}
return jobGraph;
}
主要分析一下setChaining方法中的逻辑
setChaining这里调用了createChain方法,传入了source节点和节点在chain的索引位置,以及一个OperatorChainInfo对象,这个对用
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
// 从每个source节点构建 任务链, 如果有多个source遍历每个source调用createChain方法
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
createChain(
sourceNodeId, //source 节点id
0, // chain的索引位置 初始为0
// 一个私有类 递归调用期间帮助维护操作符链的信息
// sourceNodeId : chain的起始node id 为source
// hashes, legacyHashes : 是前面产生的StreamNode的hash值
new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, streamGraph));
}
}
createChain方法
1.如果startNodeId已经创建chain,那么直接返回一个空list
2.如果没有构建,则开始构建chian2.1 首先会循环当前StreamNode的所有StreamEdge(StreamEdge记录了下游节点id,根据StreamGraph可以获 取指定id的StreamNode),通过isChainable方法去判断两个节点是否能够chain在一起,如果能chain在一 起,将edge存入chainableOutputs集合,否则放入nonChainableOutputs集合
2.2 循环将chainableOutputs集合,如果有值,递归的调用createChain方法,当循环将chainableOutputs集 合循环完,会循环nonChainableOutputs集合递归调用createChain方法,开始创建新的chain,这时候是构 建以下游StreamNode为顶点开始构建chain
2.3 如果当前节点是chain的起始节点,会创建一个JobVertex,然后循环chain的出边(outEdge)调用connect 方法,将它指向的出边(outEdge)通过JobEdge进行连接,并将配置信息通过StreamConfig提供的 setTransitiveChainedTaskConfigs方法将配置信息保存到JobVertex的configuration中
如果是chain的中间节点,那么会将配置信息写入到StreamConfig中,并写入到chainedConfigs中,其数据 结构为 Map<Integer, Map<Integer, StreamConfig>> key为 chain的顶点,value为每个中间节点 配置信息
private List<StreamEdge> createChain(Integer currentNodeId, int chainIndex, OperatorChainInfo chainInfo) {
Integer startNodeId = chainInfo.getStartNodeId();
// builtVertices存放了已经被构建了的StreamNode ID,避免重复操作
if (!builtVertices.contains(startNodeId)) {
// 过渡用的出边集合, 用来生成最终的 JobEdge,
// 注意:存在某些StreamNode会连接到一起,比如source->map->flatMap,如果这几个StreamNode连接到一起,
// 则transitiveOutEdges是不包括 chain 内部的边,既不包含source->map的StreamEdge的
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
// 可以与当前节点链接的StreamEdge
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
// 不可以与当前节点链接的StreamEdge
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
// 获取当前处理node
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
// 将当前节点的出边分成 chainable 和 nonChainable 两类
// 分类可以被chain的edge和不可被chain的edge,使用isChainable的方法判断
// 如果能chain 放入chainableOutputs集合,否则放入nonChainableOutputs集合
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
// todo 递归调用
// 对于每个可连接的StreamEdge,递归调用其目标StreamNode,startNodeId保持不变,但是chainIndex会加1
for (StreamEdge chainable : chainableOutputs) {
// 如果是可被chain的StreamEdge,递归调用createChain方法
// 注意currentNode是chainable.getTargetId(),也就是(downStreamNode)下游节点id
// 递归直到currentNode的out edge为不可chain的edge,会执行下一段for循环,不可chain的边被加入transitiveOutEdges,最终返回到递归最外层
// 这样以来,transitiveOutEdges收集齐了整个chain所有的出边
transitiveOutEdges.addAll(
createChain(chainable.getTargetId(), chainIndex + 1, chainInfo));
}
// 对于每个不可连接的StreamEdge,则将对应的StreamEdge就是当前链的一个输出StreamEdge,所以会添加到
// transitiveOutEdges这个集合中 然后递归调用其Target node,注意,startNodeID变成了nonChainable这个
// StreamEdge的输出节点id,chainIndex也赋值为0,说明重新开始一条链的建立
for (StreamEdge nonChainable : nonChainableOutputs) {
// 将不可被chain的StreamEdge,添加到transitiveOutEdges集合中
transitiveOutEdges.add(nonChainable);
// 调用createChain,构建新的chain
// todo 这里传入StreamNodeId 都是下游StreamNodeId, 并创建新的OperatorChainInfo对象;
createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId()));
}
//生成当前节点的显示名,如:"Keyed Aggregation -> Sink: Unnamed"
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
// 设置chain的最小资源
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
// 设置chain的首选资源
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}
// 如果当前节点是起始节点,则直接创建 JobVertex 并返回 StreamConfig, 否则先创建一个空的 StreamConfig
// createJobVertex 函数就是根据 StreamNode 创建对应的 JobVertex, 并返回了空的 StreamConfig
// 如果currentNodeId和startNodeId相等,说明需要创建一个新的chain,会生成一个JobVertex
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, chainInfo)
: new StreamConfig(new Configuration());
// 设置 JobVertex 的 StreamConfig, 基本上是序列化 StreamNode 中的配置到 StreamConfig 中.
// 其中包括 序列化器, StreamOperator, Checkpoint 等相关配置
// 设置的Vertex属性到config中
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
// 如果相等 说明处理到了当前节点,也就是说chain的子节点处理完了 需要创建Edge将两个JobVertex进行连接
// startNodeId是 chain的开始节点, currentNodeId 是当前节点,
// todo 如果开始节点是1 当前节点是4, 那么他的逻辑顺序则是 1 => 4 , 1 => 3 , 1 => 2 , 1=>1
// 当 当前节点等于开始节点的时候, 说明chain中间的节点处理完了, 需要处理 chain的顶点, 即chain的开始节点
if (currentNodeId.equals(startNodeId)) {
//如果是chain的起始节点。(不是chain的中间节点,会被标记成 chain start)
// 意味着一个新chain的开始
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
//我们也会把物理出边写入配置, 部署时会用到
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
// 将当前节点(headOfChain)与所有出边相连
// 对于每一个chain,把它和指向下一个chain的出边连接起来
for (StreamEdge edge : transitiveOutEdges) {
//通过StreamEdge构建出JobEdge,创建 IntermediateDataSet ,用来将JobVertex和JobEdge相连
connect(startNodeId, edge);
}
// 将chain中所有子节点的StreamConfig写入到 headOfChain 节点的 CHAINED_TASK_CONFIG 配置中
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
//如果是 chain 中的子节点
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
// 获取到被chain的节点
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
// 将当前节点的StreamConfig添加到该chain的config集合中
// 关联chain内节点的配置信息到chain的起始节点上
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
//如果节点的输出StreamEdge已经为空,则说明是链的结尾
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
// 返回连往chain外部的出边集合
return transitiveOutEdges;
} else {
//startNodeId 如果已经构建过,则直接返回
return new ArrayList<>();
}
}
看一下createChain内调用的一些方法
isChainable
这个方法用于判断两个StreamEdge的源(即 源产生的operator<并不一定是 source operator>)是否可以chain一起进行执行,只有当所有的条件都满足的时候才返回true(即可以chain)
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
/** 获取StreamEdge的源和目标StreamNode */
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
/**
* 1、下游节点只有一个输入
* 4、上下游节点在一个槽位共享组内
* 5、下游节点的连接策略是 ALWAYS
* 6、上游节点的连接策略是 HEAD 或者 ALWAYS
* 7、sourceFactory不能是YieldingOperatorFactory
* 8、edge 的分区函数是 ForwardPartitioner 的实例
* 9、shuffle模式不能是batch模式
* 10、上下游节点的并行度相等
* 11、可以进行节点连接操作
*/
return downStreamVertex.getInEdges().size() == 1
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
}
createJobVertex
JobVertex只有在chain的顶点创建,chain的其余节点以配置信息的形式记录在JobVertex中
这个方法首先会获取StreamNode(chain的开始节点),并创建一个JobVertexId,是根据当前StreamNode的hash创建的, 获取到chain顶点的operator hash对 (用户与flink生成),然后根据JobVertexName和JobVertexId以及hash对生成JobVertex,生成jobVertex后设置一些参数,将jobVertex存入到jobGreaph中,到这里,一个JobVertex就生成完了
private StreamConfig createJobVertex(
Integer streamNodeId,
OperatorChainInfo chainInfo) {
JobVertex jobVertex;
// 获取开始节点
StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
byte[] hash = chainInfo.getHash(streamNodeId);
if (hash == null) {
throw new IllegalStateException("Cannot find node hash. " +
"Did you generate them before calling this method?");
}
// 通过hash创建JobVertexId
JobVertexID jobVertexId = new JobVertexID(hash);
// 获取任务链 operator的哈希值
List<Tuple2<byte[], byte[]>> chainedOperators = chainInfo.getChainedOperatorHashes(streamNodeId);
List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
if (chainedOperators != null) {
for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
OperatorID userDefinedOperatorID = chainedOperator.f1 == null ? null : new OperatorID(chainedOperator.f1);
operatorIDPairs.add(OperatorIDPair.of(new OperatorID(chainedOperator.f0), userDefinedOperatorID));
}
}
// 如果chain是 输人输出格式, 创建InputOutputFormatVertex对象, -- > 这个只是尝试性的
if (chainedInputOutputFormats.containsKey(streamNodeId)) {
jobVertex = new InputOutputFormatVertex(
chainedNames.get(streamNodeId),
jobVertexId,
operatorIDPairs);
chainedInputOutputFormats
.get(streamNodeId)
.write(new TaskConfig(jobVertex.getConfiguration()));
} else { // 否则创建 JobVertex 对象
jobVertex = new JobVertex(
chainedNames.get(streamNodeId),
jobVertexId,
operatorIDPairs);
}
for (OperatorCoordinator.Provider coordinatorProvider : chainInfo.getCoordinatorProviders()) {
try {
jobVertex.addOperatorCoordinator(new SerializedValue<>(coordinatorProvider));
} catch (IOException e) {
throw new FlinkRuntimeException(String.format(
"Coordinator Provider for node %s is not serializable.", chainedNames.get(streamNodeId)));
}
}
jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
jobVertex.setInvokableClass(streamNode.getJobVertexClass());
int parallelism = streamNode.getParallelism();
if (parallelism > 0) {
jobVertex.setParallelism(parallelism);
} else {
parallelism = jobVertex.getParallelism();
}
jobVertex.setMaxParallelism(streamNode.getMaxParallelism());
if (LOG.isDebugEnabled()) {
LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
}
// TODO: inherit InputDependencyConstraint from the head operator
// 设置输入状态的情况下安排任务方式。 , 默认 如果有可使用的输入,则调度该任务
jobVertex.setInputDependencyConstraint(streamGraph.getExecutionConfig().getDefaultInputDependencyConstraint());
//将生成好的jobVertex存入jobVertices列表
jobVertices.put(streamNodeId, jobVertex);
//已经构建好的jobVertex Id列表
builtVertices.add(streamNodeId);
// 将jovVertex添加到JobGraph图中
jobGraph.addVertex(jobVertex);
return new StreamConfig(jobVertex.getConfiguration());
}
connect
这个方法主要用于 将两个JobVertex连接在一起,首先会获取到ShuffleMode(数据交互模式)生成resultPartitionType,然后通过不同的partition创建不同的JobEdge,如果是ForwardPartitioner或RescalePartitioner使用POINTWISE模式,否则使用ALL_TO_ALL(即 一个subTask对应下游subTask是一对一还是一对多),最后connectNewDataSetAsInput方法,创建一个JobEdge,将两个JobVertex连接在一起
private void connect(Integer headOfChain, StreamEdge edge) {
physicalEdgesInOrder.add(edge);
Integer downStreamVertexID = edge.getTargetId();
JobVertex headVertex = jobVertices.get(headOfChain);
JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);
StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
// 每次循环 input数量+1
downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);
//获取 edge的分区模式 --
StreamPartitioner<?> partitioner = edge.getPartitioner();
//结果分区的类型
ResultPartitionType resultPartitionType;
// 最终返回 PIPELINED_BOUNDED 或者 BLOCKING
// 如果没有定义,会进行判断全局数据交换模式, 最终确定分区模式
switch (edge.getShuffleMode()) {
case PIPELINED:
resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
break;
case BATCH:
resultPartitionType = ResultPartitionType.BLOCKING;
break;
case UNDEFINED:
resultPartitionType = determineResultPartitionType(partitioner);
break;
default:
throw new UnsupportedOperationException("Data exchange mode " +
edge.getShuffleMode() + " is not supported yet.");
}
JobEdge jobEdge;
// 根据不同的分区模式创建JobEdge
// 如果分区模式为ForwardPartitioner或RescalePartitioner;
// 则使用 POINTWISE分配模式, 否则使用ALL_TO_ALL
if (isPointwisePartitioner(partitioner)) {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
// 每个生产子任务都连接到消费任务的一个或多个子任务
DistributionPattern.POINTWISE,
resultPartitionType);
} else {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
// 每个生产子任务都与消费任务的子任务相连接
DistributionPattern.ALL_TO_ALL,
resultPartitionType);
}
// set strategy name so that web interface can show it.
// 设置分发策略模式,用于 web显示
jobEdge.setShipStrategyName(partitioner.toString());
if (LOG.isDebugEnabled()) {
LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
headOfChain, downStreamVertexID);
}
}
downStreamVertex.connectNewDataSetAsInput方法
首先由input的JobVertex(即上游),创建一个IntermediateDataSet(简称 dataSet),并将自己添加到dataSet的producer中,,然后当前JobVertex(即下游)构建一个JobEdge,并将edge添加到inputs的列表中,并将dataSet的consumer进行赋值为JobEdge, 这是JobEdge已经构建完毕了
JobEdge中的dataSet中有两个参数(producer,consumer),分别对应生产者(上游JobVertex)和消费者(下游JobVertex),通过这种形式将两个JobVertex进行连接
public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType) {
// 由上游节点 创建一个IntermediateDataSet
// todo 中间数据集是操作符 - sour操作或任何中间操作 - 产生的数据集
// 中间数据集可能被其他操作符读取、具体化或丢弃。
IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
// 由此可以看出, 他的输入就是上游节点的一个输出
// JobEdge的输入就是当前节点
// 构建一个jobEdge
JobEdge edge = new JobEdge(dataSet, this, distPattern);
// jobEdge 连接edge
this.inputs.add(edge);
dataSet.addConsumer(edge);
return edge;
}
构建JobGraph最主要的逻辑就是构建chain,源码中剩下方法就是设置了一些JobGraph的参数,比如设置slot共享组,设置托管内存分数,配置检查点等内容,最终会将JobGraph构建完成并返回
下面是一段简单的代码,我们看一下他的生成JobGraph的过程
在StreamGraph向JobGraph转换的时候,会将两个或多个算子chain一起进行执行,这是Flink对我们的程序进行的一个优化,
现在我看一下 具体任务执行后的样子