Flink 使用之 CEP(SQL方式)

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

SQL match recognize

本篇为Flink 使用之 CEP后续。如果您对CEP不了解,请先浏览Flink 使用之 CEP

SQL的match_recognize子句给予了SQL支持CEP的能力。下面逐个介绍match_recognize子句的组成部分。

partition by 从句

分区基准字段,类似于DataStream API的keyby。将业务数据划分为多个组,分别统计。该字段会作为查询结果返回。

order by 从句

排序基准字段,实际业务中通常为时间戳。

measures 从句

定义如何输出匹配的结果,类似于select语句。需要从匹配pattern中取出结果并映射字段名。

输出结果的schema为partition by基准字段 + measures中的字段。

可以使用聚合函数。CEP业务中除了常用的数学运算聚合函数外,常用的还有:

  • first:返回一系列有序元素的第一个。
  • last:返回一系列有序元素的最后一个。

这两个函数适用于从"X+"这种Pattern获取第一个满足X条件或最后一个满足X条件的元素。

one row per match/all rows per match

这一项为SQL的输出模式配置,用来规定每次匹配过后输出多少行。

目前Flink只支持one row per match。所以说和使用写代码CEP不同的是,SQL方式及时一次匹配到多个元素,也只能输出一个。所以说我们需要使用聚合函数进行计算。或者是在measures中,将匹配到的元素内容取出,作为输出表的字段展示(打宽)。

匹配后跳转模式

和Pattern API中匹配后跳过策略相同。

  • after match skip to next row:从匹配成功的事件序列中的第一个事件的下一个事件开始进行下一次匹配
  • after match skip past last row:从匹配成功的事件序列中的最后一个事件的下一个事件开始进行下一次匹配
  • after match skip to first patternName:从匹配成功的事件序列中第一个对应于patternName的事件开始进行下一次匹配
  • after match skip to last patternName:从匹配成功的事件序列中最后一个对应于patternName的事件开始进行下一次匹配

pattern 从句

pattern从句格式为pattern (xxx) within interval 'xxx' time_unit。表示捕获符合xxx pattern的一系列元素,且这些元素的时间需要在within后面的时间范围内。

Pattern从句括号内需要填写pattern变量表达式,格式类似于正则表达式。例如:

PATTERN (A B+ C* D)

表示A之后跟随1个或多个B,然后0个或多个C,最后为D。

注意:pattern(A B)的含义为A和B必须紧密相连(相当于Pattern API中的consecutive)。

Pattern的数量限定符和正则表达式使用方式类似。我们引用官网的解释。

  • * — 0 or more rows
  • + — 1 or more rows
  • ? — 0 or 1 rows
  • { n } — exactly n rows (n > 0)
  • { n, } — n or more rows (n ≥ 0)
  • { n, m } — between n and m (inclusive) rows (0 ≤ n ≤ m, 0 < m)
  • { , m } — between 0 and m (inclusive) rows (m > 0)

除此之外,Pattern还支持greedy和reluctant。数量限定符默认是greedy类型,表示匹配尽可能多的元素,相反,reluctant类型回去匹配尽可能少的元素。和正则表达式类似,将数量限定符从greedy转化为reluctant类型需要在限定符后加一个?字符。

define 从句

defines从句用于为pattern表达式配置精确的匹配条件。相当于Pattern API中的where方法。

到这里为止SQL match_recognize各个从句的功能和编写方法已经介绍完了。但是离开实际案例,我们还是不能很好的掌握具体的用法。下一节我们引入一个具体的案例。

使用示例

场景:我们有实时的机房温度监控数据,机房温度过高会触发告警。我们想知道每个机房每次告警的起止时间和平均温度等数据。

数据源配置

首先我们配置数据源。实际生产中常用的数据源是Kafka。练习环境为了配置简单,我们使用SQL从CSV文件读入数据的方式。

要读取CSV格式,首先要引入依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>

然后准备编造好的temp_record.csv数据文件。其中每列的含义分别为:机架编号,时间戳,温度。

1,2021-09-14 15:37:03.3,30
1,2021-09-14 15:37:13.3,50
1,2021-09-14 15:37:23.3,55
1,2021-09-14 15:37:33.3,60
1,2021-09-14 15:37:43.3,55
1,2021-09-14 15:37:53.3,50
1,2021-09-14 15:38:03.3,45
2,2021-09-14 15:37:03.3,30
2,2021-09-14 15:37:13.3,50
2,2021-09-14 15:37:23.3,55
2,2021-09-14 15:37:33.3,45
2,2021-09-14 15:37:43.3,55
2,2021-09-14 15:37:53.3,50
2,2021-09-14 15:38:03.3,45

