SQL API 是 Flink 中最顶级的 API , 它构建了 Table API 之上, 也可以方便的和 Table 做转换, 构建 SQL 所使用的Environment 也是 Table Environment . Flink SQL 底层使用 Apache Calcite 框架, 将标准的 Flink SQL 语句解析并转换成底层的算子处理逻辑. 下面就直接用 Flink 官方仓库中的 案例 Code Link来做一个演示.
- 获取执行环境
// 首先同样有流处理和批处理的区别,
// 获取对应的environment之后直接转换为Table environment ,
// 就可以使用SQL API,
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- 拿到要操作的表
// 将 Stream 转换为 Table, 可以采用不同的办法
// 将 DataStream 转换为 Table
Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
// 将 DataStream 注册成 Table
tEnv.registerDataStream("OrderB", orderB, "user, product, amount");
- 执行SQL语句
// TableEnvironment 有 SqlQuery 和 SqlUpdate 两种操作符可以使用
// union 两个 table
Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +"SELECT * FROM OrderB WHERE amount > 2");
SQL可以执行许多复杂的操作,本文先简单的了解下 SQL 的API