1. 什么时候生成StreamGraph
给出如下的flink的总体架构图,有个总体的认识,我们可以清楚的看到,在用户给出StreamApi之后,就会转化成StreamGraph,而在它的下面,它会转化成JobGraph。在后续的文章中,会逐层进行分析。
2. 由简单的demo分析这个过程
public class FlinkTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.socketTextStream("localhost", 9900);
// 对一个键型流(keyed stream) 使用过程函数
text.flatMap(new LineSplitter()).shuffle().filter(new HelloFilter()).print();
env.execute("FlinkTest");
}
public static class LineSplitter implements FlatMapFunction<String, String> {
public void flatMap(String value, Collector<String> out) throws Exception {
String[] values = value.split(" ");
for (String s : values) {
out.collect(s);
}
}
}
public static class HelloFilter implements FilterFunction<String> {
public boolean filter(String value) throws Exception {
if (value.equals("hello")) {
return false;
}
return true;
}
}
}
上面是个很简单的入门例子,主要是实现把输入的字符串按空格分隔成一个一个单个的字符串。并且过滤掉“hello"这个字符串。在后面的分析中,会以这个作为例子
3. 源码分析
env.execute("FlinkTest");
是生成StreamGraph
的入口。
public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
return new StreamGraphGenerator(env).generateInternal(transformations);
}
-
List<StreamTransformation<?>> transformations
就是定义的一系列StreamTransformation
。常见的StreamTransformation
的类图结构如下。StreamTransformation
就是产生DataStream的一些操作。
从上图我们看出有10种
StreamTransformation
。DataStream 上常见的 transformation 有 map、flatmap、filter等。这些transformation会构造出一棵 StreamTransformation 树,通过这棵树转换成 StreamGraph。以生成SingleOutputStreamOperator
为例。
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
//包装成StreamFlatMap
return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operator,
outTypeInfo,
environment.getParallelism());
//包装成SingleOutputStreamOperator
@SuppressWarnings({ "unchecked", "rawtypes" })
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
上面的这个过程,可以抽象成下面一个图
首先把MapFunction包装成
StreamFlatMap
,然后包装成OneInputTransformation
。另外,并不是每一个 StreamTransformation 都会转换成 runtime 层中物理操作。有一些只是逻辑概念,比如 union、split/select、partition等。如下图所示的转换树,在运行时会优化成下方的操作图。
通过源码也可以发现,
UnionTransformation
,SplitTransformation
,SelectTransformation
,PartitionTransformation
由于不包含具体的操作所以都没有StreamOperator成员变量,而其他StreamTransformation的子类基本上都有。StreamOperator
DataStream 上的每一个 Transformation 都对应了一个 StreamOperator,StreamOperator是运行时的具体实现,会决定UDF(User-Defined Funtion)的调用方式。下图所示为 StreamOperator 的类图(点击查看大图):
可以发现,所有实现类都继承了
AbstractStreamOperator
。另外除了 project 操作,其他所有可以执行UDF代码的实现类都继承自AbstractUdfStreamOperator
,该类是封装了UDF的StreamOperator
。UDF就是实现了Function
接口的类,如MapFunction
,FilterFunction
。
- 进一步深入到如果生成图
上一步分析到这里
private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
for (StreamTransformation<?> transformation: transformations) {
transform(transformation);
}
return streamGraph;
}
- transform
private Collection<Integer> transform(StreamTransformation<?> transform) {
//判断是否已经转化,每次转化完,都会把StreamTransformation加入进来,同时做递归的出口,然后递归的时候,很多时候从这里作为出口
if (alreadyTransformed.containsKey(transform)) {
//以前没有转化,则转化
return alreadyTransformed.get(transform);
}
LOG.debug("Transforming " + transform);
设置StreamTransformation的MaxParallelism
if (transform.getMaxParallelism() <= 0) {
// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from theExecutionConfig.
int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}
// call at least once to trigger exceptions about MissingTypeInfo
transform.getOutputType();
Collection<Integer> transformedIds;
//判断是哪种StreamTransformation,进行响应类型的转化
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
if (transform.getBufferTimeout() > 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getUid() != null) {
streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
if (transform.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}
if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
}
return transformedIds;
}
- 以
transformOneInputTransform
为例
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
//存储这个OneInputTransformation的上游Transformation的id,方便构造边,在这里递归,确保所有的上游Transformation都已经转化
Collection<Integer> inputIds = transform(transform.getInput());
// the recursive call might have already transformed this
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
//加入StreamNode
streamGraph.addOperator(transform.getId(),
slotSharingGroup,
transform.getOperator(),
transform.getInputType(),
transform.getOutputType(),
transform.getName());
if (transform.getStateKeySelector() != null) {
TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
}
streamGraph.setParallelism(transform.getId(), transform.getParallelism());
streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
//构造边
for (Integer inputId: inputIds) {
streamGraph.addEdge(inputId, transform.getId(), 0);
}
return Collections.singleton(transform.getId());
}
结合我们这个例子,对以上内容分析如下。
把UDF转化成Transformation,可以生成3个Transformatoin如下.
OneInputTransformation{id=2, name='Flat Map', outputType=String, parallelism=4}
OneInputTransformation{id=4, name='Filter', outputType=String, parallelism=4}
SinkTransformation{id=5, name='Unnamed', outputType=GenericType<java.lang.Object>, parallelism=4}
由于是树之间父子关系,我们从底层的SinkTransformation
看,如下图。
最后得到的StreamGraph如下
在这过程中生成一个虚拟结点。
上面这个图对应步骤如下:
首先处理的Source,生成了Source的StreamNode。
然后处理的FlatMap,生成了FlatMap的StreamNode,并生成StreamEdge连接上游Source和FlatMap。由于上下游的并发度不一样(1:4),所以此处是Rebalance分区。
然后处理的Shuffle,由于是逻辑转换,并不会生成实际的节点。将partitioner信息暂存在virtuaPartitionNodes中。
在处理Filter时,生成了Filter的StreamNode。发现上游是shuffle,找到shuffle的上游FlatMap,创建StreamEdge与Filter相连。并把ShufflePartitioner的信息写到StreamEdge中。
最后处理Sink,创建Sink的StreamNode,并生成StreamEdge与上游Filter相连。由于上下游并发度一样(4:4),所以此处选择 Forward 分区。
参考文章:
Flink 原理与实现:如何生成 StreamGraph (本文很多图片参考这篇文章,在此声明)