前言
使用 flink 很长一段时间了,突然发现竟然没有计算过 topN,这可是 flink 常见的计算场景了, 故自己想了一个场景来计算一下。
基于 Flink 1.12
场景
外卖员听单的信息会发到一个单独的 topic 中,需要计算每天有多少个外卖员听单,以及总共的听单次数。
kafka 中消息类型
{"locTime":"2020-12-28 12:32:23","courierId":12,"other":"aaa"}
locTime:事件发生的时间;courierId:外卖员 id。
计算一天中 听单次数 top5 的外卖员
代码
// Kafka source of raw JSON strings. Event time is read from each record's "locTime" field
// (pattern "yyyy-MM-dd HH:mm:ss"); watermarks assume monotonously increasing timestamps.
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(topics, new SimpleStringSchema(), properties);
// NOTE(review): presumably configures the consumer's start offset from `parameter` -- confirm in FlinkHelp.
FlinkHelp.setOffset(parameter, consumer);
consumer.assignTimestampsAndWatermarks(
        WatermarkStrategy.<String>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
                    /**
                     * Extracts the event timestamp (epoch millis) from the "locTime" field.
                     *
                     * @param element         raw JSON record
                     * @param recordTimestamp timestamp already attached to the record by the source
                     * @return the parsed "locTime" in epoch millis, or {@code recordTimestamp} when the
                     *         record cannot be parsed or has no usable "locTime"
                     */
                    @Override
                    public long extractTimestamp(String element, long recordTimestamp) {
                        try {
                            Map<String, Object> map = Json2Others.json2map(element);
                            Object locTime = (map == null) ? null : map.get("locTime");
                            if (locTime == null) {
                                // No event time in the payload: keep the timestamp the source attached.
                                return recordTimestamp;
                            }
                            // The formatter is created here on purpose: DateTimeFormatter is not
                            // Serializable, so it must not become a field or captured variable of
                            // this serializable assigner.
                            LocalDateTime eventTime =
                                    LocalDateTime.parse(locTime.toString(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                            // Resolve the offset that applies at the event's own local time rather than
                            // the offset in effect "now" (they differ across DST transitions).
                            // NOTE(review): this uses the JVM default zone while the downstream window
                            // is aligned with a fixed -8h offset -- confirm the job runs in UTC+8.
                            return eventTime.atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli();
                        } catch (IOException | java.time.format.DateTimeParseException e) {
                            // A malformed record must not kill the source with an unrelated parse error;
                            // fall back to the source-provided timestamp.
                            return recordTimestamp;
                        }
                    }
                }).withIdleness(Duration.ofSeconds(1)));
// Stage 1: count listen events per (day, courier).
// Records are keyed by "<yyyyMMdd>-<courierId>", collected in one-day event-time windows and
// emitted as a running total every 30 seconds of event time.
SingleOutputStreamOperator<CourierListenInfos> process = env.addSource(consumer).filter(new FilterFunction<String>() {
    /**
     * Keeps only records that the key selector below can handle, i.e. parseable JSON that
     * carries both "courierId" and "locTime". Anything else is dropped instead of failing the job.
     */
    @Override
    public boolean filter(String value) throws Exception {
        try {
            Map<String, Object> map = Json2Others.json2map(value);
            return map != null && map.get("courierId") != null && map.get("locTime") != null;
        } catch (IOException e) {
            // Unparseable payload: drop it so a single bad record cannot crash-loop the job.
            return false;
        }
    }
}).keyBy(new KeySelector<String, String>() {
    /** Builds the key "<yyyyMMdd>-<courierId>" from the record's "locTime" date part and "courierId". */
    @Override
    public String getKey(String value) throws Exception {
        Map<String, Object> map = Json2Others.json2map(value);
        String courierId = map.get("courierId").toString();
        String day = map.get("locTime").toString().split(" ")[0].replace("-", "");
        return day + "-" + courierId;
    }
}).window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .allowedLateness(Time.minutes(1))
        // .trigger(CountTrigger.of(5)) // setting several triggers just means the later one overrides the earlier one
        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(30)))
        // causes problems when replaying historical data
        // .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
        // clear the elements held in window state once they have been processed
        .evictor(TimeEvictor.of(Time.seconds(0), true))
        .process(new ProcessWindowFunction<String, CourierListenInfos, String, TimeWindow>() {
            private ReducingStateDescriptor<Long> reducingStateDescriptor;
            // Running listen count for the current key; it accumulates across trigger firings.
            private ReducingState<Long> listenCount;

            /** Registers the TTL-enabled reducing state that holds the running count. */
            @Override
            public void open(Configuration parameters) throws Exception {
                StateTtlConfig ttlConfig = StateTtlConfig
                        .newBuilder(org.apache.flink.api.common.time.Time.hours(25))
                        // default; event time is not supported in 1.12.0
                        .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                        .cleanupInRocksdbCompactFilter(1000)
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .build();
                reducingStateDescriptor =
                        new ReducingStateDescriptor<Long>("listenCount", new Sum(), TypeInformation.of(Long.class));
                reducingStateDescriptor.enableTimeToLive(ttlConfig);
                listenCount = getRuntimeContext().getReducingState(reducingStateDescriptor);
            }

            /**
             * Adds the elements of this firing to the running count and emits the new total.
             *
             * @param s        the key, formatted as "<yyyyMMdd>-<courierId>" by the key selector
             * @param context  window context (used here for diagnostics only)
             * @param elements the elements currently held by the window
             * @param out      receives one {@code CourierListenInfos} carrying the running total
             */
            @Override
            public void process(String s, Context context, Iterable<String> elements, Collector<CourierListenInfos> out) throws Exception {
                long processingTime = context.currentProcessingTime();
                long watermark = context.currentWatermark();
                TimeWindow window = context.window();
                String endDay = DateUtils.millisecondsToDateStr(window.getEnd(), "yyyyMMdd HH:mm:ss");
                String startDay = DateUtils.millisecondsToDateStr(window.getStart(), "yyyyMMdd HH:mm:ss");
                System.out.println("currentProcessingTime:" + processingTime + " watermark:" + watermark + " windowTime:" + startDay + "-" + endDay);

                for (String ignored : elements) {
                    listenCount.add(1L);
                }
                Long total = listenCount.get();
                if (total == null) {
                    // Nothing counted and no (unexpired) previous total: there is nothing to report.
                    return;
                }

                // The key already carries day and courier id, so the payload does not have to be
                // parsed again (and an empty firing cannot fail on a missing first element).
                int separator = s.indexOf('-');
                String day = s.substring(0, separator);
                String courierId = s.substring(separator + 1);
                out.collect(new CourierListenInfos(day, courierId, total));
            }
        });
