The earlier articles Hello FlinkCEP and FlinkCEP with EventTime covered the basic usage of FlinkCEP. This article introduces the SQL-based pattern matching that Flink provides, i.e. Detecting Patterns in Tables.
Complete example
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.Properties;
public class FlinkCEPSqlExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use event time so that ORDER BY rowtime and the watermarks below take effect
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Layout of the CSV records in the Kafka topic: symbol,tax,price,rowtime
        final TableSchema tableSchema = new TableSchema(
                new String[]{"symbol", "tax", "price", "rowtime"},
                new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG, Types.SQL_TIMESTAMP});
        final TypeInformation<Row> typeInfo = tableSchema.toRowType();
        final CsvRowDeserializationSchema.Builder deserSchemaBuilder =
                new CsvRowDeserializationSchema.Builder(typeInfo).setFieldDelimiter(',');

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "host-10-1-236-139:6667");
        FlinkKafkaConsumer010<Row> myConsumer = new FlinkKafkaConsumer010<>(
                "foo",
                deserSchemaBuilder.build(),
                properties);
        myConsumer.setStartFromLatest();

        // Extract event timestamps from the rowtime field and emit bounded-out-of-orderness watermarks
        DataStream<Row> stream = env.addSource(myConsumer)
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
        // Register the stream as table "Ticker"; "rowtime.rowtime" replaces the 4th field
        // with an event-time attribute backed by the timestamps assigned above
        tableEnv.registerDataStream("Ticker", stream, "symbol,tax,price,rowtime.rowtime");
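
        // Per symbol: one or more consecutive rows with price < 10 followed by a row with price > 100.
        // FIRST(A.price) is the price of the first A row; a plain A.price would give the last A row's price.
        Table result = tableEnv.sqlQuery("SELECT * " +
                "FROM Ticker " +
                "MATCH_RECOGNIZE ( " +
                "    PARTITION BY symbol " +
                "    ORDER BY rowtime " +
                "    MEASURES " +
                "        FIRST(A.price) AS firstPrice, " +
                "        B.price AS lastPrice " +
                "    ONE ROW PER MATCH " +
                "    AFTER MATCH SKIP PAST LAST ROW " +
                "    PATTERN (A+ B) " +
                "    DEFINE " +
                "        A AS A.price < 10, " +
                "        B AS B.price > 100 " +
                ")");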

        // With ONE ROW PER MATCH the output columns are the PARTITION BY column followed by the MEASURES
        final TableSchema tableSchemaResult = new TableSchema(
                new String[]{"symbol", "firstPrice", "lastPrice"},
                new TypeInformation[]{Types.STRING, Types.LONG, Types.LONG});
        final TypeInformation<Row> typeInfoResult = tableSchemaResult.toRowType();
        DataStream<Row> ds = tableEnv.toAppendStream(result, typeInfoResult);
        ds.print();
        env.execute("Flink CEP via SQL example");
    }

    // Periodic watermark assigner that tolerates events arriving up to 5 seconds out of order
    private static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Row> {
        private final long maxOutOfOrderness = 5000; // 5 seconds
        private long currentMaxTimestamp;

        @Override
        public long extractTimestamp(Row row, long previousElementTimestamp) {
            System.out.println("Row is " + row);
            // Field 3 (rowtime) is declared as Types.SQL_TIMESTAMP, so it is deserialized as java.sql.Timestamp
            long timestamp = ((java.sql.Timestamp) row.getField(3)).getTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            System.out.println("watermark:" + new java.sql.Timestamp(currentMaxTimestamp - maxOutOfOrderness));
            return timestamp;
        }

        @Override
        public Watermark getCurrentWatermark() {
            // The watermark trails the largest timestamp seen so far by maxOutOfOrderness
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
}
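The BoundedOutOfOrdernessGenerator above sets each watermark to the largest event time seen so far minus 5000 ms. The stand-alone sketch below (plain Java with no Flink dependency; the class WatermarkSketch and its method are hypothetical names for illustration) replays that rule over a few event timestamps. It simplifies one detail: Flink calls getCurrentWatermark() periodically rather than after every record, whereas the sketch computes the watermark after each event.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the bounded-out-of-orderness rule: watermark = max timestamp - 5000 ms. */
public class WatermarkSketch {
    static final long MAX_OUT_OF_ORDERNESS = 5000;

    /** Returns the watermark computed after each incoming event timestamp (in ms). */
    static List<Long> watermarks(long[] eventTimestamps) {
        List<Long> result = new ArrayList<>();
        long currentMax = Long.MIN_VALUE; // no event seen yet
        for (long ts : eventTimestamps) {
            currentMax = Math.max(currentMax, ts); // an out-of-order event never lowers the maximum
            result.add(currentMax - MAX_OUT_OF_ORDERNESS);
        }
        return result;
    }

    public static void main(String[] args) {
        // Event times in ms; the third event arrives after a later one
        long[] ts = {10_000, 12_000, 10_500, 20_000};
        System.out.println(watermarks(ts)); // [5000, 7000, 7000, 15000]
    }
}
```

The third event (10500 ms) arrives out of order but is still ahead of the current watermark (7000 ms), so it would not be considered late; an event with a timestamp below 7000 ms at that point would be.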
Explanation of the example
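- PARTITION BY symbol
  Logically partitions the rows by the symbol field, so each symbol is matched independently.
- ORDER BY rowtime
  Sorts the rows by event time, so that records arriving out of order do not scramble the sequence being matched.
- MEASURES
  Defines the columns that are output for each match.
- ONE ROW PER MATCH
  The output mode. The other mode is ALL ROWS PER MATCH, but Flink 1.9 currently supports only ONE ROW PER MATCH.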
- AFTER MATCH SKIP PAST LAST ROW
  The after-match skip strategy, i.e. where matching resumes once a match has been found. This strategy guarantees that each event belongs to at most one match. The available strategies are:
  - SKIP PAST LAST ROW: resumes the pattern matching at the next row after the last row of the current match.
  - SKIP TO NEXT ROW: continues searching for a new match starting at the next row after the starting row of the match.
  - SKIP TO LAST variable: resumes the pattern matching at the last row that is mapped to the specified pattern variable.
  - SKIP TO FIRST variable: resumes the pattern matching at the first row that is mapped to the specified pattern variable.
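- PATTERN (A+ B)
  Defines how the pattern variables relate to each other. With (A B), the first event must satisfy A's condition and the immediately following event must satisfy B's condition. (A+ B) means that one or more consecutive events satisfying A's condition are followed by an event satisfying B's condition. For the full rules, see Defining a Pattern.
- DEFINE
  Defines the condition that each pattern variable must satisfy.

Although the pattern is written in SQL, the internal execution logic is the same as described in FlinkCEP with EventTime; Flink simply parses the SQL first.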
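To make the clauses listed above concrete, here is a simplified in-memory re-implementation of this particular MATCH_RECOGNIZE clause in plain Java. It is only an illustration under stated assumptions, not how Flink evaluates the pattern: rows are assumed to arrive already sorted by rowtime, the A and B conditions are assumed to be mutually exclusive (true here, since price < 10 and price > 100 never overlap), and PatternSketch, Ticker and match are hypothetical names. The reported firstPrice is the price of the first row mapped to A, as with FIRST(A.price), and the skipPastLastRow flag contrasts SKIP PAST LAST ROW with SKIP TO NEXT ROW.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of PATTERN (A+ B) with DEFINE A AS price < 10, B AS price > 100. */
public class PatternSketch {

    /** One input row; rowtime is omitted because rows are assumed to be sorted already. */
    static final class Ticker {
        final String symbol;
        final long price;
        Ticker(String symbol, long price) { this.symbol = symbol; this.price = price; }
    }

    // DEFINE A AS A.price < 10, B AS B.price > 100
    static boolean isA(Ticker t) { return t.price < 10; }
    static boolean isB(Ticker t) { return t.price > 100; }

    /** Returns one "symbol,firstPrice,lastPrice" string per match, grouped by symbol. */
    static List<String> match(List<Ticker> rows, boolean skipPastLastRow) {
        // PARTITION BY symbol: group the rows per symbol, keeping their order
        Map<String, List<Ticker>> partitions = new LinkedHashMap<>();
        for (Ticker t : rows) {
            partitions.computeIfAbsent(t.symbol, k -> new ArrayList<>()).add(t);
        }
        List<String> out = new ArrayList<>();
        for (List<Ticker> part : partitions.values()) {
            List<Ticker> aRun = new ArrayList<>(); // current run of consecutive A rows
            for (Ticker t : part) {
                if (isB(t) && !aRun.isEmpty()) {
                    // A+ B completed. SKIP PAST LAST ROW: a single match starting at the earliest A.
                    // SKIP TO NEXT ROW: one further (overlapping) match starting at each later A row.
                    int starts = skipPastLastRow ? 1 : aRun.size();
                    for (int i = 0; i < starts; i++) {
                        out.add(t.symbol + "," + aRun.get(i).price + "," + t.price);
                    }
                    aRun.clear(); // matching resumes after the B row
                } else if (isA(t)) {
                    aRun.add(t);
                } else {
                    aRun.clear(); // row satisfies neither A nor B: the consecutive sequence breaks
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Ticker> rows = Arrays.asList(
                new Ticker("ACME", 5), new Ticker("ACME", 8), new Ticker("ACME", 150),
                new Ticker("XYZ", 3), new Ticker("XYZ", 50), new Ticker("XYZ", 200));
        System.out.println(match(rows, true));  // [ACME,5,150]
        System.out.println(match(rows, false)); // [ACME,5,150, ACME,8,150]
    }
}
```

The XYZ rows produce no match because the row with price 50 satisfies neither A nor B and breaks the consecutive sequence. With SKIP TO NEXT ROW the ACME rows yield a second, overlapping match starting at the row with price 8, which is exactly what SKIP PAST LAST ROW rules out.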
Summary
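This article provided a simple FlinkCEP SQL example that lets readers quickly see FlinkCEP SQL in action. In business terms, the example outputs a result whenever, for the same symbol, a run of consecutive small trades (price below 10) is suddenly followed by one large trade (price above 100).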
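To try the example, records in the CSV layout symbol,tax,price,rowtime have to be written to the Kafka topic foo. The sketch below builds such lines for the scenario described in the summary. The class name SampleTickerRecords, the tax value t1 and the prices are made up for illustration, and the timestamp text is an assumption: it relies on Flink 1.9's CSV format parsing SQL_TIMESTAMP fields with java.sql.Timestamp.valueOf, which accepts the yyyy-mm-dd hh:mm:ss[.f...] text that Timestamp.toString() produces.

```java
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper producing CSV lines in the symbol,tax,price,rowtime layout. */
public class SampleTickerRecords {

    static String csv(String symbol, String tax, long price, Timestamp rowtime) {
        // Timestamp.toString() yields "yyyy-mm-dd hh:mm:ss.f...", which Timestamp.valueOf can parse back
        return symbol + "," + tax + "," + price + "," + rowtime;
    }

    /** Three small trades (A rows, price < 10) followed by one large trade (B row, price > 100). */
    static List<String> smallTradesThenLargeTrade(String symbol, Timestamp start) {
        long[] prices = {5, 8, 3, 150};
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < prices.length; i++) {
            Timestamp ts = new Timestamp(start.getTime() + i * 1000L); // records one second apart
            lines.add(csv(symbol, "t1", prices[i], ts));
        }
        return lines;
    }

    public static void main(String[] args) {
        Timestamp start = Timestamp.valueOf("2019-09-01 10:00:00");
        // Prints ACME,t1,5,2019-09-01 10:00:00.0 ... ACME,t1,150,2019-09-01 10:00:03.0
        smallTradesThenLargeTrade("ACME", start).forEach(System.out::println);
    }
}
```

Because the watermark trails the largest timestamp seen by 5 seconds, the match for these four records should only be emitted once a further record (of any symbol) at least 5 seconds later than the large trade has arrived; it should then produce one row for ACME with lastPrice 150.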