注意:CSV最后必须要有空行,否则解析时候报错。

接下来根据实例数据的schema,编写create table语句并检查是否能正常读入数据。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val createTableSql =
  """
    |create table temp_record (
    |rack_id int,
    |ts timestamp(3),
    |temp int,
    |WATERMARK FOR ts AS ts - INTERVAL '1' SECOND)
    |with (
    |'connector'='filesystem',
    |'path'='/path/to/temp_record.csv',
    |'format'='csv'
    |)
    |""".stripMargin

tEnv.executeSql(createTableSql)

tEnv.executeSql("select * from temp_record").print()

如果一切无误,我们可以看到控制台打印出了CSV文件的内容,如下所示:

+----+-------------+-------------------------+-------------+
| op |     rack_id |                      ts |        temp |
+----+-------------+-------------------------+-------------+
| +I |           1 | 2021-09-14 15:37:53.300 |          50 |
| +I |           1 | 2021-09-14 15:37:23.300 |          55 |
| +I |           1 | 2021-09-14 15:37:43.300 |          55 |
| +I |           2 | 2021-09-14 15:37:43.300 |          55 |
| +I |           1 | 2021-09-14 15:38:03.300 |          45 |
| +I |           2 | 2021-09-14 15:37:03.300 |          30 |
| +I |           1 | 2021-09-14 15:37:03.300 |          30 |
| +I |           1 | 2021-09-14 15:37:13.300 |          50 |
| +I |           2 | 2021-09-14 15:37:33.300 |          45 |
| +I |           2 | 2021-09-14 15:37:13.300 |          50 |
| +I |           1 | 2021-09-14 15:37:33.300 |          60 |
| +I |           2 | 2021-09-14 15:37:53.300 |          50 |
| +I |           2 | 2021-09-14 15:38:03.300 |          45 |
| +I |           2 | 2021-09-14 15:37:23.300 |          55 |
+----+-------------+-------------------------+-------------+

到这里数据源配置完毕。

编写业务逻辑

在这个例子中。我们假设机器告警温度为大于等于50度。我们需要查询出每个机架的高温告警开始时间,高温告警结束时间,告警起始温度,告警结束温度和告警期间平均温度。按照业务需求,我们编写SQL如下。

val cepSql =
  """
    |select * from temp_record
    |match_recognize(
    |partition by rack_id
    |order by ts
    |measures
    |A.ts as start_ts,
    |last(B.ts) as end_ts,
    |A.temp as start_temp,
    |last(B.temp) as end_temp,
    |avg(B.temp) as avg_temp
    |one row per match
    |after match skip to next row
    |pattern (A B+ C) within interval '90' second
    |define
    |A as A.temp < 50,
    |B as B.temp >= 50,
    |C as C.temp < 50
    |)
    |""".stripMargin

tEnv.executeSql(cepSql).print()

执行结果:

+----+-------------+-------------------------+-------------------------+-------------+-------------+-------------+
| op |     rack_id |                start_ts |                  end_ts |  start_temp |    end_temp |    avg_temp |
+----+-------------+-------------------------+-------------------------+-------------+-------------+-------------+
| +I |           2 | 2021-09-14 15:37:03.300 | 2021-09-14 15:37:23.300 |          30 |          55 |          52 |
| +I |           2 | 2021-09-14 15:37:33.300 | 2021-09-14 15:37:53.300 |          45 |          50 |          52 |
| +I |           1 | 2021-09-14 15:37:03.300 | 2021-09-14 15:37:53.300 |          30 |          50 |          54 |
+----+-------------+-------------------------+-------------------------+-------------+-------------+-------------+

SQL部分解析:

  • 我们需要将不同rack_id的数据分开统计,因此需要partition by rack_id
  • measures中使用last(B.ts) as end_ts获取最后一个对应pattern B的元素, 由define子句可知,pattern B为温度大于等于50的元素。所以它的含义为高温告警期间最后一个告警的温度。
  • pattern by子句和measures子句两者共同决定的输出表格的schema。
  • pattern子句和define子句两者一起确定了匹配模板,含义为一个温度低于50度的元素,紧跟一个或多个温度大于等于50度的元素,然后再跟一个温度低于50度的元素。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,012评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,628评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,653评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,485评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,574评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,590评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,596评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,340评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,794评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,102评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,276评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,940评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,583评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,201评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,441评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,173评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,136评论 2 352

推荐阅读更多精彩内容