在flink中我们经常会用到ReduceFunction来合并两个参数生成一个新的值,这个新的值同时也可以再下一次reduce操作中跟新的参数的再次进行合并操作。但是在reduce方法中,我们并不能看出究竟谁是上一次reduce合并后的结果值,有时候我们面临的场景恰恰需要判断一下哪个参数是上一批合并的结果,哪个参数是新传入的参数。带着这个疑问我们来看一下ReduceFunction的源代码。
首先看到ReduceFunction这里的注释,这里并没有说明value1和value2究竟哪个参数是是上一次产生的
/**
* The core method of ReduceFunction, combining two values into one value of the same type.
* The reduce function is consecutively applied to all values of a group until only a single value remains.
*
* @param value1 The first value to combine.
* @param value2 The second value to combine.
* @return The combined value of both input values.
*
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
T reduce(T value1, T value2) throws Exception;
在flink运行过程中,ReduceFunction的底层是通过StreamGroupedReduce这个类来运行,我们看一下这个类的源码:
@Override
public void open() throws Exception {
super.open();
ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer);
values = getPartitionedState(stateId);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
IN value = element.getValue();
IN currentValue = values.value();
if (currentValue != null) {
IN reduced = userFunction.reduce(currentValue, value);
values.update(reduced);
output.collect(element.replace(reduced));
} else {
values.update(value);
output.collect(element.replace(value));
}
}
调用我们自己定义的ReduceFunction,代码的逻辑也很清楚,如果之前没有执行过reduce操作,更新当前valueState,同时将该参数发送出去,如果之前有做过reduce操作也就是valueState有值,则第一个参数currentValue是从valueState里面取出的值,这就说明,第一个参数是上一次reduce产生的。