import bean.WaterSensor;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
/**
 * Minimal Flink DataStream example: a keyed rolling {@code reduce} over a small in-memory
 * collection of {@code WaterSensor} records, run with parallelism 1.
 *
 * <p>The reduce function is a {@link RichReduceFunction} so that its {@code open} lifecycle
 * hook can be observed on standard output alongside the {@code reduce} invocations.
 */
public class Reduce {

    /**
     * Builds the pipeline (collection source, keyBy sensor id, reduce, print) and executes it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Single parallel instance for every operator in this job.
        env.setParallelism(1);

        // Sample input: two sensor ids, constructed as (id, ts, vc).
        ArrayList<WaterSensor> arr = new ArrayList<>();
        arr.add(new WaterSensor("sensor_1", 1607527992000L, 20));
        arr.add(new WaterSensor("sensor_2", 1607527993000L, 10));
        arr.add(new WaterSensor("sensor_1", 1607527994000L, 60));
        arr.add(new WaterSensor("sensor_2", 1607527996000L, 60));
        arr.add(new WaterSensor("sensor_2", 1607527995000L, 90));

        env.fromCollection(arr)
                // Partition by sensor id so each id is reduced independently.
                .keyBy(WaterSensor::getId)
                .reduce(new RichReduceFunction<WaterSensor>() {
                    /** Lifecycle hook; prints a marker so each invocation is visible in the output. */
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        System.out.println("open方法~~~");
                    }

                    /**
                     * Combines two records of the same key: keeps {@code value1}'s id, takes
                     * {@code value2}'s timestamp and sums both {@code vc} values.
                     */
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        // Marker line: one is printed per reduce invocation.
                        System.out.println("~~~~ ");
                        return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
                    }
                })
                .print();

        env.execute();
    }
}
open方法的调用次数只和并行度有关:算子的每个并行子任务实例在初始化时各调用一次open,因此并行度为几,open就执行几次:
图片.png
修改代码,把并行度设为3,在reduce方法中输出value1,value2的值:
import bean.WaterSensor;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
/**
 * Variant of the keyed rolling {@code reduce} example that runs with parallelism 3 and prints
 * both arguments of every {@code reduce} invocation.
 *
 * <p>The reduce function is a {@link RichReduceFunction} so that its {@code open} lifecycle
 * hook can be observed on standard output alongside the {@code reduce} invocations.
 */
public class Reduce {

    /**
     * Builds the pipeline (collection source, keyBy sensor id, reduce, print) and executes it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Default parallelism of 3 for the operators of this job.
        env.setParallelism(3);

        // Sample input: two sensor ids, constructed as (id, ts, vc).
        ArrayList<WaterSensor> arr = new ArrayList<>();
        arr.add(new WaterSensor("sensor_1", 1607527992000L, 20));
        arr.add(new WaterSensor("sensor_2", 1607527993000L, 10));
        arr.add(new WaterSensor("sensor_1", 1607527994000L, 60));
        arr.add(new WaterSensor("sensor_2", 1607527996000L, 60));
        arr.add(new WaterSensor("sensor_2", 1607527995000L, 90));

        env.fromCollection(arr)
                // Partition by sensor id so each id is reduced independently.
                .keyBy(WaterSensor::getId)
                .reduce(new RichReduceFunction<WaterSensor>() {
                    /** Lifecycle hook; prints a marker so each invocation is visible in the output. */
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        System.out.println("open方法~~~");
                    }

                    /**
                     * Combines two records of the same key: keeps {@code value1}'s id, takes
                     * {@code value2}'s timestamp and sums both {@code vc} values.
                     */
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        // Print both inputs so it is visible which records reach reduce().
                        System.out.println("value1-->" + value1 + ",value2-->" + value2);
                        return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
                    }
                })
                .print();

        env.execute();
    }
}
图片.png
对每个id(key)而言,首条记录到达时不会调用reduce方法,而是直接作为该id的初始聚合结果保存并输出;此后同一id每到来一条记录,就调用一次reduce方法,其中value1是之前的聚合结果,value2是新到来的记录:
图片.png