Flink_CEP

Flink的复杂事件处理CEP

概述:
    复杂事件处理(CEP) 是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同
    的事件关系序列库,并利用过滤、关联、聚合等技术,最终由简单事件产生高级事件,并通过模式规则的方式对重要信息
    进行跟踪和分析,从实时数据中发掘有价值的信息.复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避
    和智能营销等领域,
实现:
    Flink基于DataStream API提供了Flink CEP组件栈,专门用于对复杂事件的处理,帮助用户从流数数据中发掘有价值的信息.

CEP相关概念:

网页介绍:
    https://blog.csdn.net/weixin_45549790/article/details/100552515
1. 配置依赖:
    在使用Flink CEP组件之前,需要将Flink CEP的依赖库引入项目工程中.
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
2. 事件定义
概述:
    1. 简单事件: 
        简单事件存在于现实场景中,主要的特点为处理单一事件,事件的定义可以直接观察出来,处理过程中无须关注多个事件
        之间的关系,能够通过简单的数据处理手段将结果计算出来.
    2. 复杂事件:
        相对于简单事件,复杂事件处理的不仅是单一的事件,也处理由多个事件组成的复合事件,复杂事件处理监测分析事件流
        (Event Streaming) ,当特定事件发生时来触发某些动作.
    复杂事件中事件与事件之间包含多种类型关系,常见的有时序关系、聚合关系、层次关系、依赖关系及因果关系等.

Pattern API

概述:
    FlinkCEP中提供了Pattern API用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果,包含四个步骤.
    1. 输入事件流的创建
    2. Pattern的定义
    3. Pattern应用在事件流上检测
    4. 选取结果
1. 模式定义
概述:
    定义Pattern可以是单次执行模式,也可以是循环执行模式。单次执行模式一次只接受一个事件,循环执行模式可以接受
    一个或者多个事件,通常情况下,可以通过指定循环次数将单次执行模型变为循环执行 模式,每种模式能够将多个条件
    组合应用到同一事件之上,条件组合可以通过where方法进行叠加,每个Pattern都是通过begin方法定义的.
实现:
    val start  = Pattern.begin[Event]("start_pattern")
    下一步通过Pattern.where()方法在Pattern上指定Condition,只有当Condition满足之后,当前的Pattern才会接受事件
    start.where(_.getCallType=="success")
1.1 设置循环次数
概述: 
    对于已经创建好的Pattern,可以指定循环次数,形成循环执行的Pattern.
1. times:
    可以通过times指定固定的循环执行次数
    // 指定循环触发4次
    start.times(4)
    // 可以执行触发次数范围,让循环执行次数在该范围之内
    start.times(2,4);
2. optional:  
    也可以通过optional关键字指定要么不触发要么触发指定的次数
    start.times(4).optional();
    start.times(2,4).optional();
3. greedy:
    可以通过greedy将Pattern标记为贪婪模式,在Pattern匹配成功的前提下,会尽可能多地触发
    // 触发2,3,4次,尽可能重复执行
    start.times(2,4).greedy();
    // 触发0,2,3,4次,尽可能重复执行
    start.times(2,4).optional().greedy();
4. oneOrMore: 可以通过oneOrMore方法执行触发一次或多次
    // 触发一次或者多次
    start.oneOrMore();
    // 触发一次或者多次,尽可能重复执行
    start.oneOrMore().greedy();
    // 触发0次或者多次
    start.oneOrMore().optional();
    // 触发0次或者多次,尽可能重复执行
    start.oneOrMore().optional().greedy();
5. timesOgmore
    通过timesOrMore方法可以指定固定次数以上,例如执行两次以上
    //触发两次或者多次
    start.timesOrMore(2);
    // 触发两次或者多次,尽可能重复执行
    start.timesOrMore(2).greedy();
    // 不触发护着触发两次以上,尽可能重复执行
    start.timesOrMore(2).optional().greedy();
2. 定义条件
概述:
    每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,
    便进行下一步操作,在Flink CEP中通过pattern.where()、pattern.or()以及pattern.until()方法来为
    Pattern指定条件,且Pattern条件有Simple Conditions及Combining Conditions等类型
