Description: the task is to find a period during which the price of a single stock keeps falling.
1. Maven project pom.xml
2. Test data
- Input:
+----+--------------------------------+-------------------------+----------------------+----------------------+
| op | symbol | rt | price | tax |
+----+--------------------------------+-------------------------+----------------------+----------------------+
| +I | ACME | 2018-09-17T02:00 | 12 | 1 |
| +I | ACME | 2018-09-17T02:00:01 | 17 | 2 |
| +I | ACME | 2018-09-17T02:00:02 | 19 | 1 |
| +I | ACME | 2018-09-17T02:00:03 | 21 | 3 |
| +I | ACME | 2018-09-17T02:00:04 | 25 | 2 |
| +I | ACME | 2018-09-17T02:00:05 | 18 | 1 |
| +I | ACME | 2018-09-17T02:00:06 | 15 | 1 |
| +I | ACME | 2018-09-17T02:00:07 | 14 | 2 |
| +I | ACME | 2018-09-17T02:00:08 | 24 | 2 |
| +I | ACME | 2018-09-17T02:00:09 | 25 | 2 |
| +I | ACME | 2018-09-17T02:00:10 | 19 | 1 |
+----+--------------------------------+-------------------------+----------------------+----------------------+
3. Code
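import static org.apache.flink.table.api.Expressions.$;

import java.net.URL;
import java.text.SimpleDateFormat;
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Ticker is a POJO with symbol, timestamp, price and tax fields, defined elsewhere in the project.
public class CepExample4 {
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Needed on Flink 1.11; deprecated since 1.12, where event time is the default.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 2. Create the table environment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // 3. Read the test data and assign event-time timestamps and watermarks.
        URL resource = CepExample4.class.getResource("/ticker.txt");
        DataStream<String> inputStream = env.readTextFile(resource.getPath());
        DataStream<Ticker> dataStream = inputStream.map(line -> {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String[] fields = line.trim().split(",");
            Long timestamp = sdf.parse(fields[1]).getTime();
            return new Ticker(fields[0], timestamp, Long.parseLong(fields[2]), Long.parseLong(fields[3]));
        }).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Ticker>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner((ticker, timestamp) -> ticker.getTimestamp()));
        // 4. Register the DataStream as a table, exposing the timestamp as the rowtime attribute rt.
        tableEnv.createTemporaryView("Ticker", dataStream, $("symbol"), $("timestamp").rowtime().as("rt"), $("price"), $("tax"));
        tableEnv.executeSql("SELECT * FROM Ticker").print();
        tableEnv.executeSql("SELECT * " +
                "FROM Ticker " +
                "MATCH_RECOGNIZE (" +
                "ORDER BY rt " +
                "MEASURES " + // Defines the output columns of the clause.
                "START_ROW.rt AS start_tstamp, " +
                "LAST(PRICE_DOWN.rt) AS bottom_tstamp, " +
                "LAST(PRICE_UP.rt) AS end_tstamp " +
                "ONE ROW PER MATCH " + // Emit exactly one output row for each successful match.
                "AFTER MATCH SKIP TO LAST PRICE_UP " + // After a match, resume matching at the last row mapped to PRICE_UP.
                "PATTERN (START_ROW PRICE_DOWN+ PRICE_UP) " + // START_ROW has no entry in DEFINE, so its implicit condition is always TRUE.
                "DEFINE " +
                "PRICE_DOWN AS " +
                // LAST(PRICE_DOWN.price, 1) is the price of the previous row mapped to PRICE_DOWN
                // (offset 1 from the row being evaluated); it is NULL for the first PRICE_DOWN row.
                "(LAST(PRICE_DOWN.price, 1) IS NULL AND PRICE_DOWN.price < START_ROW.price) OR " +
                "PRICE_DOWN.price < LAST(PRICE_DOWN.price, 1), " +
                "PRICE_UP AS " +
                "PRICE_UP.price > LAST(PRICE_DOWN.price, 1) " +
                ") ").print();
        // No env.execute() here: executeSql() already submits each query as its own job, and
        // env.execute() fails with "No operators defined in streaming topology" when no DataStream sink exists.
    }
}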
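The code above relies on a Ticker class that is not shown. A minimal sketch of what it might look like follows, assuming the field names used in the view definition (symbol, timestamp, price, tax) and a comma-separated line layout in ticker.txt; the parse helper and the sample line are hypothetical and only mirror what the map function does.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

/** Hypothetical POJO for one row of ticker.txt; public with a no-arg constructor so Flink can treat it as a POJO. */
public class Ticker {
    private String symbol;
    private Long timestamp; // event time in epoch milliseconds
    private Long price;
    private Long tax;

    public Ticker() {}

    public Ticker(String symbol, Long timestamp, Long price, Long tax) {
        this.symbol = symbol;
        this.timestamp = timestamp;
        this.price = price;
        this.tax = tax;
    }

    public String getSymbol() { return symbol; }
    public void setSymbol(String symbol) { this.symbol = symbol; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    public Long getPrice() { return price; }
    public void setPrice(Long price) { this.price = price; }
    public Long getTax() { return tax; }
    public void setTax(Long tax) { this.tax = tax; }

    /** Parses a line such as "ACME,2018-09-17 02:00:04,25,2" (assumed layout: symbol,time,price,tax). */
    public static Ticker parse(String line) {
        String[] fields = line.trim().split(",");
        try {
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            long ts = sdf.parse(fields[1]).getTime();
            return new Ticker(fields[0], ts, Long.parseLong(fields[2]), Long.parseLong(fields[3]));
        } catch (ParseException e) {
            throw new IllegalArgumentException("Bad timestamp in line: " + line, e);
        }
    }
}
```

With such a helper, the map step could be written as inputStream.map(Ticker::parse). The timestamp is parsed in the JVM's default time zone, which may be why the printed rt values differ from the wall-clock times in the file.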
4. Result
- Output:
+----+-------------------------+-------------------------+-------------------------+
| op | start_tstamp | bottom_tstamp | end_tstamp |
+----+-------------------------+-------------------------+-------------------------+
| +I | 2018-09-17T02:00:04 | 2018-09-17T02:00:07 | 2018-09-17T02:00:08 |
+----+-------------------------+-------------------------+-------------------------+
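To see why exactly one row comes out, the pattern START_ROW PRICE_DOWN+ PRICE_UP can be traced by hand over the eleven input prices. The sketch below is plain Java, not Flink code, and the class and method names are made up for illustration. It also simplifies the PRICE_UP condition to "strictly above the lowest price of the dip", whereas the query compares against LAST(PRICE_DOWN.price, 1); for this input both give the same match.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of PATTERN (START_ROW PRICE_DOWN+ PRICE_UP); hypothetical helper, not Flink API. */
final class PriceDipSketch {

    /** Returns {startIndex, bottomIndex, endIndex} for every dip found, in input order. */
    static List<int[]> findDips(long[] prices) {
        List<int[]> matches = new ArrayList<>();
        int start = 0;
        // A match needs at least three rows: START_ROW, one PRICE_DOWN, one PRICE_UP.
        while (start < prices.length - 2) {
            // PRICE_DOWN+: greedily extend while each price is strictly below the previous one.
            int bottom = start;
            while (bottom + 1 < prices.length && prices[bottom + 1] < prices[bottom]) {
                bottom++;
            }
            int end = bottom + 1;
            boolean hasDown = bottom > start;
            // PRICE_UP (simplified): the next row must exist and be strictly above the bottom price.
            if (hasDown && end < prices.length && prices[end] > prices[bottom]) {
                matches.add(new int[] {start, bottom, end});
                start = end; // AFTER MATCH SKIP TO LAST PRICE_UP: resume at the PRICE_UP row
            } else {
                start++; // no match starting here, try the next row as START_ROW
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Prices from the input table, in rt order (index i corresponds to 02:00:0i, index 10 to 02:00:10).
        long[] prices = {12, 17, 19, 21, 25, 18, 15, 14, 24, 25, 19};
        for (int[] m : findDips(prices)) {
            System.out.println("start=" + m[0] + " bottom=" + m[1] + " end=" + m[2]);
        }
        // prints: start=4 bottom=7 end=8
    }
}
```

Indices 4, 7 and 8 correspond to 02:00:04, 02:00:07 and 02:00:08, which is the row in the result table. The final drop from 25 to 19 produces no match because the input ends before any PRICE_UP row arrives.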