Flink 1.10 源码 -- Map
导读
- 1.map是什么
- 2.map的源码和调用过程
- 3.transition方法的源码
- 4.通过底层接口实现map功能
一.map是什么
引用flink官网的解释,接收一个元素并转换成一个新的元素,很容易就可以理解了,map就是接收元素,处理之后在输出一个元素,不过需要注意的是map是必须要返回值的;
二.map的源码和调用过程
map本质就是一个transition,也可以说flink的算子都是一个个的transition操作
我们在调用各种算子的时候,都可以认为是在对数据做转换的操作,现在看一下map方法,map中传入了一 个MapFunction 由我们用户自己去实现,flink进行调用
// 我们调用map的时候传入了一个mapFunction,是我们自己去实现的
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
// 获取到我们的输出类型 不重要
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
Utils.getCallLocationName(), true);
// 在这里调用了重载方法 map()
return map(mapper, outType);
}
在重载的map方法中,我们可以看到调用了transform方法,这个方法相对毕竟底层了,我们等下分析这个方法,transform中最终要的一个参数就是StreamMap对象
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
我们进入看看这个StreamMap对象构造
可以看到该类继承了 AbstractUdfStreamOperator类,实现了OneInputStreamOperator接口
AbstractUdfStreamOperator 这个类表示具有用户自定义函数(Function)的一个基类,拥有open和close作用声明周期的方法
OneInputStreamOperator 这个通过名字我们大概也可以了解了,就是表示只有一个输入的operator
StreamMap构造中,将我们自定义的map函数通过父类构造传入,在processElement中,调用output对象进行map结果的收集并发送到下游,到这里map的调用链就结束了,具体一些其他方法可以根据自己的需要进行阅读查看;
public class StreamMap<IN, OUT>
extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// element.replace 方法会替换数据 保证了一次传递不回创建多个StreamRecord对象
output.collect(element.replace(userFunction.map(element.getValue())));
}
}
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT>
implements OutputTypeConfigurable<OUT> {
.......
/** The user function. */
protected final F userFunction;
public AbstractUdfStreamOperator(F userFunction) {
this.userFunction = requireNonNull(userFunction);
checkUdfCheckpointingPreconditions();
}
.......
}
在这里我们先分析一下transform方法
transform需要传入 算子的名字,输出的类型,和具体的算子
public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperator<T, R> operator) {
return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperator<T, R> operator) {
// TODO 这里将operator封装到SimpleOperator工厂中
return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperatorFactory<T, R> operatorFactory) {
// 具体干活了
return doTransform(operatorName, outTypeInfo, operatorFactory);
}
继续追踪doTransform方法,在这方法中的实现步骤
1.检查上游的输出类型能否识别,否则抛出一个异常
2.将传入的operator封装成一个OneInputTransformation(PhysicalTransformation实现类,有多种,后面会有讲解)
3.将当前transformation封装成一个新的SingleOutputStreamOperator(DataStream的子类),在进行调用算子的时候,传入的transformation就变成上游输出了(这里需要理解一下)
4.将转换后的transformation添加到env中的一个transformations中,用于后面生成StreamGraph
到这里一个transformation的转换就完成了,就等待我们调用env.execute()方法,进行程序的执行过程
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
// 这里会检查上游算子的输出类型是否是MissingTypeInfo类型,如果是则会抛出一个InvalidTypesException异常,这种情况下 我们需要调用.return方法指定返回类型
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
// 在 StreamExecutionEnvironment中
public void addOperator(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation, "transformation must not be null.");
this.transformations.add(transformation);
}
四.通过底层接口实现map功能
我这里继承的是 AbstractStreamOperator类,因为我这里是直接在方法中实现逻辑了,不需要传入一个function就没有继承AbstractUdfStreamOperator类,
我主要是对输入的数据拼接了一个字符串进行返回, 也可以根据自己的需求开发更高级的功能,这种属于底层的方法,在使用的时候需要谨慎,如果操作不当可能会造成极大的资源浪费
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
DataStreamSource<String> source = env.fromElements("123 123 123 ", "efd 阿迪斯", "hi");
CustomStreamMap_Operator<Integer, Integer> customStreamMap = new CustomStreamMap_Operator<>();
source
// 自定义map的实现
.transform("MyMap", BasicTypeInfo.INT_TYPE_INFO, customStreamMap)
.print();
env.execute();
}
public static class CustomStreamMap_Operator<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
/**
* 具体的实现方法
*/
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
output.collect((StreamRecord<OUT>) element.replace("custom :" + element.getValue()));
}
}