// Stage 2: per day, keep the latest count of every courier and publish the top 5 to Redis.
process.keyBy(new KeySelector<CourierListenInfos, String>() {
    /** Groups all courier totals of the same day together. */
    @Override
    public String getKey(CourierListenInfos value) throws Exception {
        return value.getDay();
    }
}).process(new KeyedProcessFunction<String, CourierListenInfos, String>() {
    private JedisCluster jedisCluster;
    private MapStateDescriptor<String, Long> mapStateCountDescriptor;
    // "<day>#<courierId>" -> latest listen count received for that courier.
    private MapState<String, Long> courierInfoCountMapState;

    /** Registers the TTL-enabled map state and opens the Redis connection. */
    @Override
    public void open(Configuration parameters) throws Exception {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(org.apache.flink.api.common.time.Time.hours(25))
                // default; event time is not supported in 1.12.0
                .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                .cleanupInRocksdbCompactFilter(1000)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // default
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        mapStateCountDescriptor =
                new MapStateDescriptor<String, Long>("courierInfoCountMapState", TypeInformation.of(String.class), TypeInformation.of(Long.class));
        mapStateCountDescriptor.enableTimeToLive(ttlConfig);
        courierInfoCountMapState = getRuntimeContext().getMapState(mapStateCountDescriptor);
        jedisCluster = RedisUtil.getJedisCluster(redisHp);
    }

    @Override
    public void close() throws Exception {
        RedisUtil.closeConn(jedisCluster);
    }

    /**
     * Stores the courier's latest count and schedules a top-5 recomputation.
     *
     * @param value latest running total for one courier on one day
     * @param ctx   gives access to the timer service
     * @param out   unused; results are written to Redis in {@code onTimer}
     */
    @Override
    public void processElement(CourierListenInfos value, Context ctx, Collector<String> out) throws Exception {
        courierInfoCountMapState.put(value.getDay() + "#" + value.getCourierId(), value.getListenCount());
        // Fire at the next whole second after the current watermark. Timer timestamps are epoch
        // millis, so the watermark is rounded in millis; updates arriving within the same second
        // share a single timer instead of each registering its own.
        long watermark = ctx.timerService().currentWatermark();
        long fireAt = watermark > Long.MAX_VALUE - 2000L
                ? Long.MAX_VALUE // avoid overflow once the final watermark has been reached
                : (watermark / 1000L + 1L) * 1000L;
        ctx.timerService().registerEventTimeTimer(fireAt);
    }

    /**
     * Recomputes the day's top 5 from the stored counts, prunes the state down to those
     * entries and writes the result to Redis as "courierId#count,..." in ascending count order.
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        final int topSize = 5;
        String day = ctx.getCurrentKey();
        // Min-heap on the count: the smallest candidate is evicted whenever the heap grows past topSize.
        PriorityQueue<CourierListenInfos> topCouriers = new PriorityQueue<>(new Comparator<CourierListenInfos>() {
            @Override
            public int compare(CourierListenInfos o1, CourierListenInfos o2) {
                // Long.compare avoids the overflow/truncation of casting a long difference to int.
                return Long.compare(o1.listenCount, o2.listenCount);
            }
        });
        for (Map.Entry<String, Long> entry : courierInfoCountMapState.entries()) {
            String[] split = entry.getKey().split("#", -1);
            topCouriers.offer(new CourierListenInfos(split[0], split[1], entry.getValue()));
            if (topCouriers.size() > topSize) {
                topCouriers.poll();
            }
        }
        if (topCouriers.isEmpty()) {
            // Nothing (unexpired) in state: do not overwrite the published result with an empty value.
            return;
        }

        // Keep only the current top entries in state; upstream sends running totals, so a courier
        // that is dropped here re-enters with its full count on its next update.
        courierInfoCountMapState.clear();
        StringBuilder tops = new StringBuilder();
        while (!topCouriers.isEmpty()) {
            CourierListenInfos top = topCouriers.poll();
            System.out.println("courierListenInfos1 " + top);
            courierInfoCountMapState.put(top.getDay() + "#" + top.getCourierId(), top.listenCount);
            tops.append(top.courierId).append("#").append(top.listenCount);
            if (!topCouriers.isEmpty()) {
                tops.append(",");
            }
        }
        jedisCluster.hset("test_courier_tops", day + "-top5", tops.toString());
        System.out.println("============");
    }
}).setParallelism(1);
结果样例
'20201227-top5':'1#1111,2#2222,3#3333'
'20201227-top5':'1#1111,2#2222,3#3333'