Flink 计算 TopN

前言

使用 flink 很长一段时间了,突然发现竟然没有计算过 topN,这可是 flink 常见的计算场景了, 故自己想了一个场景来计算一下。
基于 Flink 1.12

场景

外卖员听单的信息会发到单独一个 topic 中,计算一个每天有多少个 外卖员听单以及总共的听单次数。

kafka 中消息类型

{"locTime":"2020-12-28 12:32:23","courierId":12,"other":"aaa"}

locTime:事件发生的时间,courierId 外卖员id

计算一天中 听单次数 top5 的外卖员

代码

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topics, new SimpleStringSchema(), properties);
        FlinkHelp.setOffset(parameter, consumer);
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy.<String>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                            @Override
                            public long extractTimestamp(String element, long recordTimestamp) {
                                String locTime = "";
                                try {
                                    Map<String, Object> map = Json2Others.json2map(element);
                                    locTime = map.get("locTime").toString();
                                } catch (IOException e) {
                                }
                                LocalDateTime startDateTime =
                                        LocalDateTime.parse(locTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                                long milli = startDateTime.toInstant(OffsetDateTime.now().getOffset()).toEpochMilli();
                                return milli;
                            }
                        }).withIdleness(Duration.ofSeconds(1)));

        SingleOutputStreamOperator<CourierListenInfos> process = env.addSource(consumer).filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return true;
            }
        }).keyBy(new KeySelector<String, String>() {
            @Override
            public String getKey(String value) throws Exception {
                Map<String, Object> map = Json2Others.json2map(value);
                String courierId = map.get("courierId").toString();
                String day = map.get("locTime").toString().split(" ")[0].replace("-", "");
                return day + "-" + courierId;
            }
        }).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
                .allowedLateness(Time.minutes(1))
//              .trigger(CountTrigger.of(5))// 其实多个 trigger 就是下一个 trigger 覆盖上一个 trigger
                .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
                //追历史数据的时候会有问题
