Terminology
StreamGraph
A topology mirroring the structure of the user's code. That description is not entirely accurate, because many user-coded operators (e.g. shuffle, split) never produce a corresponding StreamNode, so "an unoptimized logical plan" is the more precise characterization. It is generated on the Client side. (Open question: what does the StreamGraph → JobGraph conversion actually optimize?)
Related APIs
DataStream
A DataStream represents a stream of elements of the same type. A DataStream can be transformed into another DataStream by applying a transformation.
As the name suggests, a stream made of data. A DataStream holds a Transformation<T> transformation: the transformation that produced this DataStream (note: not a transformation applied to it to create a new DataStream). In other words, the Transformation held by a DataStream records that DataStream's origin.
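This is visible in the class definition itself; the excerpt below is abridged from Flink's DataStream (field names match the 1.x sources):

public class DataStream<T> {

    protected final StreamExecutionEnvironment environment;

    // the Transformation that produced this stream, i.e. its origin
    protected final Transformation<T> transformation;

    public DataStream(StreamExecutionEnvironment environment, Transformation<T> transformation) {
        this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
        this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
    }
    // ...
}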
Transformation
A Transformation represents the operation that creates a DataStream
(The full story is more involved.) As a first approximation, every operator invoked in user code corresponds to a Transformation; for example, flatMap, split, and print each have a corresponding Transformation implementation class.
The concrete Transformation implementations all live in org.apache.flink.streaming.api.transformations.
Operator
A Transformation holds a StreamOperator implementation; the StreamOperator wraps the concrete Function that processes the records in the DataStream. Take StreamMap as an example: StreamMap extends AbstractUdfStreamOperator and implements the OneInputStreamOperator interface, both of which trace back to StreamOperator.
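To see how thin this wrapper is, here is StreamMap, lightly abridged from the Flink 1.x sources; its processElement simply delegates each record to the wrapped userFunction:

public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // apply the user's MapFunction and forward the result downstream
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}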
The following example (used throughout this article) ties these three classes together:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkBatchDemo {
    private static final Logger logger = LoggerFactory.getLogger(FlinkBatchDemo.class);

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // WordCountData comes from the Flink examples module
        DataStream<String> text = env.fromElements(WordCountData.WORDS);
        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(0).sum(1);
        // emit result
        counts.print();
        // System.out.println(env.getStreamGraph().getStreamingPlanAsJSON());
        // execute program
        env.execute("Streaming WordCount");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
In the example, the line text.flatMap(new Tokenizer()) applies the flatMap operator to the input stream, which amounts to transforming that DataStream. The concrete implementation class of the transform is OneInputTransformation (the generated OneInputTransformation's input is the Transformation held by the input stream), and the OneInputTransformation holds a StreamFlatMap; the actual processing logic lives in StreamFlatMap, which delegates to the concrete userFunction implementation, as shown below:
(The diagram that originally appeared here was incorrect and needs to be redrawn.)
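In lieu of the diagram, the delegation can be read directly from StreamFlatMap (lightly abridged from the Flink 1.x sources): processElement hands every record to the wrapped userFunction, which in our demo is the Tokenizer:

public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private transient TimestampedCollector<OUT> collector;

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        // delegate to the user-supplied FlatMapFunction (Tokenizer in the demo)
        userFunction.flatMap(element.getValue(), collector);
    }
}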
StreamGraph
Visualizing the StreamGraph
Calling env.getStreamGraph().getStreamingPlanAsJSON() yields the StreamGraph as JSON:
{
"nodes" : [ {
"id" : 1,
"type" : "Source: Collection Source",
"pact" : "Data Source",
"contents" : "Source: Collection Source",
"parallelism" : 1
}, {
"id" : 2,
"type" : "Flat Map",
"pact" : "Operator",
"contents" : "Flat Map",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 4,
"type" : "Keyed Aggregation",
"pact" : "Operator",
"contents" : "Keyed Aggregation",
"parallelism" : 1,
"predecessors" : [ {
"id" : 2,
"ship_strategy" : "HASH",
"side" : "second"
} ]
}, {
"id" : 5,
"type" : "Sink: Print to Std. Out",
"pact" : "Data Sink",
"contents" : "Sink: Print to Std. Out",
"parallelism" : 1,
"predecessors" : [ {
"id" : 4,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
Pasting this JSON into Flink's plan visualizer at https://flink.apache.org/visualizer/ produces the following graph:
The graph is missing the node (StreamNode) with id = 3: the keyBy(0) operator did not generate a corresponding StreamNode. So not every operator produces a StreamNode.
Reading the source
Starting from StreamExecutionEnvironment, let's trace how the source code generates the StreamGraph, keeping two questions in mind:
Question 1: which operators generate a StreamNode, which operators are skipped, and how is the function inside a skipped operator handled?
Question 2: what do the FORWARD and HASH labels on the graph's edges mean?
Tracing the code, StreamGraph generation is triggered from env.execute():
-> execute(getStreamGraph(jobName));
-> return getStreamGraph(jobName, true);
-> StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
-> return new StreamGraphGenerator(transformations, config, checkpointCfg)
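Putting the chain together, getStreamGraph looks roughly like this (abridged from the Flink 1.10 sources; note that it clears the environment's transformations list once the graph is generated):

public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        // the accumulated transformations are consumed by one generation pass
        this.transformations.clear();
    }
    return streamGraph;
}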
Constructing the StreamGraphGenerator requires the transformations argument; tracing how transformations gets populated:
-> env.fromElements
-> return addSource(function, "Collection Source", typeInfo).setParallelism(1);
-> return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);
-> DataStream.flatMap
-> return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
-> getExecutionEnvironment().addOperator(resultTransform); // resultTransform is the Transformation for flatMap; its input transformation, i.e. the upstream transformation, is the source's LegacySourceTransformation
-> this.transformations.add(transformation);
This shows that the Transformation for flatMap, a OneInputTransformation, is added to the transformations list.
Tracing the remaining operators in the demo the same way, transformations ends up holding: Flat Map → OneInputTransformation, Keyed Aggregation → OneInputTransformation (whose upstream transformation is the PartitionTransformation produced by the keyBy operator), and Print to Std. Out → SinkTransformation (whose upstream is the Keyed Aggregation's OneInputTransformation).
Three Transformations in total; keyBy's PartitionTransformation is never added to the list itself, it is reachable only as the input of the aggregation's Transformation.
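For reference, the heart of DataStream#transform (abridged from the Flink 1.x sources; newer versions route through a doTransform helper) shows both steps, wrapping the upstream Transformation and registering the new one with the environment:

public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {

    // this.transformation (the current stream's origin) becomes the input of the new transformation
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);

    // the call traced above: appends resultTransform to the environment's transformations list
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}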
StreamGraphGenerator builds the DAG by iterating over every member of transformations:
for (Transformation<?> transformation: transformations) {
transform(transformation);
}
...
Different Transformation types are dispatched to different transform methods.
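The dispatch is a plain chain of instanceof checks inside StreamGraphGenerator#transform; abridged from the Flink 1.10 sources, keeping only the branches relevant to the demo (later Flink versions replaced this with a map of translators):

Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
    transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof LegacySourceTransformation<?>) {
    transformedIds = transformLegacySource((LegacySourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
    transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
    transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else {
    throw new IllegalStateException("Unknown transformation: " + transform);
}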
Note, regarding Question 1:
For the Keyed Aggregation's OneInputTransformation, the upstream PartitionTransformation does not generate a StreamNode; instead, a virtual partition node (VirtualPartitionNode) is registered:
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
Transformation<T> input = partition.getInput();
    List<Integer> resultIds = new ArrayList<>();
Collection<Integer> transformedIds = transform(input);
for (Integer transformedId: transformedIds) {
int virtualId = Transformation.getNewNodeId();
streamGraph.addVirtualPartitionNode(
transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode());
resultIds.add(virtualId);
}
return resultIds;
}
// For virtual partition nodes: in addEdge, when the upstream vertex is such a node, the traversal keeps walking upstream until it reaches a non-virtual node
} else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
int virtualId = upStreamVertexID;
upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
if (partitioner == null) {
partitioner = virtualPartitionNodes.get(virtualId).f1;
}
shuffleMode = virtualPartitionNodes.get(virtualId).f2;
addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
}
So the keyBy operator (its PartitionTransformation) generates neither a StreamNode nor a StreamEdge of its own; the resulting StreamGraph skips over it and connects the two non-virtual nodes directly. The function it carries never surfaces as a node in the StreamGraph-building code; it survives only as the partitioner attached to the connecting edge.
As for Question 2:
When the edge between two nodes is generated, the upstream and downstream parallelisms are compared; if no partitioner has been set and the parallelisms match, a ForwardPartitioner is chosen, otherwise a RebalancePartitioner:
if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
partitioner = new ForwardPartitioner<Object>();
} else if (partitioner == null) {
partitioner = new RebalancePartitioner<Object>();
}
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        // forward: upstream and downstream subtasks are wired pointwise,
        // so there is only ever one output channel to choose from
        return 0;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "FORWARD";
    }
}
A KeyedStream, by contrast, uses the KeyGroupStreamPartitioner:
public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
this(
dataStream,
new PartitionTransformation<>(
dataStream.getTransformation(),
// when the KeyedStream is created, it declares a KeyGroupStreamPartitioner
new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
keySelector,
keyType);
}
public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implements ConfigurableStreamPartitioner {
private static final long serialVersionUID = 1L;
private final KeySelector<T, K> keySelector;
private int maxParallelism;
public KeyGroupStreamPartitioner(KeySelector<T, K> keySelector, int maxParallelism) {
Preconditions.checkArgument(maxParallelism > 0, "Number of key-groups must be > 0!");
this.keySelector = Preconditions.checkNotNull(keySelector);
this.maxParallelism = maxParallelism;
}
public int getMaxParallelism() {
return maxParallelism;
}
@Override
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
K key;
try {
key = keySelector.getKey(record.getInstance().getValue());
} catch (Exception e) {
throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
}
return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels);
}
@Override
public StreamPartitioner<T> copy() {
return this;
}
@Override
public String toString() {
return "HASH";
}
@Override
public void configure(int maxParallelism) {
KeyGroupRangeAssignment.checkParallelismPreconditions(maxParallelism);
this.maxParallelism = maxParallelism;
}
}
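selectChannel is where the HASH label comes from: the record's key is hashed into a key group, and the key group is then mapped to a parallel operator instance. The arithmetic lives in KeyGroupRangeAssignment (abridged from the Flink sources):

public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) {
    return computeOperatorIndexForKeyGroup(
            maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism));
}

public static int assignToKeyGroup(Object key, int maxParallelism) {
    // murmur-hash the key's hashCode into one of maxParallelism key groups
    return MathUtils.murmurHash(key.hashCode()) % maxParallelism;
}

public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
    // key groups are spread evenly across the parallel operator instances
    return keyGroupId * parallelism / maxParallelism;
}

With the default maxParallelism lower bound of 128 (StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, passed in when the KeyedStream is constructed above) and the demo's parallelism of 1, every key group maps to operator index 0, so all records flow over the single HASH edge seen in the JSON plan.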