Common Flink Stream Processing APIs

1. map

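import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

// dataStream is assumed to be an existing DataStream<String>
DataStream<Integer> mapStream = dataStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String value) throws Exception {
        // exactly one output per input: the length of the string
        return value.length();
    }
});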


2. flatMap

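import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

// Split each comma-separated line into its fields: one input, zero or more outputs
DataStream<String> flatMapStream = dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String[] fields = value.split(",");
        for (String field : fields) {
            out.collect(field);
        }
    }
});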


3. filter

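import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

// Keep only the elements equal to the string "1" (a String cannot be compared to the int 1 with ==)
DataStream<String> filterStream = dataStream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String value) throws Exception {
        return "1".equals(value);
    }
});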


4. keyBy

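DataStream -> KeyedStream. Logically partitions the stream so that all elements with the same key are sent to the same partition (internally this is done by hashing the key).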
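To make the partitioning idea concrete, here is a plain-Java sketch with no Flink dependency. It assumes a simplified routing rule, Math.floorMod(key.hashCode(), parallelism); Flink itself first maps a key's hashCode to a key group through an additional hash and then assigns key groups to parallel instances. KeyBySketch, partitionFor and partition are hypothetical names used only for illustration.

```java
import java.util.*;

// Hypothetical plain-Java model of keyBy partitioning (not the Flink API).
class KeyBySketch {
    // Simplified assumption: partition = hashCode mod parallelism.
    // Flink's real routing goes through key groups before choosing an instance.
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Route every element to the partition of its key; equal keys always share a partition.
    static Map<Integer, List<String>> partition(List<String> keys, int parallelism) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String key : keys) {
            partitions.computeIfAbsent(partitionFor(key, parallelism), p -> new ArrayList<>()).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("sensor_1", "sensor_6", "sensor_1", "sensor_7", "sensor_6");
        System.out.println(partition(ids, 2));
    }
}
```

Whatever the concrete partition numbers turn out to be, every occurrence of "sensor_1" ends up in the same list, which is the only property keyBy promises.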


5. Rolling Aggregation

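Performs a rolling aggregation on each keyed sub-stream of a KeyedStream: every incoming element updates the aggregate for its key, and the updated result is emitted immediately. The available operators are sum(), min(), max(), minBy() and maxBy(). min()/max() only track the minimum/maximum of the given field (the remaining fields keep the values of the first element seen for that key), whereas minBy()/maxBy() return the whole element that holds the minimum/maximum.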
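The difference between min() and minBy() is easiest to see on a tiny input. The following plain-Java sketch simulates both operators for a rolling aggregation keyed by sensor id; Reading, rollingMin and rollingMinBy are hypothetical stand-ins written for illustration, not Flink classes, and a HashMap plays the role of Flink's keyed state.

```java
import java.util.*;

// Hypothetical plain-Java model of keyBy(id).min(...) vs keyBy(id).minBy(...).
class RollingMinSketch {
    // Hypothetical stand-in for the article's SensorReading POJO.
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return id + "@" + timestamp + "=" + temperature;
        }
    }

    // Like min("temperature"): the first element per key is the accumulator and
    // only its temperature field is lowered; its other fields never change.
    static List<Reading> rollingMin(List<Reading> input) {
        Map<String, Reading> state = new HashMap<>();
        List<Reading> out = new ArrayList<>();
        for (Reading r : input) {
            Reading acc = state.get(r.id);
            Reading next = (acc == null)
                    ? r
                    : new Reading(acc.id, acc.timestamp, Math.min(acc.temperature, r.temperature));
            state.put(r.id, next);
            out.add(next); // rolling: one updated result per input element
        }
        return out;
    }

    // Like minBy("temperature"): emit the whole element with the lowest temperature
    // so far (on ties the earlier element is kept).
    static List<Reading> rollingMinBy(List<Reading> input) {
        Map<String, Reading> state = new HashMap<>();
        List<Reading> out = new ArrayList<>();
        for (Reading r : input) {
            Reading acc = state.get(r.id);
            Reading next = (acc == null || r.temperature < acc.temperature) ? r : acc;
            state.put(r.id, next);
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Reading> input = List.of(
                new Reading("sensor_1", 1L, 35.0),
                new Reading("sensor_1", 2L, 30.0),
                new Reading("sensor_1", 3L, 32.0));
        System.out.println(rollingMin(input));   // [sensor_1@1=35.0, sensor_1@1=30.0, sensor_1@1=30.0]
        System.out.println(rollingMinBy(input)); // [sensor_1@1=35.0, sensor_1@2=30.0, sensor_1@2=30.0]
    }
}
```

Both variants emit three results for three inputs, but only minBy() switches to the timestamp of the reading that actually had the lowest temperature.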


6. reduce

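KeyedStream -> DataStream. Combines the current element with the result of the previous aggregation to produce a new value. The returned stream contains the result of every aggregation step, not only the final one.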

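import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;

// Group by id. dataStream is a DataStream<SensorReading>, where SensorReading is a user-defined
// POJO with id, timestamp and temperature fields. keyBy("id") is deprecated in newer Flink
// versions; keyBy(SensorReading::getId) is the type-safe alternative.
KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");

// reduce: keep the minimum temperature and emit the timestamp of the latest reading
DataStream<SensorReading> reduceStream = keyedStream.reduce(new ReduceFunction<SensorReading>() {
    @Override
    public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
        // value1: previous aggregated result, value2: newly arrived element
        return new SensorReading(value1.getId(), value2.getTimestamp(), Math.min(value1.getTemperature(), value2.getTemperature()));
    }
});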
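The sketch below replays the same reduce logic (minimum temperature, latest timestamp) in plain Java to show that one result is emitted per input element, per key. keyedReduce and Reading are hypothetical helpers, and the HashMap is only a stand-in for the keyed state in which Flink keeps the previous result.

```java
import java.util.*;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical plain-Java model of keyBy(...).reduce(...) (not the Flink API).
class KeyedReduceSketch {
    // Hypothetical stand-in for SensorReading.
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return id + "@" + timestamp + "=" + temperature;
        }
    }

    // fn(value1, value2): value1 is the previous result for the key, value2 the new element.
    // The first element of a key is emitted as-is; every later step emits the combined value.
    static <T, K> List<T> keyedReduce(List<T> input, Function<T, K> keyFn, BinaryOperator<T> fn) {
        Map<K, T> lastResult = new HashMap<>();
        List<T> out = new ArrayList<>();
        for (T value : input) {
            K key = keyFn.apply(value);
            T prev = lastResult.get(key);
            T next = (prev == null) ? value : fn.apply(prev, value);
            lastResult.put(key, next);
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Reading> input = List.of(
                new Reading("sensor_1", 1L, 36.0),
                new Reading("sensor_6", 2L, 15.4),
                new Reading("sensor_1", 3L, 33.0),
                new Reading("sensor_1", 4L, 34.5));
        List<Reading> out = keyedReduce(input, r -> r.id,
                (v1, v2) -> new Reading(v1.id, v2.timestamp, Math.min(v1.temperature, v2.temperature)));
        System.out.println(out); // [sensor_1@1=36.0, sensor_6@2=15.4, sensor_1@3=33.0, sensor_1@4=33.0]
    }
}
```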


7. Split and Select

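DataStream -> split -> SplitStream -> select -> DataStream. split() tags every element with one or more output names, and select() picks out the elements carrying the given names. Note that split/select was deprecated and then removed in Flink 1.12; side outputs are the replacement in current versions.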

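import java.util.Collections;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;

// Tag each reading "high" (above 30 degrees) or "low"
SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
    @Override
    public Iterable<String> select(SensorReading value) {
        return (value.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
    }
});

DataStream<SensorReading> highTempStream = splitStream.select("high");
DataStream<SensorReading> lowTempStream = splitStream.select("low");
DataStream<SensorReading> allTempStream = splitStream.select("high", "low");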
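As a rough model of what split/select does (this is not Flink code), the following sketch asks a selector for the output names of each element once, and select() then returns every element that carries at least one of the requested names. SplitSelectSketch is a hypothetical class; the selector mirrors the OutputSelector from the example above.

```java
import java.util.*;
import java.util.function.Function;

// Hypothetical plain-Java model of split/select (not the Flink API).
class SplitSelectSketch<T> {
    private final List<T> elements = new ArrayList<>();
    private final List<Set<String>> tags = new ArrayList<>();

    // "split": ask the selector for the output names of every element, once
    SplitSelectSketch(List<T> input, Function<T, Iterable<String>> selector) {
        for (T element : input) {
            Set<String> names = new HashSet<>();
            for (String name : selector.apply(element)) {
                names.add(name);
            }
            elements.add(element);
            tags.add(names);
        }
    }

    // "select": keep elements tagged with at least one requested name, each at most once
    List<T> select(String... names) {
        List<T> out = new ArrayList<>();
        for (int i = 0; i < elements.size(); i++) {
            for (String name : names) {
                if (tags.get(i).contains(name)) {
                    out.add(elements.get(i));
                    break;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        SplitSelectSketch<Double> split = new SplitSelectSketch<>(
                List.of(35.0, 20.0, 31.0),
                t -> t > 30 ? Collections.singletonList("high") : Collections.singletonList("low"));
        System.out.println(split.select("high"));        // [35.0, 31.0]
        System.out.println(split.select("low"));         // [20.0]
        System.out.println(split.select("high", "low")); // [35.0, 20.0, 31.0]
    }
}
```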


8. Connect

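DataStream, DataStream -> ConnectedStreams. Connects two data streams, which may have different types. After connect, the two streams are merely placed inside the same stream; each keeps its own data type and is still processed separately.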


9. CoMap and CoFlatMap

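ConnectedStreams -> DataStream. Works like map/flatMap on ConnectedStreams: map1/flatMap1 processes the elements of the first stream and map2/flatMap2 those of the second, and both must produce the same output type.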

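import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

// Convert the high-temperature stream into (id, temperature) warning tuples
DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
    @Override
    public Tuple2<String, Double> map(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), value.getTemperature());
    }
});

// Connect the warning stream with the low-temperature stream; each side keeps its own type
ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);

// CoMap: map1 handles the first stream, map2 the second; both return the common type Object
DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
    @Override
    public Object map1(Tuple2<String, Double> value) throws Exception {
        return new Tuple3<>(value.f0, value.f1, "warning");
    }

    @Override
    public Object map2(SensorReading value) throws Exception {
        return new Tuple2<>(value.getId(), "healthy");
    }
});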
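To illustrate why connect keeps both types and why a CoMap function needs two methods, here is a plain-Java model under simplifying assumptions: each element carries a flag recording which input it came from, and the two inputs are simply concatenated, whereas in Flink elements of the two sides arrive interleaved in no guaranteed order. Tagged, connect and coMap are hypothetical names, not Flink API.

```java
import java.util.*;
import java.util.function.Function;

// Hypothetical plain-Java model of connect + CoMap (not the Flink API).
class ConnectCoMapSketch {
    // One element of the "connected" collection; it remembers which input it came from,
    // so values of type A and type B live side by side without being converted.
    static final class Tagged<A, B> {
        final boolean fromFirst;
        final A first;
        final B second;

        Tagged(boolean fromFirst, A first, B second) {
            this.fromFirst = fromFirst;
            this.first = first;
            this.second = second;
        }
    }

    // "connect": concatenated here for simplicity; Flink interleaves the two inputs
    static <A, B> List<Tagged<A, B>> connect(List<A> firstInput, List<B> secondInput) {
        List<Tagged<A, B>> out = new ArrayList<>();
        for (A a : firstInput) out.add(new Tagged<>(true, a, null));
        for (B b : secondInput) out.add(new Tagged<>(false, null, b));
        return out;
    }

    // "CoMap": map1 for elements of the first input, map2 for the second; one output type
    static <A, B, OUT> List<OUT> coMap(List<Tagged<A, B>> connected,
                                       Function<A, OUT> map1, Function<B, OUT> map2) {
        List<OUT> out = new ArrayList<>();
        for (Tagged<A, B> t : connected) {
            out.add(t.fromFirst ? map1.apply(t.first) : map2.apply(t.second));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Tagged<Double, String>> connected = connect(List.of(35.8, 36.2), List.of("sensor_6"));
        List<String> result = coMap(connected, temp -> "warning:" + temp, id -> id + ":healthy");
        System.out.println(result); // [warning:35.8, warning:36.2, sensor_6:healthy]
    }
}
```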


10. Union

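DataStream, DataStream, ... -> DataStream. Merges two or more streams of the same data type into one DataStream. Unlike connect, union can take more than two streams, but all of them must have the same type.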
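A minimal plain-Java model of union, assuming round-robin interleaving as just one possible arrival order (Flink does not guarantee any ordering across the merged inputs). The single type parameter T mirrors the requirement that all unioned streams share one element type; UnionSketch is a hypothetical name, not Flink API.

```java
import java.util.*;

// Hypothetical plain-Java model of a.union(b, c, ...) (not the Flink API).
class UnionSketch {
    // All inputs share one element type T; the output is a single list of T.
    // Round-robin is only one possible interleaving of the inputs.
    @SafeVarargs
    static <T> List<T> union(List<T>... streams) {
        List<T> out = new ArrayList<>();
        int maxLen = 0;
        for (List<T> s : streams) maxLen = Math.max(maxLen, s.size());
        for (int i = 0; i < maxLen; i++) {
            for (List<T> s : streams) {
                if (i < s.size()) out.add(s.get(i));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> merged = union(List.of("a1", "a2", "a3"), List.of("b1"), List.of("c1", "c2"));
        System.out.println(merged); // [a1, b1, c1, a2, c2, a3]
    }
}
```

Every element of every input appears exactly once in the result, and the order within each input is preserved; only the interleaving between inputs is arbitrary.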
