Flink实战 - 实时计算当天销售总额与Top3分类


public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Double>> dataStream = env.addSource(new MySource());

        DataStream<CategoryPojo> result = dataStream.keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.days(
                        1), Time.hours(-8)))
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(
                        1)))
                .aggregate(
                        new PriceAggregate(),
                        new WindowResult()
                );

        result.print();

        result.keyBy("dateTime")
                .window(TumblingProcessingTimeWindows.of(Time.seconds(
                        1)))
                .process(new WindowResultProcess());

        env.execute();
    }

    private static class WindowResultProcess
            extends ProcessWindowFunction<CategoryPojo, Object, Tuple, TimeWindow> {

        @Override
        public void process(

                Tuple tuple,
                Context context,
                Iterable<CategoryPojo> elements,
                Collector<Object> out) throws Exception {
            String date = ((Tuple1<String>) tuple).f0;

            Queue<CategoryPojo> queue = new PriorityQueue<>(
                    3,
                    (o1, o2) -> o1.getTotalPrice() >= o2.getTotalPrice() ? 1 : -1);
            double price = 0D;
            Iterator<CategoryPojo> iterator = elements.iterator();
            while (iterator.hasNext()) {
                CategoryPojo categoryPojo = iterator.next();
                //使用优先级队列计算出top3
                if (queue.size() < 3) {
                    queue.add(categoryPojo);
                } else {
                    //计算topN的时候需要小顶堆,也就是要去掉堆顶比较小的元素
                    CategoryPojo tmp = queue.peek();
                    if (categoryPojo.getTotalPrice() > tmp.getTotalPrice()) {
                        queue.poll();
                        queue.add(categoryPojo);
                    }
                }
                price += categoryPojo.getTotalPrice();
            }

            //计算出来的queue是无序的,所以我们需要先sort一下
            List<String> list = queue.stream()
                    .sorted((o1, o2) -> o1.getTotalPrice() <=
                            o2.getTotalPrice() ? 1 : -1)
                    .map(f -> "(分类:" + f.getCategory() + " 销售额:" +
                            f.getTotalPrice() + ")")
                    .collect(
                            Collectors.toList());
            System.out.println("时间 : " + date + "  总价 : " + price + " top3 " +
                    StringUtils.join(list, ","));
            System.out.println("-------------");
        }

    }

    private static class WindowResult
            implements WindowFunction<Double, CategoryPojo, Tuple, TimeWindow> {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        public void apply(
                Tuple key,
                TimeWindow window,
                Iterable<Double> input,
                Collector<CategoryPojo> out) throws Exception {
            CategoryPojo categoryPojo = new CategoryPojo();
            categoryPojo.setCategory(((Tuple1<String>) key).f0);

            BigDecimal bg = new BigDecimal(input.iterator().next());
            double p = bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
            categoryPojo.setTotalPrice(p);
            categoryPojo.setDateTime(simpleDateFormat.format(new Date()));
            out.collect(categoryPojo);
        }
    }

    /**
     * 用于存储聚合的结果
     */
    public static class CategoryPojo {
        //      分类名称
        private String category;
        //      改分类总销售额
        private double totalPrice;
        //      截止到当前时间的时间
        private String dateTime;

        public String getCategory() {
            return category;
        }

        public void setCategory(String category) {
            this.category = category;
        }

        public double getTotalPrice() {
            return totalPrice;
        }

        public void setTotalPrice(double totalPrice) {
            this.totalPrice = totalPrice;
        }

        public String getDateTime() {
            return dateTime;
        }

        public void setDateTime(String dateTime) {
            this.dateTime = dateTime;
        }

        @Override
        public String toString() {
            return "CategoryPojo{" +
                    "category='" + category + '\'' +
                    ", totalPrice=" + totalPrice +
                    ", dateTime=" + dateTime +
                    '}';
        }

    }

    private static class PriceAggregate
            implements AggregateFunction<Tuple2<String, Double>, Double, Double> {

        @Override
        public Double createAccumulator() {
            return 0D;
        }

        @Override
        public Double add(Tuple2<String, Double> value, Double accumulator) {
            return accumulator + value.f1;
        }

        @Override
        public Double getResult(Double accumulator) {
            return accumulator;
        }

        @Override
        public Double merge(Double a, Double b) {
            return a + b;
        }
    }

    /**
     * 模拟生成某一个分类下的订单生成
     */
    public static class MySource implements SourceFunction<Tuple2<String, Double>> {

        private volatile boolean isRunning = true;
        private Random random = new Random();
        String category[] = {
                "女装", "男装",
                "图书", "家电",
                "洗护", "美妆",
                "运动", "游戏",
                "户外", "家具",
                "乐器", "办公"
        };

        @Override
        public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
            while (isRunning) {
                Thread.sleep(1000);
                //某一个分类
                String c = category[(int) (Math.random() * (category.length - 1))];
                //某一个分类下产生了price的成交订单
                double price = random.nextDouble() * 100;
                ctx.collect(Tuple2.of(c, price));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。