1. Overview
This article traces, step by step, how a user's Java program is turned into a Flink JobGraph. Interested readers are welcome to discuss it with me.
2. The User Program
public class WindowWordCount {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> dataStream = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
dataStream.print();
env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word: sentence.split(" ")) {
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}
Now let's trace the code.
- StreamExecutionEnvironment.getExecutionEnvironment() is implemented as follows:
public static StreamExecutionEnvironment getExecutionEnvironment() {
// In a streaming environment this is always null
if (contextEnvironmentFactory != null) {
return contextEnvironmentFactory.createExecutionEnvironment();
}
// because the streaming project depends on "flink-clients" (and not the other way around)
// we currently need to intercept the data set environment and create a dependent stream env.
// this should be fixed once we rework the project dependencies
// Execution always reaches this point. As mentioned in an earlier post, the call goes through ContextEnvironmentFactory, which yields a ContextEnvironment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (env instanceof ContextEnvironment) {
return new StreamContextEnvironment((ContextEnvironment) env);
} else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
return new StreamPlanEnvironment(env);
} else {
return createLocalEnvironment();
}
}
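The code above ends up returning new StreamContextEnvironment((ContextEnvironment) env). Next, env creates a DataStream through socketTextStream(). Before going through the operators in the example, here is the inheritance hierarchy of the DataStream classes, as shown in the figure below:
socketTextStream creates a DataStreamSource. Apart from that, the other important DataStream classes are KeyedStream, which is the DataStream produced by the keyBy() API, and SingleOutputStreamOperator, which is produced by common operators such as map() and filter().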
Back to socketTextStream(). Its code is:
public DataStreamSource<String> socketTextStream(String hostname, int port, String delimiter, long maxRetry) {
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
"Socket Stream");
}
It eventually calls:
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
if (typeInfo == null) {
if (function instanceof ResultTypeQueryable) {
typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
} else {
try {
typeInfo = TypeExtractor.createTypeInfo(
SourceFunction.class,
function.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
}
}
}
boolean isParallel = function instanceof ParallelSourceFunction;
clean(function);
final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
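Here the function argument is a SocketTextStreamFunction, which reads text from the socket. addSource wraps it in a StreamSource operator. The code above does the following:
- It first determines the function's output type. Following the logic above, SocketTextStreamFunction yields TypeInformation<String>.
- It cleans the function's closure (clean(function)) so that the function can be serialized.
- It constructs the source operator (StreamSource) and wraps it in a DataStreamSource, whose constructor is: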
public DataStreamSource(StreamExecutionEnvironment environment,
TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
boolean isParallel, String sourceName) {
super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
this.isParallel = isParallel;
if (!isParallel) {
setParallelism(1);
}
}
In Flink, the inheritance and implementation relationships among the StreamOperator and Transformation classes are as follows:
The subsequent .flatMap, like the other operators, is a method provided by DataStream. Its code is:
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operator,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
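The overall logic is therefore:
- The user's function is wrapped in a StreamOperator (here a StreamFlatMap). The StreamOperator structure is as follows:
- The input transformation and the StreamOperator are used to build the current Transformation (a OneInputTransformation).
- The current Transformation is wrapped in the returned DataStream (a SingleOutputStreamOperator).
- The current Transformation is added to the current StreamExecutionEnvironment through addOperator(). This list of transformations is used later when the execution plan is generated.
The remaining operators, such as keyBy and filter, build their DataStreams in a similar way. Interested readers can look them up.
To summarize, the call relationships among DataStream, StreamOperator and StreamTransformation are: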
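The steps above can be illustrated with a toy model. The classes below (Env, Stream, Transformation) are hypothetical stand-ins written for this sketch and are not Flink's API. They only reproduce the pattern described: each operator call makes the current transformation the input of a new one, registers the new one with the environment, and wraps it in a new stream. Following addSource() above, the source transformation is not registered. It can be reached only through input pointers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of how DataStream calls build a chain of transformations.
// Names mirror Flink concepts, but none of these are Flink classes.
class TransformationChainSketch {

    // One logical operation: a name plus a pointer to its single input.
    static final class Transformation {
        final int id;
        final String name;
        final Transformation input; // null for a source

        Transformation(int id, String name, Transformation input) {
            this.id = id;
            this.name = name;
            this.input = input;
        }
    }

    // Stand-in for StreamExecutionEnvironment: assigns ids and collects
    // every transformation registered through addOperator().
    static final class Env {
        private int nextId = 1;
        final List<Transformation> transformations = new ArrayList<>();

        Stream source(String name) {
            // Like addSource(): the source transformation is created but not registered.
            return new Stream(this, new Transformation(nextId++, name, null));
        }

        void addOperator(Transformation t) {
            transformations.add(t);
        }
    }

    // Stand-in for DataStream: wraps exactly one transformation.
    static final class Stream {
        final Env env;
        final Transformation transformation;

        Stream(Env env, Transformation transformation) {
            this.env = env;
            this.transformation = transformation;
        }

        // Mirrors DataStream#transform: the new transformation takes ours as input,
        // is registered with the environment, and is wrapped in a new stream.
        Stream transform(String operatorName) {
            Transformation result = new Transformation(env.nextId++, operatorName, transformation);
            env.addOperator(result);
            return new Stream(env, result);
        }
    }

    public static void main(String[] args) {
        Env env = new Env();
        env.source("Socket Stream").transform("Flat Map").transform("Sink");
        for (Transformation t : env.transformations) {
            System.out.println(t.id + " " + t.name + " <- " + t.input.name);
        }
    }
}
```

Running main prints two lines, 2 Flat Map <- Socket Stream and 3 Sink <- Flat Map. Only the registered transformations are in the list, but the source can still be reached from them.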
3. StreamGraph
StreamContextEnvironment starts the execution of the whole job through execute(). The main code is:
public JobExecutionResult execute(String jobName) throws Exception {
...
StreamGraph streamGraph = this.getStreamGraph();
...
return ctx
.getClient()
.run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
.getJobExecutionResult();
}
getStreamGraph() is the main function that builds the StreamGraph. It delegates to StreamGraphGenerator.generate(), whose core code is:
public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
// env is passed to the StreamGraphGenerator constructor, mainly so that its configuration can be used together with the given transformations
return new StreamGraphGenerator(env).generateInternal(transformations);
}
The transformations parameter is the collection of transformations generated for the whole job. Each call to an API operator creates one or more transformations. generateInternal() is called next, and its code is:
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
for (StreamTransformation<?> transformation: transformations) {
// Iterate over every transformation and process it
transform(transformation);
}
return streamGraph;
}
generateInternal runs transform() on each transformation. The core logic of that method is:
...
Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
return transformedIds;
...
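The code above dispatches on the transformation type and records each result in a Map<transformation, Collection<Integer>> (alreadyTransformed). The map ensures that each transformation is translated only once. For example, when the same DataStream feeds several sinks, the already-translated input is looked up in the map by transformation instead of being translated again. For most transformations the collection holds a single id, the id of the StreamNode just created. It is a collection because virtual transformations such as union create no node of their own and return the ids of all their inputs instead. These ids are used again later on the way to the JobGraph. Let's take transformOneInputTransform() as an example: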
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
// Translate this Transformation's input first
Collection<Integer> inputIds = transform(transform.getInput());
...
// The slot sharing group is used later when slots are assigned to tasks, to isolate different tasks into different slots. It is ignored here for now
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
// Add the StreamNode to the StreamGraph (the StreamEdges are added below)
streamGraph.addOperator(transform.getId(),
slotSharingGroup,
transform.getCoLocationGroupKey(),
transform.getOperatorFactory(),
transform.getInputType(),
transform.getOutputType(),
transform.getName());
...
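// Add the edges between input and output. For an ordinary operator this is just one edge from each input node to this node.
// Special StreamNodes such as split/select are somewhat more involved; see the split/select explanation below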
for (Integer inputId: inputIds) {
streamGraph.addEdge(inputId, transform.getId(), 0);
}
return Collections.singleton(transform.getId());
}
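That is the logic of transformOneInputTransform. The other Transformation types are handled similarly. The StreamGraph therefore records two key pieces of information: the nodes created from Transformations and the edges that connect them. The JobGraph is later generated mainly from these two data structures.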
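Here is a minimal sketch of this recursion, under simplifying assumptions. The Transformation class, the isUnion flag and the string edges are hypothetical, and only two cases are modeled: an ordinary operator that creates a node, and a union that creates none. The sketch shows why the translation returns a collection of ids, and how storing results in a map keyed by transformation prevents a shared input from being translated twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the recursive translation in StreamGraphGenerator.
// Not Flink code: nodes and edges are plain maps and strings for illustration.
class StreamGraphGeneratorSketch {

    static final class Transformation {
        final int id;
        final String name;
        final boolean isUnion; // a union creates no StreamNode of its own
        final List<Transformation> inputs;

        Transformation(int id, String name, boolean isUnion, Transformation... inputs) {
            this.id = id;
            this.name = name;
            this.isUnion = isUnion;
            this.inputs = Arrays.asList(inputs);
        }
    }

    final Map<Integer, String> nodes = new LinkedHashMap<>(); // StreamNode id -> name
    final List<String> edges = new ArrayList<>();             // "upstream->downstream"
    private final Map<Transformation, Collection<Integer>> alreadyTransformed = new HashMap<>();

    Collection<Integer> transform(Transformation t) {
        // Memoization: a transformation consumed by several operators is translated once.
        Collection<Integer> cached = alreadyTransformed.get(t);
        if (cached != null) {
            return cached;
        }
        // Inputs are translated first, so their node ids exist before edges are added.
        List<Integer> inputIds = new ArrayList<>();
        for (Transformation input : t.inputs) {
            inputIds.addAll(transform(input));
        }
        Collection<Integer> result;
        if (t.isUnion) {
            // Forward all input ids: downstream nodes connect to every union input.
            result = inputIds;
        } else {
            nodes.put(t.id, t.name);
            for (int inputId : inputIds) {
                edges.add(inputId + "->" + t.id);
            }
            result = Collections.singleton(t.id);
        }
        alreadyTransformed.put(t, result);
        return result;
    }

    static StreamGraphGeneratorSketch generate(List<Transformation> transformations) {
        StreamGraphGeneratorSketch generator = new StreamGraphGeneratorSketch();
        for (Transformation t : transformations) {
            generator.transform(t);
        }
        return generator;
    }

    public static void main(String[] args) {
        Transformation a = new Transformation(1, "source A", false);
        Transformation b = new Transformation(2, "source B", false);
        Transformation union = new Transformation(3, "union", true, a, b);
        Transformation sink1 = new Transformation(4, "sink 1", false, union);
        Transformation sink2 = new Transformation(5, "sink 2", false, union);
        StreamGraphGeneratorSketch g = generate(Arrays.asList(sink1, sink2));
        System.out.println(g.nodes); // {1=source A, 2=source B, 4=sink 1, 5=sink 2}
        System.out.println(g.edges); // [1->4, 2->4, 1->5, 2->5]
    }
}
```

Both sinks connect to both sources, the union never becomes a StreamNode, and the shared union is translated only once.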
The StreamGraph code shows that its main data structures are:
- StreamNode: wraps a transformation and the transformation's metadata
- StreamEdge: connects StreamNodes and captures their dependencies
They are organized as follows:
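StreamNodes are connected by StreamEdges. A StreamNode can have several input edges and several output edges, for example when an operator has multiple inputs or multiple outputs. The key fields of a StreamNode are:
- inEdges and outEdges: the input and output edges.
- parallelism: the node's parallelism. It can be set for each node individually and is used when the ExecutionGraph is generated. If it is not set explicitly, it defaults to the environment parallelism (environment.getParallelism() in transform() above).
- slotSharingGroup: the slot sharing group, which affects how the generated tasks are assigned to slots. This is covered separately later.
- jobVertexClass: the entry class that a Task runs once the job has been turned into an ExecutionGraph. Every Execution corresponds to a jobVertexClass, whose base class is AbstractInvokable. The class hierarchy is as follows:
When the ExecutionGraph runs on a TaskManager, execution starts in these classes, which handle things such as task initialization and checkpointing.
- id: the id of the corresponding Transformation, which maps each StreamNode one-to-one to its Transformation.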
The StreamGraph also contains a special kind of StreamNode: virtual nodes. In the Stream API they correspond to operations such as DataStream.split(xxx).select(xxx), and Flink handles these nodes specially.
Handling Split and Select
Suppose a DataStream goes through the split and select operators, i.e. streamA.split().select(). The corresponding part of the StreamGraph looks like this:
Translating the Split operator creates no new node. Instead, its selector is added to the outputSelectors list (a List<OutputSelector>) of its input, StreamNode A. Translating Select first translates its input. A virtual StreamNode B is then registered in the StreamGraph's virtualSelectNodes map. The key is B's id, and the value is a Tuple2 of (StreamNode A's id, selectNames). This means that virtual StreamNode B takes its input from StreamNode A and uses selectNames to pick which split outputs of A it receives.
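Where, then, is the edge to the downstream node? It is determined only when the downstream StreamNode (OtherNode) is translated. At that point the edge-adding logic in transformOneInputTransform above runs with the virtual StreamNode as its input. The edge-adding code contains: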
private void addEdgeInternal(Integer upStreamVertexID,
Integer downStreamVertexID,
int typeNumber,
StreamPartitioner<?> partitioner,
List<String> outputNames,
OutputTag outputTag) {
if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
if (outputTag == null) {
outputTag = virtualSideOutputNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag);
} else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
// The first call lands here, because upStreamVertexID is a virtual StreamNode
int virtualId = upStreamVertexID;
// Look up the id of the select node's input, i.e. StreamNode A's id
upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
if (outputNames.isEmpty()) {
// selections that happen downstream override earlier selections
outputNames = virtualSelectNodes.get(virtualId).f1;
}
// Call addEdgeInternal again, now with StreamNode A as the upstream node
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag);
} else {
// The second (recursive) call lands here
StreamNode upstreamNode = getStreamNode(upStreamVertexID);
StreamNode downstreamNode = getStreamNode(downStreamVertexID);
...
// Build the StreamEdge. In the split/select model the edge stores outputNames, while upstreamNode holds the selector function
StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag);
getStreamNode(edge.getSourceId()).addOutEdge(edge);
getStreamNode(edge.getTargetId()).addInEdge(edge);
}
}
Feedback, Partition and SideOutput are handled in much the same way as Split/Select; see the explanation above.
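The redirection performed by addEdgeInternal can be reduced to a few lines. In this hypothetical sketch, Selection and Edge are illustrative classes and not Flink types. Only virtual select nodes are modeled. Side-output and partition nodes would follow the same pattern, with an OutputTag or a partitioner in place of the names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of how virtual select nodes are resolved when an edge is added:
// the virtual upstream id is replaced by the real node it points to, and the selected
// output names travel with the edge. Not Flink code.
class VirtualNodeSketch {

    static final class Selection {
        final int upstreamId;
        final List<String> selectedNames;

        Selection(int upstreamId, List<String> selectedNames) {
            this.upstreamId = upstreamId;
            this.selectedNames = selectedNames;
        }
    }

    static final class Edge {
        final int source;
        final int target;
        final List<String> outputNames;

        Edge(int source, int target, List<String> outputNames) {
            this.source = source;
            this.target = target;
            this.outputNames = outputNames;
        }

        @Override
        public String toString() {
            return source + "->" + target + " " + outputNames;
        }
    }

    // virtual node id -> (real upstream id, selected names), like virtualSelectNodes
    private final Map<Integer, Selection> virtualSelectNodes = new HashMap<>();
    final List<Edge> edges = new ArrayList<>();

    void addVirtualSelectNode(int originalId, int virtualId, List<String> selectedNames) {
        virtualSelectNodes.put(virtualId, new Selection(originalId, selectedNames));
    }

    void addEdge(int upstreamId, int downstreamId) {
        addEdgeInternal(upstreamId, downstreamId, new ArrayList<>());
    }

    private void addEdgeInternal(int upstreamId, int downstreamId, List<String> outputNames) {
        Selection virtual = virtualSelectNodes.get(upstreamId);
        if (virtual != null) {
            // Selections that happen downstream override earlier selections.
            List<String> names = outputNames.isEmpty() ? virtual.selectedNames : outputNames;
            // Recurse: the real upstream may itself be another virtual node.
            addEdgeInternal(virtual.upstreamId, downstreamId, names);
        } else {
            // Only edges between real nodes are materialized.
            edges.add(new Edge(upstreamId, downstreamId, outputNames));
        }
    }

    public static void main(String[] args) {
        VirtualNodeSketch graph = new VirtualNodeSketch();
        graph.addVirtualSelectNode(1, 10, Arrays.asList("even")); // A = 1, virtual B = 10
        graph.addEdge(10, 3);                                     // OtherNode = 3
        System.out.println(graph.edges); // [1->3 [even]]
    }
}
```

Because the lookup recurses, a select applied on top of another select also resolves to the real StreamNode A. The outermost (most downstream) selection wins.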
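The end result is a StreamGraph that contains all StreamNodes and StreamEdges. These StreamNodes and StreamEdges, together with each node's configuration (parallelism, slot sharing group, etc.), are used to generate the complete JobGraph.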
JobGraph
Now that we have the StreamGraph, let's look at how the JobGraph is generated. The call chain is ClusterClient#run() --> ClusterClient#getJobGraph() --> StreamingPlan#getJobGraph() --> StreamGraph#getJobGraph() --> StreamingJobGraphGenerator.createJobGraph(streamGraph) (static) --> StreamingJobGraphGenerator#createJobGraph().
The main logic is in the private createJobGraph(), shown below:
private JobGraph createJobGraph() {
// make sure that all vertices start immediately
jobGraph.setScheduleMode(ScheduleMode.EAGER);
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
// Explained in detail below
setChaining(hashes, legacyHashes, chainedOperatorHashes);
// Set the physical edges based on the result of setChaining
setPhysicalEdges();
// Set the JobGraph's SlotSharingGroup and CoLocationGroup
setSlotSharingAndCoLocation();
/*
Configure checkpointing for each JobVertex of the JobGraph.
For example, source JobVertices must trigger checkpoints, and
every JobVertex must acknowledge and commit checkpoints
*/
configureCheckpointing();
JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);
// set the ExecutionConfig last when it has been finalized
try {
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}
return jobGraph;
}
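The logic is:
- Traverse the StreamGraph (computing a hash for every StreamNode)
- Build the operator chains
- Set the physical edges
- Set the SlotSharing groups (and co-location groups)
- Configure checkpointing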
3.1 Traversal
The StreamGraph traversal starts at the sources and computes a hash for every StreamNode. A StreamNode is hashed only after all of its input nodes have been hashed.
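The visiting order can be sketched as follows. This hypothetical model leaves out the hash computation itself and represents the graph as an int[][] edge list. A node whose inputs have not all been hashed yet is dropped from the queue. It is queued again once its last input has been processed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch of the visiting order used while hashing StreamNodes:
// breadth-first from the sources, but a node is only "hashed" once all of its
// inputs have been. Only the order is modeled, not the hash function.
class HashTraversalSketch {

    static List<Integer> hashOrder(int[][] edges, List<Integer> sources) {
        Map<Integer, List<Integer>> out = new HashMap<>();
        Map<Integer, List<Integer>> in = new HashMap<>();
        for (int[] e : edges) {
            out.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            in.computeIfAbsent(e[1], k -> new ArrayList<>()).add(e[0]);
        }
        List<Integer> order = new ArrayList<>();
        Set<Integer> hashed = new HashSet<>();
        Set<Integer> visited = new HashSet<>(sources);
        Queue<Integer> remaining = new ArrayDeque<>(sources);
        Integer current;
        while ((current = remaining.poll()) != null) {
            List<Integer> inputs = in.getOrDefault(current, Collections.emptyList());
            if (hashed.containsAll(inputs)) {
                hashed.add(current);
                order.add(current);
                for (int child : out.getOrDefault(current, Collections.emptyList())) {
                    if (visited.add(child)) { // enqueue a child only if it is not already pending
                        remaining.add(child);
                    }
                }
            } else {
                // Some input is not hashed yet: forget this node; it is queued again later.
                visited.remove(current);
            }
        }
        return order;
    }

    public static void main(String[] args) {
        // Sources 1 and 5; node 4 has inputs 1 and 6, and 6 is fed by 5.
        int[][] edges = {{1, 4}, {5, 6}, {6, 4}};
        System.out.println(hashOrder(edges, Arrays.asList(1, 5))); // [1, 5, 6, 4]
    }
}
```

Node 4 is reached first from source 1 but is postponed until node 6 has been hashed. This is the property the text describes.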
3.2 Operator Chain
3.2.1 Operator chains and why they matter
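In the StreamGraph, each operator corresponds to one StreamNode. Consider a common case, DataStream.map().filter(). The map and filter operators become separate StreamNodes and eventually separate tasks. If those two tasks run in different slots or on different TaskManagers, data has to travel over the network from map to filter, which hurts performance badly. To address this, Flink introduces the concept of an operator chain: a sequence of operators that run together in one task, and therefore in the same slot.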
3.2.2 When operators can be chained
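According to the source code, an upstream operator A and a downstream operator can be chained if all of the following hold:
- The downstream operator has exactly one input, namely the upstream operator
- Both belong to the same SlotSharingGroup
- Chaining is enabled
- The partitioner is a ForwardPartitioner
- Both have the same parallelism
- Their ChainingStrategy settings allow chaining
A chain can contain more than two operators, as long as every pair of consecutive operators satisfies the following condition: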
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
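Here is the same condition written against a simplified model, as a hypothetical sketch. Node bundles the fields the check reads: number of input edges, slot sharing group, chaining strategy and parallelism. The partitioner check and the global chaining switch are passed in as booleans. In the snippet above, headOperator belongs to the upstream node and outOperator to the downstream node.

```java
// Hypothetical, simplified model of the chaining check shown above. The enum
// mirrors Flink's ChainingStrategy values; Node is an illustrative stand-in for
// a StreamNode plus its operator, not a Flink class.
class ChainabilitySketch {

    enum ChainingStrategy { ALWAYS, HEAD, NEVER }

    static final class Node {
        final int inEdgeCount;
        final String slotSharingGroup;
        final ChainingStrategy strategy;
        final int parallelism;

        Node(int inEdgeCount, String slotSharingGroup, ChainingStrategy strategy, int parallelism) {
            this.inEdgeCount = inEdgeCount;
            this.slotSharingGroup = slotSharingGroup;
            this.strategy = strategy;
            this.parallelism = parallelism;
        }
    }

    // upstream plays the role of headOperator's node, downstream that of outOperator's node.
    static boolean isChainable(Node upstream, Node downstream,
                               boolean forwardPartitioner, boolean chainingEnabled) {
        return downstream.inEdgeCount == 1
                && upstream.slotSharingGroup.equals(downstream.slotSharingGroup)
                && downstream.strategy == ChainingStrategy.ALWAYS
                && (upstream.strategy == ChainingStrategy.HEAD
                    || upstream.strategy == ChainingStrategy.ALWAYS)
                && forwardPartitioner
                && upstream.parallelism == downstream.parallelism
                && chainingEnabled;
    }

    public static void main(String[] args) {
        Node map = new Node(1, "default", ChainingStrategy.ALWAYS, 2);
        Node filter = new Node(1, "default", ChainingStrategy.ALWAYS, 2);
        Node wideFilter = new Node(1, "default", ChainingStrategy.ALWAYS, 4);
        System.out.println(isChainable(map, filter, true, true));     // true
        System.out.println(isChainable(map, wideFilter, true, true)); // false: parallelism differs
        System.out.println(isChainable(map, filter, false, true));    // false: not a forward edge
    }
}
```

Any single failing condition breaks the chain at that edge, so the downstream operator becomes the head of a new chain.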
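3.2.3 Code walkthrough
Let's use a simple StreamGraph to illustrate how the JobGraph is generated. In the graph, StreamNode A (id 1) is connected to StreamNode B (id 2) by edge D, and StreamNode B is connected to StreamNode C (id 3) by edge E. Edge D cannot be chained; edge E can.
For this graph, setChaining in the function above calls createChain for source A, and createChain then recurses, so createChain runs three times in total.
First call to createChain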
private List<StreamEdge> createChain(
Integer startNodeId, //Node A
Integer currentNodeId, //Node A
Map<Integer, byte[]> hashes, // hashes of all StreamNodes, computed during the traversal
List<Map<Integer, byte[]>> legacyHashes, //size = 1
int chainIndex, //chainIndex = 0
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes // empty
) {
if (!builtVertices.contains(startNodeId)) {
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
// currentNodeId is Node A with one out edge, so below chainableOutputs ends up with size 0 and nonChainableOutputs with size 1
for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
// chainableOutputs is empty, skip
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
// nonChainableOutputs is not empty
for (StreamEdge nonChainable : nonChainableOutputs) {
// add edge D
transitiveOutEdges.add(nonChainable);
// see the second call to createChain
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
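/*
Add an entry for Node A to chainedOperatorHashes. The second and third calls have
already added the entry for Node B, so at this point the map contains
("hash" is a node's primary hash, "hash1" its legacy hash):
key        value
Node B id  List<
               Tuple2<Node C hash, Node C hash1>
               Tuple2<Node B hash, Node B hash1>
           >
*/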
List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
// get Node A's hash
byte[] primaryHashBytes = hashes.get(currentNodeId);
/* legacyHashes has size 1, so the loop below adds Node A's hashes under the key Node A in chainedOperatorHashes.
The final layout is:
Key        Value
Node B id  List<
               Tuple2<Node C hash, Node C hash1>
               Tuple2<Node B hash, Node B hash1>
           >
Node A id  List<Tuple2<Node A hash, Node A hash1>>
*/
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}
/*
Add Stream Node A's name to chainedNames.
Afterwards chainedNames contains:
<3, StreamNode C>
<2, StreamNode B -> StreamNode C>
<1, StreamNode A>
*/
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
// The next two lines are rarely relevant and are skipped here; interested readers can look into them
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
/*
Here config is produced by createJobVertex below, whose main job is to
create the JobVertex. The JobVertex constructor call looks like:
jobVertex = new JobVertex(
chainedNames.get(streamNodeId),
jobVertexId, // derived from Node A's hash
legacyJobVertexIds, // List<Node A hash1>
chainedOperatorVertexIds, // List<Node A hash>: this JobVertex is an operator chain containing a single StreamNode
userDefinedChainedOperatorVertexIds // List<Node A hash1>
);
*/
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());
/*
Set StreamNode A's configuration, such as parallelism and checkpoint settings.
The logic is simple; interested readers can step into it.
*/
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
// currentNodeId equals startNodeId, so this branch runs
if (currentNodeId.equals(startNodeId)) {
// Mark Node A as the start of a chain
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
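config.setOutEdgesInOrder(transitiveOutEdges); // transitiveOutEdges = [edge D]
/*
outEdges is edge D
*/
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
/*
transitiveOutEdges has size 1 and contains edge D.
Overall, this adds a physical edge to the JobGraph that connects JobVertex A (StreamNode A) with JobVertex B (StreamNode B and StreamNode C),
and attaches a partitioner to that edge. The partitioner decides, based on the downstream parallelism and operator type, how the current operator's results are sent downstream, e.g.
ForwardPartitioner (upstream and downstream parallelism are equal)
RebalancePartitioner (used by default when upstream and downstream parallelism differ)
others, e.g. the key-based KeyGroupStreamPartitioner that keyBy introduces before an aggregation
See the section on Flink parallelism for more details on this part.
*/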
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
/*
chainedConfigs was filled in during the second and third calls:
key  value
2    {3: Node C config}
so chainedConfigs.get(startNodeId) for Node A is null (there is no entry)
*/
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(new OperatorID(primaryHashBytes));
// chainableOutputs.isEmpty() is true, so Node A is also the end of its chain
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
// transitiveOutEdges contains edge D
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
Second call to createChain
private List<StreamEdge> createChain(
Integer startNodeId, //Node B
Integer currentNodeId, //Node B
Map<Integer, byte[]> hashes, // hashes of all StreamNodes, computed during the traversal
List<Map<Integer, byte[]>> legacyHashes, //size = 1
int chainIndex, //chainIndex = 0
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes // empty
) {
if (!builtVertices.contains(startNodeId)) {
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
// currentNodeId is Node B with one out edge, so below chainableOutputs has size 1 and nonChainableOutputs is empty
for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
for (StreamEdge chainable : chainableOutputs) {
// see the third call to createChain, which returns an empty List
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
// nonChainableOutputs is empty, skip
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
/*
Add an entry for Node B to chainedOperatorHashes. The third call already created
the Node B entry, so its current value is a List<Tuple2<>> of size 1:
Tuple2<Node C hash, Node C hash1>
*/
List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
// get Node B's hash
byte[] primaryHashBytes = hashes.get(currentNodeId);
/* legacyHashes has size 1, so the loop below adds Node B's hashes under the key Node B in chainedOperatorHashes.
The final layout is:
Key        Value
Node B id  List<
               Tuple2<Node C hash, Node C hash1>
               Tuple2<Node B hash, Node B hash1>
           >
*/
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}
/*
Add Stream Node B's name to chainedNames.
The current value is:
<3, StreamNode C>
<2, StreamNode B -> StreamNode C>
*/
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
// The next two lines are rarely relevant and are skipped here; interested readers can look into them
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
/*
Here config is produced by createJobVertex below, whose main job is to
create the JobVertex. The JobVertex constructor call looks like:
jobVertex = new JobVertex(
chainedNames.get(streamNodeId),
jobVertexId, // derived from Node B's hash
legacyJobVertexIds, // List<Node B hash1>
chainedOperatorVertexIds, // List<Node C hash, Node B hash>: this JobVertex is an operator chain holding two stream operators, B and C
userDefinedChainedOperatorVertexIds // List<Node C hash1, Node B hash1>
);
*/
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());
/*
Set StreamNode B's configuration, such as parallelism and checkpoint settings.
The logic is simple; interested readers can step into it.
*/
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
// currentNodeId equals startNodeId, so this branch runs
if (currentNodeId.equals(startNodeId)) {
// Mark Node B as the start of a chain
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
config.setOutEdgesInOrder(transitiveOutEdges); // transitiveOutEdges is empty
/*
outEdges is edge E
*/
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
/*
chainedConfigs.get(startNodeId) was filled in during the third call:
key  value
2    {3: Node C config}
*/
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(new OperatorID(primaryHashBytes));
// chainableOutputs.isEmpty() is false, so Node B is not marked as a chain end
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
// transitiveOutEdges is empty
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
Third call to createChain
private List<StreamEdge> createChain(
Integer startNodeId, //Node B
Integer currentNodeId, //Node C
Map<Integer, byte[]> hashes, // hashes of all StreamNodes, computed during the traversal
List<Map<Integer, byte[]>> legacyHashes, //size = 1
int chainIndex, //chainIndex = 1
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes // empty
) {
if (!builtVertices.contains(startNodeId)) {
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
// currentNodeId is Node C with no out edges, so chainableOutputs and nonChainableOutputs are both empty
for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
// create the Node B entry in chainedOperatorHashes
List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
// get Node C's hash
byte[] primaryHashBytes = hashes.get(currentNodeId);
/* legacyHashes has size 1, so the loop below adds Node C's hashes under the key Node B in chainedOperatorHashes.
The layout is then:
Key        Value
Node B id  List<Tuple2<Node C hash, Node C hash1>>
*/
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}
/*
Add Stream Node C's name to chainedNames.
The current value is:
<3, StreamNode C>
*/
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
// The next two lines are rarely relevant and are skipped here; interested readers can look into them
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
// Here config is created by new StreamConfig below, because Node C is not the chain head
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());
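/*
Set StreamNode C's configuration, such as parallelism and checkpoint settings.
The logic is simple; interested readers can step into it.
*/
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
// currentNodeId does not equal startNodeId, so the else block below runs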
if (currentNodeId.equals(startNodeId)) {
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
/**
chainedConfigs stores the configuration of the operators inside an operator chain.
chainIndex = 1
node = StreamNode C
so chainedConfigs ends up as
key  value
2    {3: Node C config}
*/
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(new OperatorID(primaryHashBytes));
// chainableOutputs.isEmpty() is true, so Node C is the end of the chain
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
// transitiveOutEdges is empty
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
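The core logic is as follows. Starting from the current StreamNode, createChain keeps following outputs until it reaches a node that cannot be chained, and records the nodes that can be chained together (every chain contains at least its head StreamNode). After the current node's outputs have been translated recursively, the recorded chainable StreamNodes become one JobVertex. That JobVertex's outputs are then connected to the JobVertices already built for the downstream outputs.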
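The recursion can be distilled into a short sketch. It is a hypothetical simplification: each edge is pre-labeled as chainable (standing in for isChainable), and hashes, StreamConfig objects and chain names are left out. It reproduces the example graph. A forms a chain of its own, and B and C are fused into one vertex that is connected to A by a single physical edge. Chain members are listed in visiting order, whereas chainedOperatorHashes in the trace above lists them in reverse (C before B).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified sketch of the createChain recursion. Not Flink code.
class CreateChainSketch {

    static final class Edge {
        final int source;
        final int target;
        final boolean chainable; // stands in for isChainable(edge, streamGraph)

        Edge(int source, int target, boolean chainable) {
            this.source = source;
            this.target = target;
            this.chainable = chainable;
        }
    }

    private final Map<Integer, List<Edge>> outEdges = new HashMap<>();
    private final Set<Integer> builtVertices = new HashSet<>();
    final Map<Integer, List<Integer>> chains = new LinkedHashMap<>(); // chain head -> member nodes
    final List<String> physicalEdges = new ArrayList<>();             // "head->head"

    void addEdge(int source, int target, boolean chainable) {
        outEdges.computeIfAbsent(source, k -> new ArrayList<>()).add(new Edge(source, target, chainable));
    }

    void setChaining(List<Integer> sourceIds) {
        for (int sourceId : sourceIds) {
            createChain(sourceId, sourceId);
        }
    }

    // Returns the non-chainable edges that leave the chain headed by startNodeId.
    private List<Edge> createChain(int startNodeId, int currentNodeId) {
        if (builtVertices.contains(startNodeId)) {
            return new ArrayList<>();
        }
        chains.computeIfAbsent(startNodeId, k -> new ArrayList<>()).add(currentNodeId);
        List<Edge> transitiveOutEdges = new ArrayList<>();
        for (Edge edge : outEdges.getOrDefault(currentNodeId, Collections.emptyList())) {
            if (edge.chainable) {
                // Same chain: keep the head, continue with the next node.
                transitiveOutEdges.addAll(createChain(startNodeId, edge.target));
            } else {
                // Chain boundary: the target becomes the head of a new chain.
                transitiveOutEdges.add(edge);
                createChain(edge.target, edge.target);
            }
        }
        if (currentNodeId == startNodeId) {
            // Back at the head: the chain becomes one vertex, wired to the downstream heads.
            builtVertices.add(startNodeId);
            for (Edge edge : transitiveOutEdges) {
                physicalEdges.add(startNodeId + "->" + edge.target);
            }
        }
        return transitiveOutEdges;
    }

    public static void main(String[] args) {
        // The example graph: A(1) -D-> B(2) -E-> C(3); D is not chainable, E is.
        CreateChainSketch g = new CreateChainSketch();
        g.addEdge(1, 2, false);
        g.addEdge(2, 3, true);
        g.setChaining(Arrays.asList(1));
        System.out.println(g.chains);        // {1=[1], 2=[2, 3]}
        System.out.println(g.physicalEdges); // [1->2]
    }
}
```

The builtVertices check at the top plays the same role as in the original code. A node with several non-chainable inputs is reached once per input, but it is turned into a vertex only on the first visit.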
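The main difference between the JobGraph and the StreamGraph is that chainable StreamNodes are merged into a single JobVertex, and JobVertices are connected by JobEdges (physical edges). This reduces the number of tasks and the amount of data exchanged between them, which optimizes the StreamGraph as much as possible.
The overall structure of the resulting JobGraph is as follows: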
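4. JobGraph Submission
Finally, let's look at how the job is submitted. The process ends up calling ClusterClient#submitJob(), which is an abstract method. Taking MiniClusterClient as an example, it calls MiniClusterClient#requestJobResult. The path from the client to the JobManager is skipped here and will be covered in detail in a later post. On the JobManager side, JobManagerRunner#grantLeadership() is eventually called, followed by the call chain verifyJobSchedulingStatusAndStartJobManager()
--> startJobMaster()
--> JobMasterService#start()
--> JobMaster#startJobExecution()
--> JobMaster#resetAndStartScheduler()
--> JobMaster#startScheduling()
Here, at last, the job is scheduled for execution. This part will be explained in a later post.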