1. 简单条件:
    Simple Condition 继承于Iterative Condition类,其主要根据事件中的字段信息进行判断,决定是否接受该事件
    // 把通话成功的事件挑选出来
    start.where(_.getCallType=='success')
2. 组合条件:
    组合条件是将简单条件进行合并,通常情况下也可以使用where方法进行条件的组合,默认每个条件通过AND逻辑相连,
    如果需要使用OR逻辑,直接使用or方法连接条件即可.
    // 把通话成功,或者通话时长大于10s的事件挑选出来.
    val start = Pattern.begin[StationLog]("start_pattern")
    .where(_.callType=="success")
    .or(_.duration>10)
3. 终止条件:
    如果程序中使用了oneOrMore或者oneOrMore().optional()方法,则必须制定终止条件,否则模式中的规则会一直
    循环下去,如下终止条件通过until()方法制定
    pattern.oneOrMore.until(_.callOut.startsWith("186"))
3. 模式序列
概述:
    将相互独立的模式进行组合然后形成模式序列,模式序列基本的编写方式和独立模式一致,各个模式之间通过邻近
    条件进行连接即可,其中有严格邻近、宽松邻近、非确定宽松邻近三种邻近连接条件
1. 严格邻近:
    严格邻近条件中,需要所有的事件都按照顺序满足模式条件,不允许忽略任意不满足的模式
    val strict:Pattern[Event] = start.next("middel").where(...)
2. 宽松邻近:
    在宽松邻近条件下,会忽略没有成功匹配模式条件,并不会像严格邻近要求得那么高,那么可以简单理解为OR的逻辑关系
    val relaxed:Pattern[Event,_] = start.followedBy("middle").where(...)
3. 非确定宽松邻近:
    和宽松邻近条件相比,非确定宽松邻近条件旨在模式匹配过程中可以忽略已经匹配的条件.
    val nonDetermin:Pattern[Event,_] = start.followedByAny("middle").where(...)
4. 额外:
    除以上模式序列外,还可以定义" 不希望出现某种邻近关系":
    .notNext() ---- 不想让某个事件严格紧邻前一个事件发生
    .notFollowedBy() ---- 不想让某个事件在两个事件之间发生
注意:
    1. 所有模式序列必须以 .begin()开始
    2. 模式序列不能以 .notFollowedBy() 结束
    3. "not" 类型的模式不能被 optional所修饰
    4. 此外,还可以为模式指定时间约束,用来要求在多长时间内匹配有效
    // 指定模式在10s内有效
    pattern.within(Time.seconds(10));

2. 模式检测:

概述:
    调用 CEP.pattern() ,给定输入流和模式,就能得到一个PatternStream
    //CEP 做模式检测
    val patternStream = CEP.pattern[EventLog](dataStream++.keyBy(_.id),pattern)

3. 选择结果

概述:
    得到PatternStream类型的数据集后,接下来数据获取都基于PatternStream,该数据集中包含了所有的匹配事件.
    目前在FlinkCEP中提供select 和flatSelect两种方法从PatternStream提取事件结果事件
1. 通过Select Function抽取正常事件
概述:
    可以通过在PatternStream的Select方法中传入自定义Select Function完成对匹配事件的转换与输出.
    其中Select Function的输入参数为Map[String,Iterable[In]],Map中的Key为模式序列中的Pattern名称,
    Value为对应Pattern所接受的事件集合,格式为输入事件的数据类型
code:
    def selectFunction(pattern : Map[String, Iterable[IN]]): OUT = {
    //获取pattern中的startEvent
    val startEvent = pattern.get("start_pattern").get.next
    
    //获取Pattern中middleEvent
    val middleEvent = pattern.get("middle").get.next
    //返回结果
    OUT(startEvent, middleEvent)
    }
2. 通过Flat Select Function抽取正常事件
概述:
    Flat Select Funciton 和 Select Function 相似,不过 Flat Select Funciton 在每次
    调用可以返回任意数量的结果。因为 Flat Select Funciton 使用 Collector 作为返回结果
    的容器,可以将需要输出的事件都放置在 Collector 中返回
code:
    def flatSelectFn(pattern : Map[String, Iterable[IN]], collector :
    Collector[OUT]) = {
    //获取pattern中startEvent
    val startEvent = pattern.get("start_pattern").get.next
    //获取Pattern中middleEvent
    val middleEvent = pattern.get("middle").get.next
    //并根据startEvent的Value数量进行返回
    for (i <- 0 to startEvent.getValue) {
    collector.collect(OUT(startEvent, middleEvent))
    }}