//              .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                //处理完毕后将 window state 中的数据清除掉
                .evictor(TimeEvictor.of(Time.seconds(0), true))
                .process(new ProcessWindowFunction<String, CourierListenInfos, String, TimeWindow>() {
                    private JedisCluster jedisCluster;
                    private ReducingStateDescriptor<Long> reducingStateDescriptor;
                    private ReducingState<Long> listenCount;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        StateTtlConfig ttlConfig = StateTtlConfig
                                .newBuilder(org.apache.flink.api.common.time.Time.hours(25))
                                //default,不支持 eventTime 1.12.0
                                .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                                .cleanupInRocksdbCompactFilter(1000)
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default
                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                                .build();

                        reducingStateDescriptor =
                                new ReducingStateDescriptor<Long>("listenCount", new Sum(), TypeInformation.of(Long.class));
                        reducingStateDescriptor.enableTimeToLive(ttlConfig);
                        listenCount = getRuntimeContext().getReducingState(reducingStateDescriptor);

                        jedisCluster = RedisUtil.getJedisCluster(redisHp);
                    }

                    @Override
                    public void close() throws Exception {
                        RedisUtil.closeConn(jedisCluster);
                    }

                    @Override
                    public void process(String s, Context context, Iterable<String> elements, Collector<CourierListenInfos> out) throws Exception {
                        Iterator<String> iterator = elements.iterator();

                        long l = context.currentProcessingTime();
                        long watermark = context.currentWatermark();
                        TimeWindow window = context.window();

                        String endDay = DateUtils.millisecondsToDateStr(window.getEnd(), "yyyyMMdd HH:mm:ss");
                        String startDay = DateUtils.millisecondsToDateStr(window.getStart(), "yyyyMMdd HH:mm:ss");

                        System.out.println("currentProcessingTime:" + l + " watermark:" + watermark + " windowTime:" + startDay + "-" + endDay);

                        while (iterator.hasNext()) {
                            iterator.next();
                            listenCount.add(1L);
                        }

                        iterator = elements.iterator();
                        Map<String, Object> map = Json2Others.json2map(iterator.next());
                        String courierId = map.get("courierId").toString();
                        String day = map.get("locTime").toString().split(" ")[0].replace("-", "");
                        out.collect(new CourierListenInfos(day, courierId, listenCount.get()));
                    }
                });

        process.keyBy(new KeySelector<CourierListenInfos, String>() {
            @Override
            public String getKey(CourierListenInfos value) throws Exception {
                return value.getDay();
            }
        }).process(new KeyedProcessFunction<String, CourierListenInfos, String>() {
            private JedisCluster jedisCluster;
            private MapStateDescriptor<String, Long> mapStateCountDescriptor;
            private MapState<String, Long> courierInfoCountMapState;
            private boolean mucalc = false;

            @Override
            public void open(Configuration parameters) throws Exception {
                StateTtlConfig ttlConfig = StateTtlConfig
                        .newBuilder(org.apache.flink.api.common.time.Time.hours(25))
                        //default,不支持 eventTime 1.12.0
                        .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                        .cleanupInRocksdbCompactFilter(1000)
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//default
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();

                mapStateCountDescriptor =
                        new MapStateDescriptor<String, Long>("courierInfoCountMapState", TypeInformation.of(String.class), TypeInformation.of(Long.class));
                mapStateCountDescriptor.enableTimeToLive(ttlConfig);
                courierInfoCountMapState = getRuntimeContext().getMapState(mapStateCountDescriptor);

                jedisCluster = RedisUtil.getJedisCluster(redisHp);
            }

            @Override
            public void close() throws Exception {
                RedisUtil.closeConn(jedisCluster);
            }

            @Override
            public void processElement(CourierListenInfos value, Context ctx, Collector<String> out) throws Exception {
                courierInfoCountMapState.put(value.getDay() + "#" + value.getCourierId(), value.getListenCount());
//              System.out.println("ctx.timerService().currentWatermark() = " + DateUtils.millisecondsToDateStr(ctx.timerService().currentWatermark(), "yyyyMMdd HH:mm:ss"));
//              System.out.println("ctx.timestamp() = " + DateUtils.millisecondsToDateStr(ctx.timestamp(), "yyyyMMdd HH:mm:ss"));
                ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() / 1000 + 1000);
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                String day = ctx.getCurrentKey();
                PriorityQueue<CourierListenInfos> courierListenInfos = new PriorityQueue<>(new Comparator<CourierListenInfos>() {
                    @Override
                    public int compare(CourierListenInfos o1, CourierListenInfos o2) {
                        return (int) (o1.listenCount - o2.listenCount);
                    }
                });

                Iterable<Map.Entry<String, Long>> entries = courierInfoCountMapState.entries();
                for (Map.Entry<String, Long> entry : entries) {
//                  System.out.println("entry.getKey() " + entry.getKey());
                    String[] split = entry.getKey().split("#", -1);
                    courierListenInfos.offer(new CourierListenInfos(split[0], split[1], entry.getValue()));
                    if (courierListenInfos.size() > 5) {
                        courierListenInfos.poll();
                    }
                }

                courierInfoCountMapState.clear();
                String tops = "";
                int size = courierListenInfos.size();
                for (int i = 0; i < size; i++) {
                    CourierListenInfos courierListenInfos1 = courierListenInfos.poll();
                    System.out.println("courierListenInfos1 " + courierListenInfos1);
                    courierInfoCountMapState.put(courierListenInfos1.getDay() + "#" + courierListenInfos1.getCourierId(), courierListenInfos1.listenCount);
                    tops = tops + courierListenInfos1.courierId + "#" + courierListenInfos1.listenCount;
                    if (i != size - 1) {
                        tops += ",";
                    }
                }
//              System.out.println("courierListenInfos.poll() = " + tops);
                jedisCluster.hset("test_courier_tops", day + "-top5", tops);
                System.out.println("============");
            }
        }).setParallelism(1);

结果样例

'20201227-top5':'1#1111,2#2222,3#3333'
'20201227-top5':'1#1111,2#2222,3#3333'

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342