Typically, a Flink DataStream API program looks something like this (ProcessDdlFunction, ProcessBinlogFunction, dorisMap, and topics are user-defined pieces of this particular job):
DataStreamSource<String> kafkaSource =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // .setParallelism(3);
SingleOutputStreamOperator<CanalMessage> data =
        kafkaSource.flatMap(new ProcessDdlFunction(dorisMap))
                .uid("ddl");
SingleOutputStreamOperator<Map<String, String>> convertMap =
        data.map(new ProcessBinlogFunction())
                .uid("binlog");
...
env.execute(topics);
Anyone familiar with Flink knows that the last line, env.execute(topics);, is what actually submits our code to an execution environment (local, YARN, or Kubernetes); it is the trigger that starts the job. So what do the calls to map, flatMap, and friends actually do? Let's trace through them. Clicking into the map method, we find it calls the map method in the DataStream class:
public <R> SingleOutputStreamOperator<R> map(
        MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
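In between, transform is a thin wrapper: it packs the concrete operator into a SimpleOperatorFactory and delegates to doTransform. A sketch, lightly abridged from the Flink source (exact signatures can vary across versions):
public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {
    // wrap the concrete operator (e.g. StreamMap) in a factory and hand off
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}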
Following the call chain, we eventually land in the method below. The core line is getExecutionEnvironment().addOperator(resultTransform); at a glance you can tell it takes the logic we wrote, namely the transformation operator, and puts it into a collection that stores all transformations:
protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    OneInputTransformation<T, R> resultTransform =
            new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);

    // register the operator that wraps our code logic with the environment
    getExecutionEnvironment().addOperator(resultTransform);
    return returnStream;
}
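addOperator itself is essentially a one-liner on StreamExecutionEnvironment that appends to the transformations list (paraphrased from the Flink source):
public void addOperator(Transformation<?> transformation) {
    Preconditions.checkNotNull(transformation, "transformation must not be null.");
    // this.transformations is the List<Transformation<?>> that execute() later reads
    this.transformations.add(transformation);
}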
Likewise, in the example above, the flatMap operator is put into the same collection. Now comes the key part: submitting the job with env.execute(topics). The transformations field referenced below is a member of StreamExecutionEnvironment, the very collection of transformations described above:
public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    StreamGraph streamGraph = getStreamGraph();
    if (jobName != null) {
        streamGraph.setJobName(jobName);
    }

    try {
        return execute(streamGraph);
    } catch (Throwable t) {
        Optional<ClusterDatasetCorruptedException> clusterDatasetCorruptedException =
                ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
        if (!clusterDatasetCorruptedException.isPresent()) {
            throw t;
        }

        // Retry without cache if it is caused by corrupted cluster dataset.
        invalidateCacheTransformations(originalTransformations);
        streamGraph = getStreamGraph(originalTransformations);
        return execute(streamGraph);
    }
}
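Here getStreamGraph() hands the accumulated transformations to a StreamGraphGenerator, which builds the StreamGraph that is ultimately submitted. To see the whole lazy-build behavior end to end, here is a minimal, self-contained sketch (the class name, job name, and elements are illustrative, not taken from the job above):
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LazyBuildDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each call below only registers a Transformation in env's internal list;
        // no data is processed yet.
        env.fromElements("a", "b", "c")
                .map((MapFunction<String, String>) String::toUpperCase)
                .print();

        // Only now is the transformation list turned into a StreamGraph and submitted.
        env.execute("lazy-build-demo");
    }
}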