3.通过 Select Funciton 抽取超时事件
案例:
    如果模式中有 within(time), 那么就很有可能有超时的数据存在, 通过 PatternStream.
    Select 方法分别获取超时事件和正常事件。首先需要创建 OutputTag 来标记超时事件,然
    后在 PatternStream.select 方法中使用 OutputTag,就可以将超时事件从 PatternStream
    中抽取出来。
    // 通过CEP.pattern方法创建PatternStream
    val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
    //创建OutputTag,并命名为timeout-output
    val timeoutTag = OutputTag[String]("timeout-output")
    //调用PatternStream select()并指定timeoutTag
    val result: SingleOutputStreamOperator[NormalEvent] =
    patternStream.select(timeoutTag){
    //超时事件获取
    (pattern: Map[String, Iterable[Event]], timestamp: Long) =>
    TimeoutEvent()//返回异常事件
    } {
    //正常事件获取
    pattern: Map[String, Iterable[Event]] =>
    101
    NormalEvent()//返回正常事件
    }
    //调用getSideOutput方法,并指定timeoutTag将超时事件输出
    val timeoutResult: DataStream[TimeoutEvent] =
    result.getSideOutput(timeoutTag)
案例:
需求:
    从一堆的登录日志中,匹配一个恶意登录的模式(如果一个用户连续失败3次则是恶意登录),从而
    找到那些用户名是恶意登录
code:
    package FlinkDemo.cep
    
    import java.util
    
    import org.apache.flink.cep.PatternSelectFunction
    import org.apache.flink.cep.scala.CEP
    import org.apache.flink.cep.scala.pattern.Pattern
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.time.Time
    
    /**
      * 登录告警系统
      * 从一堆的登录日志中,匹配一个恶意登录的模式(如果一个用户连续失败3次,则是恶意登录)
      * 从而找到那些用户名是恶意 登录
      */
    
    object TestCepDemo {
    
      case class EventLog(id: Long, userName: String, eventType: String, eventTime: Long)
    
      def main(args: Array[String]): Unit = {
    
        // 初始化Flink的Streaming(流计算)上下文执行环境
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        // 指定EventTime为时间语义
        streamEnv.setParallelism(1)
        // 导入隐式类型转换
        import org.apache.flink.streaming.api.scala._
        // 设置流处理时间为事件时间
        streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val stream = streamEnv.fromCollection(List(
          new TestCepDemo.EventLog(1, "张三", "fail", 1574840003),
          new TestCepDemo.EventLog(1, "张三", "fail", 1574840004),
          new TestCepDemo.EventLog(1, "张三", "fail", 1574840005),
          new TestCepDemo.EventLog(2, "李四", "fail", 1574840006),
          new TestCepDemo.EventLog(2, "李四", "sucess", 1574840007),
          new TestCepDemo.EventLog(1, "张三", "fail", 1574840008))
        ).assignAscendingTimestamps(_.eventTime * 1000)
        stream.print("input_data")
        // 定义模式
        val pattern = Pattern.begin[EventLog]("begin").where(_.eventType.equals("fail"))
          .next("next1").where(_.eventType.equals("fail"))
          .next("next2").where(_.eventType.equals("fail"))
          .within(Time.seconds(10))
        // CEP做模式检测
        val patternStream = CEP
          .pattern[EventLog](stream.keyBy(_.id), pattern)
    
        // 第三步: 输出alert
        val result = patternStream.select(new PatternSelectFunction[EventLog, String] {
          override def select(map: util.Map[String, util.List[EventLog]]): String = {
            val iter = map.keySet().iterator()
            val e1 = map.get(iter.next()).iterator().next()
            val e2 = map.get(iter.next()).iterator().next()
            val e3 = map.get(iter.next()).iterator().next()
            "id:" + e1.id + " 用户名:" + e1.userName + "登录的时间:" + e1.eventTime + ":" + e2.eventTime + ":" + e3.eventTime
          }
        })
        result.print("main")
        streamEnv.execute("sql")
    
      }
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容