package com.ctgu.flink.project;
import com.ctgu.flink.entity.BehaviorChannelCount;
import com.ctgu.flink.entity.MarketingUserBehavior;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
public class Flink_Sql_Marketing {
/**
 * Builds and runs the streaming job.
 *
 * <p>Events come from {@code SimulatedMarketingUserBehaviorSource} and carry their own
 * event-time timestamp. Two pipelines are attached to the same stream; both discard
 * {@code UNINSTALL} events and use a sliding event-time window of one hour that advances
 * every second:
 * <ul>
 *   <li>a count per (channel, behavior) pair, and</li>
 *   <li>a count under the single key emitted by {@code MyMapFunction}.</li>
 * </ul>
 * After {@code env.execute} returns, the elapsed wall-clock time in seconds is printed.
 *
 * @param args command-line arguments (not read)
 * @throws Exception if job execution fails
 */
public static void main(String[] args) throws Exception {
    final long startedAt = System.currentTimeMillis();

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Timestamps are read from the event itself; the out-of-orderness bound is zero.
    final WatermarkStrategy<MarketingUserBehavior> watermarks = WatermarkStrategy
            .<MarketingUserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(0))
            .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

    final DataStream<MarketingUserBehavior> dataStream = env
            .addSource(new SimulatedMarketingUserBehaviorSource())
            .assignTimestampsAndWatermarks(watermarks);

    // Pipeline 1: count per (channel, behavior).
    final DataStream<MarketingUserBehavior> keptForGrouping =
            dataStream.filter(record -> !"UNINSTALL".equals(record.getBehavior()));
    final DataStream<BehaviorChannelCount> groupedCounts = keptForGrouping
            .keyBy(new KeySelector<MarketingUserBehavior, Tuple2<String, String>>() {
                @Override
                public Tuple2<String, String> getKey(MarketingUserBehavior userBehavior) throws Exception {
                    return Tuple2.of(userBehavior.getChannel(), userBehavior.getBehavior());
                }
            })
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.seconds(1)))
            .aggregate(new AverageAggregate(), new MyProcessWindowFunction());
    groupedCounts.print("分组求和:");

    // Pipeline 2: count across all kept events under one constant key.
    final DataStream<Tuple2<String, Long>> taggedForTotal = dataStream
            .filter(record -> !"UNINSTALL".equals(record.getBehavior()))
            .map(new MyMapFunction());
    final DataStream<BehaviorChannelCount> totalCounts = taggedForTotal
            .keyBy(tagged -> tagged.f0)
            .window(SlidingEventTimeWindows.of(Time.hours(1), Time.seconds(1)))
            .aggregate(new AverageAggregate1(), new MyWindowFunction());
    totalCounts.print("total:");

    env.execute("Table SQL");

    final long elapsedSeconds = (System.currentTimeMillis() - startedAt) / 1000;
    System.out.println("耗时: " + elapsedSeconds);
}
/**
 * Unbounded source that fabricates {@link MarketingUserBehavior} events for testing.
 *
 * <p>Each loop iteration builds one event from a random user id, a random behavior, a random
 * channel and the current wall-clock time, prints it to standard output, emits it, and then
 * sleeps for 100 ms. The loop ends once {@link #cancel()} has been called.
 */
private static class SimulatedMarketingUserBehaviorSource implements SourceFunction<MarketingUserBehavior> {

    /** Behaviors an event is drawn from. */
    private final List<String> behaviorList = Arrays.asList("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL");

    /** Channels an event is drawn from. */
    private final List<String> channelList = Arrays.asList("app store", "wechat", "tencent", "ali");

    private final Random rand = new Random();

    /**
     * Loop flag for {@link #run}. Declared {@code volatile} because Flink's
     * {@code SourceFunction} contract has {@link #cancel()} invoked from a thread other than
     * the one executing {@code run}; without it the write may never become visible to the loop.
     */
    private volatile boolean running = true;

    /**
     * Emits simulated events until the source is cancelled.
     *
     * @param sourceContext context used to emit the generated events
     * @throws Exception if the pause between events is interrupted
     */
    @Override
    public void run(SourceContext<MarketingUserBehavior> sourceContext) throws Exception {
        while (running) {
            long userId = rand.nextLong();
            String behavior = behaviorList.get(rand.nextInt(behaviorList.size()));
            String channel = channelList.get(rand.nextInt(channelList.size()));
            long timestamp = System.currentTimeMillis();
            MarketingUserBehavior userBehavior = new MarketingUserBehavior(userId, behavior, channel, timestamp);
            System.out.println(userBehavior);
            sourceContext.collect(userBehavior);
            // Throttle generation to one event per 100 ms.
            Thread.sleep(100);
        }
    }

    /** Requests that {@link #run} stop after its current iteration. */
    @Override
    public void cancel() {
        running = false;
    }
}
/**
 * Replaces every incoming event with the constant pair {@code ("total", 1)} so that all
 * events can be keyed together downstream. The content of the event is not read.
 */
