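1. Back to the Starting Point
Everything begins at the starting point, so let's look at the skeleton of a Flink job as given on the official website: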
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        /*
         * Here, you can start creating your execution plan for Flink.
         *
         * Start with getting some data from the environment, like
         *     env.readTextFile(textPath);
         *
         * then, transform the resulting DataStream<String> using operations
         * like
         *     .filter()
         *     .flatMap()
         *     .join()
         *     .coGroup()
         *
         * and many more.
         * Have a look at the programming guide for the Java API:
         *
         * http://flink.apache.org/docs/latest/apis/streaming/index.html
         *
         */
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
}
First we create a StreamExecutionEnvironment. This is the execution environment (context) of our streaming job and holds the parallelism, state, and time-related configuration. It also holds the list of StreamTransformation objects and, of course, the addSource method that creates a DataStreamSource (this is the base method that fromCollection and readTextFile end up calling).
public abstract class StreamExecutionEnvironment {
    // Everything else is omitted; we focus on transformations and addSource.
    protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
    // Note that `this` is passed to the returned DataStreamSource. As the DataStream is
    // transformed later on, this StreamExecutionEnvironment is called back through
    // addOperator, which keeps the transformations list up to date.
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
        // omitted
        return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
    }
    public void addOperator(StreamTransformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }
}
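Here we see that the first argument of addSource is a SourceFunction instance. SourceFunction is the base interface for all streaming data sources in Flink. It defines the run(SourceContext<T> ctx) method that every source must implement and that emits data through the SourceContext. When cancel() is called, the loop inside run must be able to exit. The SourceContext interface emits elements through collect(T element).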
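The run/cancel contract described above can be sketched in plain Java. This is only an illustration: MiniSourceFunction, MiniSourceContext, CounterSource, and runAndCancelAfter are hypothetical stand-ins written for this post, not Flink classes, and the sketch runs on a single thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of the run()/cancel() contract; the Mini* types are stand-ins, not Flink's. */
class SourceSketch {

    /** Hypothetical stand-in for SourceFunction.SourceContext<T>. */
    interface MiniSourceContext<T> {
        void collect(T element);
    }

    /** Hypothetical stand-in for SourceFunction<T>. */
    interface MiniSourceFunction<T> {
        void run(MiniSourceContext<T> ctx) throws Exception;

        void cancel();
    }

    /** Emits 0, 1, 2, ... until cancel() flips the flag or the limit is reached. */
    static class CounterSource implements MiniSourceFunction<Long> {
        private final long limit;
        // volatile because cancel() is usually called from a different thread than run()
        private volatile boolean running = true;

        CounterSource(long limit) {
            this.limit = limit;
        }

        @Override
        public void run(MiniSourceContext<Long> ctx) {
            long next = 0;
            while (running && next < limit) {
                ctx.collect(next++);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** Runs the source and cancels it once cancelAfter elements have been collected. */
    static List<Long> runAndCancelAfter(CounterSource source, int cancelAfter) {
        List<Long> out = new ArrayList<>();
        source.run(element -> {
            out.add(element);
            if (out.size() == cancelAfter) {
                source.cancel();
            }
        });
        return out;
    }

    public static void main(String[] args) {
        // prints [0, 1, 2]
        System.out.println(runAndCancelAfter(new CounterSource(100), 3));
    }
}
```

The point of the flag is that run owns the loop, so cancel can only ask it to stop; the loop condition is what actually ends the source.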
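2. The DataStreamSource Class
The whole topology starts from env.addSource (or from fromCollection/readTextFile, which call addSource underneath). A DataStreamSource represents the starting point of a DataStream.
DataStreamSource is a subclass of DataStream:
DataStream <-- SingleOutputStreamOperator <-- DataStreamSource
Let's look at the two most important fields of DataStream, the parent class of DataStreamSource:
public class DataStream<T> {
    protected final StreamExecutionEnvironment environment;
    protected final StreamTransformation<T> transformation;
    // ...
}
StreamExecutionEnvironment is the execution environment.
StreamTransformation is the transformation that produced this DataStream.
The subclass DataStreamSource needs the same two fields. Here is its constructor: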
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
    public DataStreamSource(StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
            boolean isParallel, String sourceName) {
        super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
        // omitted
    }
    // ...
}
The second argument passed to the parent constructor is a SourceTransformation, a subclass of StreamTransformation. Its constructor shows that it has one more field than its parent StreamTransformation: a StreamSource named operator.
public class SourceTransformation<T> extends StreamTransformation<T> {
    private final StreamSource<T, ?> operator;
    public SourceTransformation(
            String name,
            StreamSource<T, ?> operator,
            TypeInformation<T> outputType,
            int parallelism) {
        super(name, outputType, parallelism);
        this.operator = operator;
    }
    // ...
}
Let's move on to this extra operator field of type StreamSource. It is a little more involved:
/**
 * @param <OUT> Type of the output elements
 * @param <SRC> Type of the source function of this stream source operator
 */
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {
    // ...
}
It extends AbstractUdfStreamOperator and implements the StreamOperator interface. AbstractUdfStreamOperator in turn extends AbstractStreamOperator, which also implements StreamOperator:
StreamOperator <-- AbstractStreamOperator <-- AbstractUdfStreamOperator
Let's start with the most basic piece, the StreamOperator interface. It mainly defines the lifecycle methods, including setup, open, and close. Operators are usually created by implementing OneInputStreamOperator or TwoInputStreamOperator. StreamSource is different: because it is a source, it implements StreamOperator directly. That declaration looks redundant, since StreamSource already extends AbstractUdfStreamOperator, which implements StreamOperator indirectly.
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {
    // ...
}
AbstractStreamOperator provides default implementations of the lifecycle methods.
// Base class for all stream operators. Operators that contain a user function should extend the class AbstractUdfStreamOperator instead (which is a specialized subclass of this class).
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, Serializable {
    // ...
}
AbstractUdfStreamOperator additionally holds a user-defined function. In methods such as open and close it also calls the corresponding method on that user function, which lets user code hook into the operator lifecycle.
/**
 * This is used as the base class for operators that have a user-defined
 * function. This class handles the opening and closing of the user-defined functions,
 * as part of the operator life cycle.
 *
 * @param <F>
 *            The type of the user function
 */
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {
    /** The user function. */
    protected final F userFunction;
    // ...
    @Override
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(userFunction, new Configuration());
    }
    @Override
    public void close() throws Exception {
        super.close();
        functionsClosed = true;
        FunctionUtils.closeFunction(userFunction);
    }
    // ...
}
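To make the delegation in open and close concrete, here is a minimal plain-Java sketch of the same pattern. All Mini* types are hypothetical stand-ins written for this post, not Flink classes. The sketch assumes that the user function is only opened and closed when it is a "rich" function, which mirrors the instanceof check done by the FunctionUtils helpers.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of an operator that forwards lifecycle calls to its user function. */
class UdfLifecycleSketch {

    /** Hypothetical stand-in for Function (a marker interface). */
    interface MiniFunction { }

    /** Hypothetical stand-in for a rich function that has its own lifecycle. */
    interface MiniRichFunction extends MiniFunction {
        void open();

        void close();
    }

    /** Stand-in for AbstractStreamOperator: default lifecycle methods. */
    static class MiniOperator {
        protected final List<String> log;

        MiniOperator(List<String> log) {
            this.log = log;
        }

        public void open() {
            log.add("operator.open");
        }

        public void close() {
            log.add("operator.close");
        }
    }

    /** Stand-in for AbstractUdfStreamOperator: holds a user function and forwards open/close. */
    static class MiniUdfOperator<F extends MiniFunction> extends MiniOperator {
        protected final F userFunction;

        MiniUdfOperator(F userFunction, List<String> log) {
            super(log);
            this.userFunction = userFunction;
        }

        @Override
        public void open() {
            super.open();
            if (userFunction instanceof MiniRichFunction) {
                ((MiniRichFunction) userFunction).open();
            }
        }

        @Override
        public void close() {
            super.close();
            if (userFunction instanceof MiniRichFunction) {
                ((MiniRichFunction) userFunction).close();
            }
        }
    }

    /** Opens and closes one operator and returns the order of the lifecycle calls. */
    static List<String> runLifecycle() {
        List<String> log = new ArrayList<>();
        MiniRichFunction fn = new MiniRichFunction() {
            @Override
            public void open() {
                log.add("function.open");
            }

            @Override
            public void close() {
                log.add("function.close");
            }
        };
        MiniUdfOperator<MiniRichFunction> operator = new MiniUdfOperator<>(fn, log);
        operator.open();
        operator.close();
        return log;
    }

    public static void main(String[] args) {
        // prints [operator.open, function.open, operator.close, function.close]
        System.out.println(runLifecycle());
    }
}
```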
That covers the parent classes and interfaces of StreamSource. Recall why we looked at StreamSource in the first place: when a DataStreamSource is created, the second argument passed to the parent constructor is a SourceTransformation, and it holds a StreamSource field named operator.
public class SourceTransformation<T> extends StreamTransformation<T> {
    private final StreamSource<T, ?> operator;
    // ...
}
Now for the other thread of the story. Back in DataStreamSource, the class itself extends SingleOutputStreamOperator.
public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
    public DataStreamSource(StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
            boolean isParallel, String sourceName) {
        // calls the constructor of the parent class SingleOutputStreamOperator
        super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
        // omitted
    }
    // ...
}
This calls the SingleOutputStreamOperator constructor, which in turn calls the DataStream constructor.
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
        super(environment, transformation);
    }
    // ...
}
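This completes the analysis of how env.addSource returns a DataStreamSource. We now have a starting point: a DataStreamSource, which is a subclass of DataStream and holds two members, the current context (StreamExecutionEnvironment) and a transformation (a StreamTransformation, here a SourceTransformation). During env.addSource, the SourceTransformation is given a StreamSource operator (an implementation of StreamOperator and a subclass of AbstractUdfStreamOperator). That StreamSource holds the SourceFunction that was passed in as its userFunction, which contains the logic for reading data from the source. We will look at how it is actually run later.
So far, starting from env.addSource, we have analyzed DataStream and its subclass DataStreamSource (which corresponds to a data source).
Next, we follow DataStream into its transformation methods and explore the relationship between DataStream and StreamTransformation.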
3. Transformation Methods Defined on DataStream
Recall that a DataStream represents a stream of elements of the same type. It can be turned into another DataStream by applying a transformation such as map or filter.
The two most important fields of DataStream are:
    protected final StreamExecutionEnvironment environment;
    protected final StreamTransformation<T> transformation;
StreamExecutionEnvironment is the execution environment.
StreamTransformation is the transformation that produced this DataStream. For the DataStreamSource analyzed above, that is a SourceTransformation.
DataStream provides methods such as map:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
            Utils.getCallLocationName(), true);
    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}
As we can see, map calls transform and returns a SingleOutputStreamOperator, a subclass of DataStream that represents a DataStream with a single output type.
public class SingleOutputStreamOperator<T> extends DataStream<T> {
    // ...
}
Next, let's look at transform. In essence it builds a new DataStream from a transformation (for map, the returned object is the subclass SingleOutputStreamOperator). We already know that a DataStream needs two arguments: (1) the StreamExecutionEnvironment and (2) the transformation itself, a subclass of StreamTransformation.
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    // read the output type of the input transformation so that missing type information surfaces early
    transformation.getOutputType();
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
    getExecutionEnvironment().addOperator(resultTransform);
    return returnStream;
}
The code first creates the transformation object, which here is a OneInputTransformation. One point deserves attention: unlike its sibling class SourceTransformation, OneInputTransformation has a field named input of type StreamTransformation. It is the input transformation of this one (think of it as the previous transformation), and transform passes this.transformation for it. The second field, operator, is a OneInputStreamOperator, a subinterface of StreamOperator.
public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {
    private final StreamTransformation<IN> input;
    private final OneInputStreamOperator<IN, OUT> operator;
    // ...
}
For map, the operator passed to transform is a StreamMap. StreamMap is a subclass of AbstractUdfStreamOperator and implements the OneInputStreamOperator interface.
/**
 * A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
 */
@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {
    private static final long serialVersionUID = 1L;
    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
At this point we can draw the following conclusions. A StreamTransformation holds a StreamOperator field named operator, which is the operator for that step. It usually also holds a StreamTransformation field named input, which records its input (the previous transformation). In effect this forms a linked list that strings all transformations together, which is very useful at runtime when the dataflow is built. In addition, the last statement before transform returns hands the newly created StreamTransformation to the transformations field of the StreamExecutionEnvironment, so the environment also records the transformations and has a global view of the job.
In the transform method:
    getExecutionEnvironment().addOperator(resultTransform);
In StreamExecutionEnvironment:
public abstract class StreamExecutionEnvironment {
    protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
    public void addOperator(StreamTransformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }
    // ...
}
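The two bookkeeping structures described above (the backward links between transformations and the list kept by the environment) can be modelled in a few lines of plain Java. MiniEnv, MiniStream, MiniTransformation, and lineage are hypothetical names written for this post; the sketch drops types, operators, and parallelism and keeps only the linking logic. As in the excerpts quoted above, addSource here does not call addOperator, so the source is reachable only through the input link.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Plain-Java model of how transform() links transformations and registers them. */
class TransformChainSketch {

    /** Stand-in for StreamTransformation: a name plus a link to the input transformation. */
    static class MiniTransformation {
        final String name;
        final MiniTransformation input; // null for a source

        MiniTransformation(String name, MiniTransformation input) {
            this.name = name;
            this.input = input;
        }
    }

    /** Stand-in for StreamExecutionEnvironment: records every transformation passed to addOperator. */
    static class MiniEnv {
        final List<MiniTransformation> transformations = new ArrayList<>();

        MiniStream addSource(String sourceName) {
            return new MiniStream(this, new MiniTransformation(sourceName, null));
        }

        void addOperator(MiniTransformation transformation) {
            transformations.add(Objects.requireNonNull(transformation, "transformation must not be null."));
        }
    }

    /** Stand-in for DataStream: an environment plus the transformation that produced the stream. */
    static class MiniStream {
        final MiniEnv environment;
        final MiniTransformation transformation;

        MiniStream(MiniEnv environment, MiniTransformation transformation) {
            this.environment = environment;
            this.transformation = transformation;
        }

        MiniStream transform(String operatorName) {
            // the new transformation remembers the current one as its input
            MiniTransformation result = new MiniTransformation(operatorName, this.transformation);
            MiniStream returnStream = new MiniStream(environment, result);
            environment.addOperator(result);
            return returnStream;
        }

        MiniStream map() {
            return transform("Map");
        }

        MiniStream filter() {
            return transform("Filter");
        }
    }

    /** Walks the input links backwards, from the given stream to its source. */
    static List<String> lineage(MiniStream stream) {
        List<String> names = new ArrayList<>();
        for (MiniTransformation t = stream.transformation; t != null; t = t.input) {
            names.add(t.name);
        }
        return names;
    }

    public static void main(String[] args) {
        MiniEnv env = new MiniEnv();
        MiniStream result = env.addSource("Source").map().filter();
        System.out.println(env.transformations.size()); // prints 2 (Map and Filter)
        System.out.println(lineage(result));            // prints [Filter, Map, Source]
    }
}
```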
The other transformation methods on DataStream are similar, for example filter:
public SingleOutputStreamOperator<T> filter(FilterFunction<T> filter) {
    return transform("Filter", getType(), new StreamFilter<>(clean(filter)));
}
And flatMap:
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
            getType(), Utils.getCallLocationName(), true);
    return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}
They all call transform; only the name and the operator object differ. Each operator wraps a different user function and uses it differently in processElement, so the data is processed differently and yields different results. That is what gives each transformation (operator) its own behavior.
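To illustrate how the same processElement hook yields different behavior depending on the user function, here is a small plain-Java sketch. MiniOneInputOperator and the factory methods map, filter, flatMap, and runAll are hypothetical names written for this post; the output is modelled as a java.util.function.Consumer instead of Flink's Output and StreamRecord types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/** Three operators that share one hook, processElement, but use their user function differently. */
class OperatorSketch {

    /** Hypothetical stand-in for OneInputStreamOperator. */
    interface MiniOneInputOperator<IN, OUT> {
        void processElement(IN element, Consumer<OUT> output);
    }

    /** Like a map operator: emits exactly one result per input element. */
    static <IN, OUT> MiniOneInputOperator<IN, OUT> map(Function<IN, OUT> userFunction) {
        return (element, output) -> output.accept(userFunction.apply(element));
    }

    /** Like a filter operator: forwards the element only if the predicate accepts it. */
    static <T> MiniOneInputOperator<T, T> filter(Predicate<T> userFunction) {
        return (element, output) -> {
            if (userFunction.test(element)) {
                output.accept(element);
            }
        };
    }

    /** Like a flatMap operator: the user function may emit zero or more results. */
    static <IN, OUT> MiniOneInputOperator<IN, OUT> flatMap(BiConsumer<IN, Consumer<OUT>> userFunction) {
        return (element, output) -> userFunction.accept(element, output);
    }

    /** Feeds every input through the operator and collects whatever it emits. */
    static <IN, OUT> List<OUT> runAll(MiniOneInputOperator<IN, OUT> operator, List<IN> inputs) {
        List<OUT> results = new ArrayList<>();
        for (IN input : inputs) {
            operator.processElement(input, results::add);
        }
        return results;
    }

    public static void main(String[] args) {
        MiniOneInputOperator<Integer, Integer> doubler = map(x -> x * 2);
        MiniOneInputOperator<Integer, Integer> evens = filter(x -> x % 2 == 0);
        MiniOneInputOperator<String, String> splitter = flatMap((line, out) -> {
            for (String word : line.split(" ")) {
                out.accept(word);
            }
        });
        System.out.println(runAll(doubler, Arrays.asList(1, 2, 3)));   // prints [2, 4, 6]
        System.out.println(runAll(evens, Arrays.asList(1, 2, 3, 4)));  // prints [2, 4]
        System.out.println(runAll(splitter, Arrays.asList("a b", "c"))); // prints [a, b, c]
    }
}
```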
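So, starting from env.addSource (which returns the first DataStream, a DataStreamSource), we obtain new DataStream objects by repeatedly calling the transformation methods of DataStream (map, filter, and so on).
At the macro level, the StreamExecutionEnvironment object env keeps recording StreamTransformation objects and maintains the whole transformation list.
At the micro level, each time a DataStream is transformed, it first creates the StreamTransformation for that step. The new StreamTransformation receives the previous one as its input, namely the transformation held by the DataStream on which the method was called. At the StreamTransformation level this forms a linked list in which every node keeps a reference to its predecessor. The method then returns a new DataStream that holds the same StreamExecutionEnvironment as the current DataStream and the newly created StreamTransformation.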
4. The StreamTransformation Class
A StreamTransformation represents the operation that creates a DataStream. Every DataStream holds the StreamTransformation that created it.
// The constructor. The key parts are the id (unique) and outputType, the output type of the transformation.
public StreamTransformation(String name, TypeInformation<T> outputType, int parallelism) {
    this.id = getNewNodeId();
    this.name = Preconditions.checkNotNull(name);
    this.outputType = outputType;
    this.parallelism = parallelism;
    this.slotSharingGroup = null;
}
// A static counter and a static method give each StreamTransformation its own ID.
protected static Integer idCounter = 0;
public static int getNewNodeId() {
    idCounter++;
    return idCounter;
}
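The counter above is a plain static field, so the IDs are unique as long as transformations are created from a single thread, which is the usual case when a job is assembled in main. If IDs had to be handed out from several threads, one possible variant is an AtomicInteger. The sketch below is my own illustration of that idea, not Flink's code; NodeIdSketch and allocateConcurrently are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Thread-safe variant of a static ID counter (an illustration, not Flink's implementation). */
class NodeIdSketch {

    private static final AtomicInteger ID_COUNTER = new AtomicInteger(0);

    /** Returns a new ID; incrementAndGet is atomic, so no two callers see the same value. */
    static int getNewNodeId() {
        return ID_COUNTER.incrementAndGet();
    }

    /** Allocates IDs from several threads at once and returns the set of distinct IDs seen. */
    static Set<Integer> allocateConcurrently(int threads, int idsPerThread) throws InterruptedException {
        Set<Integer> ids = ConcurrentHashMap.newKeySet();
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread worker = new Thread(() -> {
                for (int j = 0; j < idsPerThread; j++) {
                    ids.add(getNewNodeId());
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return ids;
    }

    public static void main(String[] args) throws InterruptedException {
        // 4 threads x 1000 IDs: every ID is distinct, so this prints 4000
        System.out.println(allocateConcurrently(4, 1000).size());
    }
}
```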
Other Notes
When the job runs locally in an IDE, StreamExecutionEnvironment.getExecutionEnvironment() returns the subclass LocalStreamEnvironment. Its execute method starts running the whole streaming topology on a mini cluster.
/**
* Executes the JobGraph of the on a mini cluster of CLusterUtil with a user
* @return The result of the job execution, containing elapsed time and accumulators.
*/
@Override
public JobExecutionResult execute(String jobName) throws Exception {
    /* Build the topology from the StreamTransformations.
     * FlinkPlan (interface) <-- StreamingPlan (abstract class) <-- StreamGraph
     */
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);
    // JobGraph: the low-level Flink dataflow representation that the JobManager accepts
    JobGraph jobGraph = streamGraph.getJobGraph();
    jobGraph.setAllowQueuedScheduling(true);
    Configuration configuration = new Configuration();
    configuration.addAll(jobGraph.getJobConfiguration());
    configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
    // add (and override) the settings with what the user defined
    configuration.addAll(this.configuration);
    if (!configuration.contains(RestOptions.PORT)) {
        configuration.setInteger(RestOptions.PORT, 0);
    }
    int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
    MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
            .build();
    if (LOG.isInfoEnabled()) {
        LOG.info("Running job on local embedded Flink mini cluster");
    }
    // create the MiniCluster that runs the Flink job
    MiniCluster miniCluster = new MiniCluster(cfg);
    try {
        miniCluster.start();
        configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());
        return miniCluster.executeJobBlocking(jobGraph);
    }
    finally {
        transformations.clear();
        miniCluster.close();
    }
}
At runtime, Flink converts the streaming topology (made up of sources, transformations, and sinks) into a dataflow (made up of streams and operators).
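As a rough picture of that conversion, the sketch below walks a list of registered transformations, follows each input link first, and records nodes and edges. It is a heavily simplified model under my own assumptions: MiniTransformation, MiniGraph, generate, and visit are hypothetical names, and the real StreamGraph construction handles many more cases (multiple inputs, partitioning, chaining, and so on).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Simplified model of turning a list of linked transformations into graph nodes and edges. */
class GraphWalkSketch {

    /** Stand-in for StreamTransformation: id, name, and a link to the input transformation. */
    static class MiniTransformation {
        final int id;
        final String name;
        final MiniTransformation input; // null for a source

        MiniTransformation(int id, String name, MiniTransformation input) {
            this.id = id;
            this.name = name;
            this.input = input;
        }
    }

    /** Stand-in for the resulting graph: operator nodes and the streams between them. */
    static class MiniGraph {
        final List<String> nodes = new ArrayList<>();
        final List<String> edges = new ArrayList<>();
    }

    /** Visits every registered transformation; inputs are visited before the node itself. */
    static MiniGraph generate(List<MiniTransformation> transformations) {
        MiniGraph graph = new MiniGraph();
        Set<Integer> visited = new HashSet<>();
        for (MiniTransformation t : transformations) {
            visit(t, graph, visited);
        }
        return graph;
    }

    private static void visit(MiniTransformation t, MiniGraph graph, Set<Integer> visited) {
        if (!visited.add(t.id)) {
            return; // already converted, reachable from more than one place
        }
        if (t.input != null) {
            visit(t.input, graph, visited);
        }
        graph.nodes.add(t.name);
        if (t.input != null) {
            graph.edges.add(t.input.name + " -> " + t.name);
        }
    }

    public static void main(String[] args) {
        MiniTransformation source = new MiniTransformation(1, "Source", null);
        MiniTransformation map = new MiniTransformation(2, "Map", source);
        MiniTransformation filter = new MiniTransformation(3, "Filter", map);
        // only Map and Filter are registered; Source is discovered through the input link
        MiniGraph graph = generate(Arrays.asList(map, filter));
        System.out.println(graph.nodes); // prints [Source, Map, Filter]
        System.out.println(graph.edges); // prints [Source -> Map, Map -> Filter]
    }
}
```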