reduce

import bean.WaterSensor;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * Demo job that runs a keyed {@code reduce} over a small in-memory collection of
 * {@link WaterSensor} readings with a {@link RichReduceFunction}, printing a marker
 * from both {@code open} and {@code reduce} so their invocations can be observed on
 * the console.
 */
public class Reduce {
    /**
     * Builds the pipeline, runs it with parallelism 1 and prints the rolling reduce
     * result for every sensor id.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // A single parallel instance keeps the console output in one predictable sequence.
        env.setParallelism(1);

        // Sample readings for two sensor ids; the last two "sensor_2" timestamps are
        // deliberately not in ascending order.
        ArrayList<WaterSensor> arr = new ArrayList<>();
        arr.add(new WaterSensor("sensor_1", 1607527992000L, 20));
        arr.add(new WaterSensor("sensor_2", 1607527993000L, 10));
        arr.add(new WaterSensor("sensor_1", 1607527994000L, 60));
        arr.add(new WaterSensor("sensor_2", 1607527996000L, 60));
        arr.add(new WaterSensor("sensor_2", 1607527995000L, 90));

        env.fromCollection(arr)
                // Group by sensor id so the reduce below only combines readings of one sensor.
                .keyBy(WaterSensor::getId)
                .reduce(new RichReduceFunction<WaterSensor>() {

                    /** Lifecycle hook; only prints a marker so each invocation is visible. */
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        System.out.println("open方法~~~");
                    }

                    /**
                     * Combines two readings of the same sensor.
                     *
                     * @param value1 first reading (its id is carried over to the result)
                     * @param value2 second reading (its timestamp is carried over to the result)
                     * @return a new reading whose vc is the sum of both inputs' vc
                     */
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        // Marker line: one is printed per reduce invocation.
                        System.out.println("~~~~ ");
                        return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
                    }
                }).print();

        env.execute();
    }
}

open方法的执行次数只和并行度有关:并行度为几,open就执行几次(每个并行子任务各执行一次):

图片.png

修改代码,把并行度设为3,在reduce方法中输出value1,value2的值:

import bean.WaterSensor;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * Variant of the keyed-reduce demo that runs with parallelism 3 and prints both
 * arguments of every {@code reduce} call, making it visible which
 * {@link WaterSensor} records are combined and when.
 */
public class Reduce {
    /**
     * Builds the pipeline, runs it with parallelism 3 and prints the rolling reduce
     * result for every sensor id.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // Three parallel instances, so the "open" marker below can be counted per instance.
        env.setParallelism(3);

        // Sample readings for two sensor ids; the last two "sensor_2" timestamps are
        // deliberately not in ascending order.
        ArrayList<WaterSensor> arr = new ArrayList<>();
        arr.add(new WaterSensor("sensor_1", 1607527992000L, 20));
        arr.add(new WaterSensor("sensor_2", 1607527993000L, 10));
        arr.add(new WaterSensor("sensor_1", 1607527994000L, 60));
        arr.add(new WaterSensor("sensor_2", 1607527996000L, 60));
        arr.add(new WaterSensor("sensor_2", 1607527995000L, 90));

        env.fromCollection(arr)
                // Group by sensor id so the reduce below only combines readings of one sensor.
                .keyBy(WaterSensor::getId)
                .reduce(new RichReduceFunction<WaterSensor>() {

                    /** Lifecycle hook; only prints a marker so each invocation is visible. */
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        System.out.println("open方法~~~");
                    }

                    /**
                     * Combines two readings of the same sensor and logs both inputs.
                     *
                     * @param value1 first reading (its id is carried over to the result)
                     * @param value2 second reading (its timestamp is carried over to the result)
                     * @return a new reading whose vc is the sum of both inputs' vc
                     */
                    @Override
                    public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                        // Print both operands so each invocation and its inputs can be traced.
                        System.out.println("value1-->" + value1 + ",value2-->" + value2);
                        return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
                    }
                }).print();

        env.execute();
    }
}
图片.png

对于每个id,第一条记录到达时不会调用reduce方法,而是直接作为该id的初始聚合结果;此后同一id每到达一条记录,就调用一次reduce方法,其中value1是上一次的聚合结果,value2是新到达的记录:

图片.png
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容