Flink-6.Flink 分组求和

package com.ctgu.flink.project;


import com.ctgu.flink.entity.BehaviorChannelCount;
import com.ctgu.flink.entity.MarketingUserBehavior;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * Marketing-channel statistics over a simulated stream of user behavior events.
 *
 * <p>Two independent pipelines run on the same event-time stream, both ignoring
 * {@code UNINSTALL} events:
 * <ul>
 *   <li>a count per (channel, behavior) pair over sliding event-time windows, printed with the
 *       prefix {@code "分组求和:"} ("grouped sum");</li>
 *   <li>an overall count over the same windows, printed with the prefix {@code "total:"}.</li>
 * </ul>
 */
public class Flink_Sql_Marketing {

    /** Length of each sliding event-time window. */
    private static final Time WINDOW_SIZE = Time.hours(1);

    /**
     * Slide interval of the windows. With a 1 h size and a 1 s slide, every event is assigned to
     * 3600 overlapping windows; a coarser slide is far cheaper if per-second output is not needed.
     */
    private static final Time WINDOW_SLIDE = Time.seconds(1);

    /** Behavior value excluded from both statistics. */
    private static final String UNINSTALL = "UNINSTALL";

    public static void main(String[] args) throws Exception {

        long start = System.currentTimeMillis();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // With parallelism 1, the single source stamps events with System.currentTimeMillis() in a
        // sequential loop, so timestamps never decrease and zero out-of-orderness is sufficient.
        DataStream<MarketingUserBehavior> dataStream = env.addSource(new SimulatedMarketingUserBehaviorSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<MarketingUserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((event, timestamp) -> event.getTimestamp()));

        // Both pipelines drop UNINSTALL events: filter once and share the result.
        DataStream<MarketingUserBehavior> relevant =
                dataStream.filter(data -> !UNINSTALL.equals(data.getBehavior()));

        // Pipeline 1: count per (channel, behavior).
        // An anonymous class (not a lambda) is used so Flink can extract the generic
        // Tuple2<String, String> key type, which is erased from lambdas.
        relevant.keyBy(new KeySelector<MarketingUserBehavior, Tuple2<String, String>>() {

                    @Override
                    public Tuple2<String, String> getKey(MarketingUserBehavior userBehavior) throws Exception {
                        return Tuple2.of(userBehavior.getChannel(), userBehavior.getBehavior());
                    }
                })
                .window(SlidingEventTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE))
                .aggregate(new CountAggregate<MarketingUserBehavior>(), new MyProcessWindowFunction())
                .print("分组求和:");

        // Pipeline 2: overall count. Every event is mapped to the constant key "total" so all of
        // them land in the same keyed window.
        relevant.map(new MyMapFunction())
                .keyBy(data -> data.f0)
                .window(SlidingEventTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE))
                .aggregate(new CountAggregate<Tuple2<String, Long>>(), new MyWindowFunction())
                .print("total:");

        env.execute("Table SQL");

        // Only reached if env.execute() returns normally; the source is unbounded, so this may
        // never run. The elapsed time is printed in whole seconds (integer division).
        System.out.println("耗时: " + (System.currentTimeMillis() - start) / 1000);
    }

    /**
     * Unbounded source that emits one random {@link MarketingUserBehavior} about every 100 ms,
     * stamped with the current wall-clock time. Each event is also echoed to stdout.
     */
    private static class SimulatedMarketingUserBehaviorSource implements SourceFunction<MarketingUserBehavior> {
        /**
         * Loop flag. {@code cancel()} is called from a different thread than {@code run()}, so the
         * field must be volatile for the emitting loop to be guaranteed to observe the stop request.
         */
        private volatile boolean running = true;
        private final List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");
        private final List<String> channelList = Arrays.asList("app store", "wechat", "tencent", "ali");
        private final Random rand = new Random();

        @Override
        public void run(SourceContext<MarketingUserBehavior> sourceContext) throws Exception {
            while (running) {
                long userId = rand.nextLong();
                String behavior = behaviorList.get(rand.nextInt(behaviorList.size()));
                String channel = channelList.get(rand.nextInt(channelList.size()));
                long timestamp = System.currentTimeMillis();
                MarketingUserBehavior userBehavior = new MarketingUserBehavior(userId, behavior, channel, timestamp);
                System.out.println(userBehavior);
                sourceContext.collect(userBehavior);
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** Maps every event to the pair ("total", 1) so that all events share a single key. */
    private static class MyMapFunction extends RichMapFunction<MarketingUserBehavior, Tuple2<String, Long>> {

        @Override
        public Tuple2<String, Long> map(MarketingUserBehavior userBehavior) throws Exception {
            return new Tuple2<>("total", 1L);
        }
    }

    /**
     * Incremental aggregate that counts the elements assigned to a window, ignoring their contents.
     *
     * @param <T> element type; the count does not depend on it
     */
    private static class CountAggregate<T> implements AggregateFunction<T, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(T value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return a + b;
        }
    }

    /**
     * Wraps the pre-aggregated overall count of a window into a {@link BehaviorChannelCount}.
     * The key (always "total" in this job) is used for both the behavior and channel fields.
     */
    private static class MyWindowFunction
            implements WindowFunction<Long, BehaviorChannelCount, String, TimeWindow> {

        @Override
        public void apply(String key,
                          TimeWindow timeWindow,
                          Iterable<Long> counts,
                          Collector<BehaviorChannelCount> out) throws Exception {
            String windowEnd = new Timestamp(timeWindow.getEnd()).toString();
            // Used after an AggregateFunction, so the iterable holds exactly one pre-aggregated value.
            Long count = counts.iterator().next();
            out.collect(new BehaviorChannelCount(key, key, windowEnd, count));
        }
    }

    /**
     * Wraps the pre-aggregated count of a (channel, behavior) window into a
     * {@link BehaviorChannelCount}, tagged with the window end time.
     */
    private static class MyProcessWindowFunction
            extends ProcessWindowFunction<Long, BehaviorChannelCount, Tuple2<String, String>, TimeWindow> {

        @Override
        public void process(Tuple2<String, String> key,
                            Context context,
                            Iterable<Long> counts,
                            Collector<BehaviorChannelCount> out) throws Exception {
            // Key layout matches the KeySelector in main: f0 = channel, f1 = behavior.
            String channel = key.f0;
            String behavior = key.f1;
            String windowEnd = new Timestamp(context.window().getEnd()).toString();
            // Used after an AggregateFunction, so the iterable holds exactly one pre-aggregated value.
            Long count = counts.iterator().next();
            out.collect(new BehaviorChannelCount(behavior, channel, windowEnd, count));
        }
    }

}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容