private static class MyMapFunction extends RichMapFunction<MarketingUserBehavior, Tuple2<String, Long>> {

    /**
     * @param userBehavior the incoming event (ignored)
     * @return a new tuple holding the key {@code "total"} and the value {@code 1}
     */
    @Override
    public Tuple2<String, Long> map(MarketingUserBehavior userBehavior) throws Exception {
        return Tuple2.of("total", 1L);
    }
}
/**
 * Incremental aggregate that counts the {@link MarketingUserBehavior} events added to it.
 * Despite the class name, no average is computed: the accumulator and the result are both
 * the running element count.
 */
private static class AverageAggregate
        implements AggregateFunction<MarketingUserBehavior, Long, Long> {

    /** @return an initial count of zero */
    @Override
    public Long createAccumulator() {
        return Long.valueOf(0L);
    }

    /**
     * @param userBehavior the event being added (its fields are not read)
     * @param countSoFar the count before this event
     * @return the count incremented by one
     */
    @Override
    public Long add(MarketingUserBehavior userBehavior, Long countSoFar) {
        final long incremented = countSoFar + 1L;
        return incremented;
    }

    /** @return the accumulated count, unchanged */
    @Override
    public Long getResult(Long finalCount) {
        return finalCount;
    }

    /** @return the sum of the two partial counts */
    @Override
    public Long merge(Long left, Long right) {
        return Long.sum(left, right);
    }
}
/**
 * Incremental aggregate that counts the {@code Tuple2<String, Long>} elements added to it.
 * Each element contributes exactly one to the count; the tuple's fields are not read.
 * Despite the class name, no average is computed.
 */
private static class AverageAggregate1
        implements AggregateFunction<Tuple2<String, Long>, Long, Long> {

    /** @return an initial count of zero */
    @Override
    public Long createAccumulator() {
        return Long.valueOf(0L);
    }

    /**
     * @param tuple the element being added (ignored)
     * @param countSoFar the count before this element
     * @return the count incremented by one
     */
    @Override
    public Long add(Tuple2<String, Long> tuple, Long countSoFar) {
        final long incremented = countSoFar + 1L;
        return incremented;
    }

    /** @return the accumulated count, unchanged */
    @Override
    public Long getResult(Long finalCount) {
        return finalCount;
    }

    /** @return the sum of the two partial counts */
    @Override
    public Long merge(Long left, Long right) {
        return Long.sum(left, right);
    }
}
/**
 * Turns the pre-aggregated count of a window into a {@link BehaviorChannelCount}.
 * The window key is passed as both of the first two constructor arguments.
 */
private static class MyWindowFunction
        implements WindowFunction<Long, BehaviorChannelCount, String, TimeWindow> {

    /**
     * @param key the window's key
     * @param timeWindow the window being evaluated; its end time is formatted into the result
     * @param iterable the aggregate output; only its first element is read
     * @param out collector that receives the single result record
     */
    @Override
    public void apply(String key,
                      TimeWindow timeWindow,
                      Iterable<Long> iterable,
                      Collector<BehaviorChannelCount> out) throws Exception {
        final Timestamp closesAt = new Timestamp(timeWindow.getEnd());
        final Long total = iterable.iterator().next();
        final BehaviorChannelCount result =
                new BehaviorChannelCount(key, key, closesAt.toString(), total);
        out.collect(result);
    }
}
/**
 * Turns the pre-aggregated count of a (channel, behavior) window into a
 * {@link BehaviorChannelCount}.
 */
private static class MyProcessWindowFunction
        extends ProcessWindowFunction<Long, BehaviorChannelCount, Tuple2<String, String>, TimeWindow> {

    /**
     * @param tuple2 the window key: field 0 is the channel, field 1 is the behavior
     * @param context gives access to the window, whose end time is formatted into the result
     * @param iterable the aggregate output; only its first element is read
     * @param out collector that receives the single result record
     */
    @Override
    public void process(Tuple2<String, String> tuple2,
                        Context context,
                        Iterable<Long> iterable,
                        Collector<BehaviorChannelCount> out) throws Exception {
        final String channelName = tuple2.f0;
        final String behaviorName = tuple2.f1;
        final Timestamp closesAt = new Timestamp(context.window().getEnd());
        final Long total = iterable.iterator().next();
        // Constructor argument order is behavior first, then channel.
        out.collect(new BehaviorChannelCount(behaviorName, channelName, closesAt.toString(), total));
    }
}
}
Flink-6.Flink 分组求和
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 一、简单说明 本以为mybatis的example可以搞定group by,后面看到说不行于是曲线救国,直接查出一...
- mongodb 分组统计: 按deviceId、tenant分组,统计总记录条数、求和workload 删除sen...
- 最近有个培训班的小伙伴遇到了这样的问题,他想对字符串变量进行分组求和与分组累加,但是他不知道该如何实现,今天我们就...
- 文章详细地址:https://tengxiaotao.top/blog/7 模型(models.py) 视图(vi...