使用FlinkCEP统计超时未支付的订单/用户触达

需求:统计用户下单了但一段时间后没有支付的订单,用来触达用户。

一、模拟一个数据源,用来模拟用户行为

public class UserEvent implements Serializable {

    private String pin; //用户的id
    private String skuId; 商品的skuId
    private String action; //用户事件 0表示下单,1表示支付
}

public class RandomSource extends RichParallelSourceFunction<UserEvent> {

    private boolean isRun;

    private static List<UserEvent> events1 = new ArrayList<>();
    private transient int index = 0;

    @Override
    public void run(SourceContext<UserEvent> ctx) throws InterruptedException {
        while (isRun){
            if(index < events1.size()){
                UserEvent event = events1.get(index % events1.size());
                ctx.collect(event);
                index++;
//                System.out.println("send message:" + JSON.toJSONString(event));
            }
            Thread.sleep(1000);
        }
    }


    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        isRun = true;

        addEvent(events1,"zhangsan", "1", "0");
        addEvent(events1,"lisi", "3", "0");
        addEvent(events1,"zhangsan", "1", "1");

    }

    private void addEvent(Collection<UserEvent> collections, String pin,String skuId, String action){
        UserEvent event = new UserEvent();
        event.setPin(pin);
        event.setSkuId(skuId);
        event.setAction(action);
        collections.add(event);
    }


    @Override
    public void cancel() {
        isRun = false;
    }
}

二、逻辑代码

public class Rule6App {

    private static final OutputTag<UserEvent> timeOutTag = new OutputTag<>("timeOut", TypeInformation.of(UserEvent.class));

    public static void main(String[] args) throws Exception{
    
        //步骤1、定义数据源,给数据添加水印
        final AssignerWithPeriodicWatermarks extractor = new IngestionTimeExtractor<UserEvent>();

        StreamExecutionEnvironment env = StreamCommon.getStreamEnv(true, true);
        DataStream<UserEvent> dataStream = env.addSource(new RandomSource())
                .setParallelism(1) //并行度设置成1,便于观察
                .assignTimestampsAndWatermarks(extractor); //加水印
        //步骤2、定义Pattern,可以看成是一个事件链
        //我这里定义的意思是,先收到下单的消息,然后再收到支付的消息,事件链可以很长,也可以有
       //复杂的组合事件
        Pattern<UserEvent,UserEvent> pattern = Pattern.<UserEvent>begin("order")
                .where(new SimpleCondition<UserEvent>() {
                    @Override
                    public boolean filter(UserEvent event) throws Exception {
                        return "0".equals(event.getAction());
                    }
                })
                .next("pay")
                .where(new SimpleCondition<UserEvent>() {
                    @Override
                    public boolean filter(UserEvent event) throws Exception {
                        return "1".equals(event.getAction());
                    }
                });
        
       //步骤3、进行事件匹配,在匹配前,需要对流进行KeyBy分组,确保每个单元里处理的是
       //同一个用户的订单
        PatternStream<UserEvent> patternStream = CEP.pattern(
                dataStream.keyBy(UserEvent::getPin),
                pattern.within(Time.seconds(10)));
    
      //步骤4、对匹配的结果进行分流
      //这里注意到select有3个参数,第一个是超时消息的容器,这里通过旁路进行输出
      //第二个参数里定义了超时的消息如何进行处理
      //第三个参数里定义里正常匹配到规则的消息如何进行处理
        SingleOutputStreamOperator<UserEvent> result = patternStream.select(timeOutTag, new PatternTimeoutFunction<UserEvent, UserEvent>() {
            @Override
            public UserEvent timeout(Map<String, List<UserEvent>> map, long l) throws Exception {
                System.out.println("这是超时了的:" + JSON.toJSONString(map));
                return map.get("order").get(0);
            }
        }, new PatternSelectFunction<UserEvent, UserEvent>() {
            @Override
            public UserEvent select(Map<String, List<UserEvent>> map) throws Exception {
                System.out.println("这是完成的订单:" + JSON.toJSONString(map));
                return map.get("pay").get(0);//这里Map有2个KEY,就是前面定义事件链的tag
            }
        });
      
        //步骤5、从旁路拿到结果流,完成超时触达
        DataStream<UserEvent> timeoutResult = result.getSideOutput(timeOutTag);
        timeoutResult.print();
        env.execute();
    }

}

三、需要注意的点
1、必须要是EventTime
尝试了ProcessTime取不到超时的结果,只能拿到匹配到规则的结果
2、within的含义
within在每一条消息到达时,为该消息开启一个定时器,当整个时间链都匹配到结果,则终止定时器,否则被视为超时。

例如第一个条件要累计多次的时候,当满足累积多次后,会重新开始 计时来计算超时。
3、.oneOrMore().where(...)
满足条件后开始计时,再within限定的时间段内如果有满足条件的数据进来则修改计时器重新计时(类似SessionWindow)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,233评论 6 495
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,357评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,831评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,313评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,417评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,470评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,482评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,265评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,708评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,997评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,176评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,503评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,150评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,391评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,034评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,063评论 2 352

推荐阅读更多精彩内容