public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Double>> dataStream = env.addSource(new MySource());
DataStream<CategoryPojo> result = dataStream.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.days(
1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(
1)))
.aggregate(
new PriceAggregate(),
new WindowResult()
);
result.print();
result.keyBy("dateTime")
.window(TumblingProcessingTimeWindows.of(Time.seconds(
1)))
.process(new WindowResultProcess());
env.execute();
}
private static class WindowResultProcess
extends ProcessWindowFunction<CategoryPojo, Object, Tuple, TimeWindow> {
@Override
public void process(
Tuple tuple,
Context context,
Iterable<CategoryPojo> elements,
Collector<Object> out) throws Exception {
String date = ((Tuple1<String>) tuple).f0;
Queue<CategoryPojo> queue = new PriorityQueue<>(
3,
(o1, o2) -> o1.getTotalPrice() >= o2.getTotalPrice() ? 1 : -1);
double price = 0D;
Iterator<CategoryPojo> iterator = elements.iterator();
while (iterator.hasNext()) {
CategoryPojo categoryPojo = iterator.next();
//使用优先级队列计算出top3
if (queue.size() < 3) {
queue.add(categoryPojo);
} else {
//计算topN的时候需要小顶堆,也就是要去掉堆顶比较小的元素
CategoryPojo tmp = queue.peek();
if (categoryPojo.getTotalPrice() > tmp.getTotalPrice()) {
queue.poll();
queue.add(categoryPojo);
}
}
price += categoryPojo.getTotalPrice();
}
//计算出来的queue是无序的,所以我们需要先sort一下
List<String> list = queue.stream()
.sorted((o1, o2) -> o1.getTotalPrice() <=
o2.getTotalPrice() ? 1 : -1)
.map(f -> "(分类:" + f.getCategory() + " 销售额:" +
f.getTotalPrice() + ")")
.collect(
Collectors.toList());
System.out.println("时间 : " + date + " 总价 : " + price + " top3 " +
StringUtils.join(list, ","));
System.out.println("-------------");
}
}
private static class WindowResult
implements WindowFunction<Double, CategoryPojo, Tuple, TimeWindow> {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public void apply(
Tuple key,
TimeWindow window,
Iterable<Double> input,
Collector<CategoryPojo> out) throws Exception {
CategoryPojo categoryPojo = new CategoryPojo();
categoryPojo.setCategory(((Tuple1<String>) key).f0);
BigDecimal bg = new BigDecimal(input.iterator().next());
double p = bg.setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
categoryPojo.setTotalPrice(p);
categoryPojo.setDateTime(simpleDateFormat.format(new Date()));
out.collect(categoryPojo);
}
}
/**
* 用于存储聚合的结果
*/
public static class CategoryPojo {
// 分类名称
private String category;
// 改分类总销售额
private double totalPrice;
// 截止到当前时间的时间
private String dateTime;
public String getCategory() {
return category;
}
public void setCategory(String category) {
this.category = category;
}
public double getTotalPrice() {
return totalPrice;
}
public void setTotalPrice(double totalPrice) {
this.totalPrice = totalPrice;
}
public String getDateTime() {
return dateTime;
}
public void setDateTime(String dateTime) {
this.dateTime = dateTime;
}
@Override
public String toString() {
return "CategoryPojo{" +
"category='" + category + '\'' +
", totalPrice=" + totalPrice +
", dateTime=" + dateTime +
'}';
}
}
private static class PriceAggregate
implements AggregateFunction<Tuple2<String, Double>, Double, Double> {
@Override
public Double createAccumulator() {
return 0D;
}
@Override
public Double add(Tuple2<String, Double> value, Double accumulator) {
return accumulator + value.f1;
}
@Override
public Double getResult(Double accumulator) {
return accumulator;
}
@Override
public Double merge(Double a, Double b) {
return a + b;
}
}
/**
* 模拟生成某一个分类下的订单生成
*/
public static class MySource implements SourceFunction<Tuple2<String, Double>> {
private volatile boolean isRunning = true;
private Random random = new Random();
String category[] = {
"女装", "男装",
"图书", "家电",
"洗护", "美妆",
"运动", "游戏",
"户外", "家具",
"乐器", "办公"
};
@Override
public void run(SourceContext<Tuple2<String, Double>> ctx) throws Exception {
while (isRunning) {
Thread.sleep(1000);
//某一个分类
String c = category[(int) (Math.random() * (category.length - 1))];
//某一个分类下产生了price的成交订单
double price = random.nextDouble() * 100;
ctx.collect(Tuple2.of(c, price));
}
}
@Override
public void cancel() {
isRunning = false;
}
}
Flink实战 - 实时计算当天销售总额与Top3分类
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。