Flink系列 - 实时数仓之热门商品统计-TopN(三)

  作为一名初学者来说,Flink 的各种API着实使人头晕乱象,建以这种情况,今天总结下:热门商品的统计。接下来我们先看下数据源的格式(这里为了方便我们直接网上下载公开数据即可-> wget https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv):

用户ID,商品ID,商品类目ID,行为类型,时间戳
543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000

  先明确下具体的需求:按一小时的窗口大小,每5分钟统计一次《做滑动窗口聚合 - Sliding Window》,按每个窗口聚合,输出每个窗口中点击量前N名的商品。
  废话不多说,既然需求和数据源已确定,那么接下来直接上代码实践:FlinkSQL 和 Stateful Stream Processing

一、FlinkSQL 的实践

1.创建实现类 - AnalysisHotItemSQL.java,整体代码如下:

public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStream<String> inputStream = env.readTextFile("C:\\Users\\Administrator\\Desktop\\my-gitlib\\shishi-daping\\dip\\shishi-daping\\NFDWSYYBigScreen\\TestJsonDmon\\src\\main\\resources\\UserBehavior.csv");
        DataStream<UserBehavior> dataStream = inputStream.map(new MapFunction<String, UserBehavior>() {
            @Override
            public UserBehavior map(String s) throws Exception {
                String[] dataArray = s.split(",");
                return new UserBehavior(Long.parseLong(dataArray[0]),Long.parseLong(dataArray[1]),Integer.parseInt(dataArray[2]),dataArray[3],Long.parseLong(dataArray[4]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserBehavior>(Time.seconds(1)) {
            @Override
            public long extractTimestamp(UserBehavior element) {
                return element.getTimestamp()*1000L;
            }
        });

        // ---------------  开始区 ---------------------------------------------
        
        // 具体代码实现

        // --------------- 结束区 ------------------------------------------------
        env.execute("Top PV");
    }

2.具体逻辑代码实现如下:

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        // 创建临时表
        tableEnv.createTemporaryView("UserBehavior", dataStream,"itemId,behavior,timestamp.rowtime as ts");

        String sql = ("select * from (select *,row_number() over(partition by windowEnd order by cnt desc) as row_num from(select itemId, count(itemId) as cnt, hop_end(ts, interval '5' minute, interval '1' hour) as windowEnd from " +
                "UserBehavior where behavior = 'pv' group by itemId, hop(ts, interval '5' minute, interval '1' hour))) where row_num <= 5").trim();

        Table topNResultTable = tableEnv.sqlQuery(sql);
        DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnv.toRetractStream(topNResultTable, Row.class);
        tuple2DataStream.print();

3.输出结果如下:
image.png

  从整体代码结构来看确实是很精简,只要创建临时表我们就可以用 SQL 来实现了,但是由于 SQL 默认是用一个状态进程存储的,因此所耗的资源自然会大一些,还有就是 SQL 的实现只能输出满足条件的数据,不能输出多余的数据,这自然会不满足某些苛刻的需求;那么如何避免这种情况呢?当然可以实现,那就是用底层的API进行开发。

二、Stateful Stream Processing 的实践

1.首先我们先写下逻辑的实现代码:

SingleOutputStreamOperator<String> singleOutputStream = dataStream.filter(ub -> ub.getBehavior().equals("pv"))
                // 1. 先按窗口进行统计
                .keyBy(ub -> ub.getItemId())
                .timeWindow(Time.hours(1), Time.minutes(5))
                .aggregate(new CountAgg(), new WindowResult())
                // 2. 根据时间窗口的结束时间分组并排序
                .keyBy(ub -> ub.getWindowEnd())
                .process(new TopN(3));

        singleOutputStream.print();

2.从逻辑代码可以看出第一步先做分组然后开窗,接下来使用 aggregate(AggregateFunction af, WindowFunction wf)做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少 state 的存储压力、第二部在根据窗口的结束日期进行分组然后再排序即可得到人们商品;为了方便操作聚合函数,我们先创建相应的实体类:

/**  源
 * @author feiniu
 * @create 2020-09-01 9:50
 */
public class UserBehavior {

    private long userId;         // 用户ID
    private long itemId;         // 商品ID
    private int categoryId;      // 商品类目ID
    private String behavior;     // 用户行为, 包括("pv", "buy", "cart", "fav")
    private long timestamp;      // 行为发生的时间戳,单位秒
    ......
}

/**  结果
 * @author feiniu
 * @create 2020-09-01 10:23
 */
public class ItemViewCount {

    private Long itemId;
    private Long windowEnd;
    private Long count;
    ......
}

3.CountAgg 实现了 AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加一:

/** COUNT 统计的聚合函数实现,每出现一条记录加一
     *       in, acc, out
     * */
    public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long>{

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(UserBehavior userBehavior, Long aLong) {
            return aLong + 1;
        }

        @Override
        public Long getResult(Long aLong) {
            return aLong;
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            return aLong + acc1;
        }
    }

  1. aggregate(AggregateFunction af, WindowFunction wf) 的第二个参数WindowFunction 将每个 key 每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的 WindowResult 将主键商品ID,窗口,点击量封装成了ItemViewCount进行输出:
/** 用于输出窗口的结果
     *      in, out, key, window
     * */
    public static class WindowResult implements WindowFunction<Long, ItemViewCount, Long, TimeWindow> {

        /**
         *     窗口的主键,即 itemId
         *     窗口
         *     聚合函数的结果,即 count 值
         *     输出类型为 ItemViewCount
         */
        @Override
        public void apply(Long aLong, TimeWindow timeWindow, Iterable<Long> iterable, Collector<ItemViewCount> collector) throws Exception {
            collector.collect(new ItemViewCount(aLong, timeWindow.getEnd(), iterable.iterator().next()));
        }
    }

  目前为止我们得到了每个商品在每个窗口的点击量的数据流。

  1. TopN 计算最热门商品 - 我们实现了逻辑代码中的第二部分求出前3的热门商品,我们使用 ProcessFunction进行操作,它是 Flink 提供的一个 low-level API,用于实现更高级的功能:
/** 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串
     *      K , I , O
     * */
    public static class TopN extends KeyedProcessFunction<Long, ItemViewCount, String>{

        private final int n;

        public TopN(int n) {
            this.n = n;
        }

        // 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
        private transient ListState<ItemViewCount> itemState = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // 状态的注册
            ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
                    "itemState-state",
                    ItemViewCount.class);
            itemState = getRuntimeContext().getListState(itemsStateDesc);
        }

        @Override
        public void processElement(ItemViewCount itemViewCount, Context context, Collector<String> collector) throws Exception {
            // 每条数据都保存到状态中
            this.itemState.add(itemViewCount);
            // 注册 windowEnd + 1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
            context.timerService().registerEventTimeTimer(itemViewCount.getWindowEnd() + 1);
        }

        // -------------------------------------------
        // 定时器的代码实现
        // -------------------------------------------
    }

  1. 定时器的代码实现如下:
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            // 获取收到的所有商品点击量
            List<ItemViewCount> allItems = new ArrayList<>();
            for (ItemViewCount item : itemState.get()) {
                allItems.add(item);
            }
            // 提前清除状态中的数据,释放空间
            itemState.clear();
            // 按照点击量从大到小排序
            allItems.sort(new Comparator<ItemViewCount>() {
                @Override
                public int compare(ItemViewCount o1, ItemViewCount o2) {
                    return (int) (o2.getCount() - o1.getCount());
                }
            });

            // 将排名信息格式化成 String, 便于打印
            StringBuilder result = new StringBuilder();
            result.append("====================================\n");
            result.append("时间: ").append(new Timestamp(timestamp-1)).append("\n");
            for (int i = 0; i < n; i++) {
                ItemViewCount currentItem = allItems.get(i);
                result.append("No").append(i).append(":")
                        .append("  商品ID=").append(currentItem.getItemId())
                        .append("  浏览量=").append(currentItem.getCount())
                        .append("\n");
            }
            result.append("====================================\n\n");

            out.collect(result.toString());
        }

  1. 运行结果如下:
    image.png

      到此,我们用两种方式实现了热门商品的操作,不过使用哪种方式,适合自己才是最好的;这篇文章的写作之前我在网上查了好多资料,万变不离其中,代码基本上都是相近的,在此特别感谢 云邪 大佬的资料参考:《https://mp.weixin.qq.com/s?__biz=MzUxNjkzMzc0MA==&mid=2247483698&idx=1&sn=f1e6e7c44274af25da70a239eb47a48d&scene=19#wechat_redirect
      欢迎各位同学留言讨论,共同进步。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352