需求:统计用户下单了但一段时间后没有支付的订单,用来触达用户。
一、模拟一个数据源,用来模拟用户行为
public class UserEvent implements Serializable {
private String pin; //用户的id
private String skuId; 商品的skuId
private String action; //用户事件 0表示下单,1表示支付
}
public class RandomSource extends RichParallelSourceFunction<UserEvent> {
private boolean isRun;
private static List<UserEvent> events1 = new ArrayList<>();
private transient int index = 0;
@Override
public void run(SourceContext<UserEvent> ctx) throws InterruptedException {
while (isRun){
if(index < events1.size()){
UserEvent event = events1.get(index % events1.size());
ctx.collect(event);
index++;
// System.out.println("send message:" + JSON.toJSONString(event));
}
Thread.sleep(1000);
}
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
isRun = true;
addEvent(events1,"zhangsan", "1", "0");
addEvent(events1,"lisi", "3", "0");
addEvent(events1,"zhangsan", "1", "1");
}
private void addEvent(Collection<UserEvent> collections, String pin,String skuId, String action){
UserEvent event = new UserEvent();
event.setPin(pin);
event.setSkuId(skuId);
event.setAction(action);
collections.add(event);
}
@Override
public void cancel() {
isRun = false;
}
}
二、逻辑代码
public class Rule6App {
private static final OutputTag<UserEvent> timeOutTag = new OutputTag<>("timeOut", TypeInformation.of(UserEvent.class));
public static void main(String[] args) throws Exception{
//步骤1、定义数据源,给数据添加水印
final AssignerWithPeriodicWatermarks extractor = new IngestionTimeExtractor<UserEvent>();
StreamExecutionEnvironment env = StreamCommon.getStreamEnv(true, true);
DataStream<UserEvent> dataStream = env.addSource(new RandomSource())
.setParallelism(1) //并行度设置成1,便于观察
.assignTimestampsAndWatermarks(extractor); //加水印
//步骤2、定义Pattern,可以看成是一个事件链
//我这里定义的意思是,先收到下单的消息,然后再收到支付的消息,事件链可以很长,也可以有
//复杂的组合事件
Pattern<UserEvent,UserEvent> pattern = Pattern.<UserEvent>begin("order")
.where(new SimpleCondition<UserEvent>() {
@Override
public boolean filter(UserEvent event) throws Exception {
return "0".equals(event.getAction());
}
})
.next("pay")
.where(new SimpleCondition<UserEvent>() {
@Override
public boolean filter(UserEvent event) throws Exception {
return "1".equals(event.getAction());
}
});
//步骤3、进行事件匹配,在匹配前,需要对流进行KeyBy分组,确保每个单元里处理的是
//同一个用户的订单
PatternStream<UserEvent> patternStream = CEP.pattern(
dataStream.keyBy(UserEvent::getPin),
pattern.within(Time.seconds(10)));
//步骤4、对匹配的结果进行分流
//这里注意到select有3个参数,第一个是超时消息的容器,这里通过旁路进行输出
//第二个参数里定义了超时的消息如何进行处理
//第三个参数里定义里正常匹配到规则的消息如何进行处理
SingleOutputStreamOperator<UserEvent> result = patternStream.select(timeOutTag, new PatternTimeoutFunction<UserEvent, UserEvent>() {
@Override
public UserEvent timeout(Map<String, List<UserEvent>> map, long l) throws Exception {
System.out.println("这是超时了的:" + JSON.toJSONString(map));
return map.get("order").get(0);
}
}, new PatternSelectFunction<UserEvent, UserEvent>() {
@Override
public UserEvent select(Map<String, List<UserEvent>> map) throws Exception {
System.out.println("这是完成的订单:" + JSON.toJSONString(map));
return map.get("pay").get(0);//这里Map有2个KEY,就是前面定义事件链的tag
}
});
//步骤5、从旁路拿到结果流,完成超时触达
DataStream<UserEvent> timeoutResult = result.getSideOutput(timeOutTag);
timeoutResult.print();
env.execute();
}
}
三、需要注意的点
1、必须要是EventTime
尝试了ProcessTime取不到超时的结果,只能拿到匹配到规则的结果
2、within的含义
within在每一条消息到达时,为该消息开启一个定时器,当整个时间链都匹配到结果,则终止定时器,否则被视为超时。
例如第一个条件要累计多次的时候,当满足累积多次后,会重新开始 计时来计算超时。
3、.oneOrMore().where(...)
满足条件后开始计时,再within限定的时间段内如果有满足条件的数据进来则修改计时器重新计时(类似SessionWindow)