1、导入依赖
<!-- 使用table api 引入的依赖,使用桥接器和底层datastream api连接支持-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--如果需要在本地运行table api和sql 还需要引入一下依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--如果想实现自定义的数据格式来做序列化,需要引入一下依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!--连接外部数据格式解析,采用csv方式来解析-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
1.1、从文件中输入
路径:input/clicks.txt
Bob,./test/111,1000
Bob,./test/222,1000
Bob,./test/333,1000
Bob,./test/444,1000
image.png
2、输出到控制台demo
package com.flinktest.wc;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class CommApiTest1 {
public static void main(String[] args) throws Exception{
// 创建执行环境的两种方式,流方式 & 表方式
// 1 创建执行环境(流方式创建)
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);
// StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2 创建执行环境(表方式创建) 基于alibaba 的 blink planner实现
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 3 创建一张连接器表(输入表)
String createInDDL = "CREATE TABLE clickTable (" +
"user_name STRING, " +
"url STRING, " +
"ts BIGINT " +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'input/clicks.txt'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createInDDL);
// 4 创建一张连接器表(输出表)
String createOutDDL = "CREATE TABLE outTable (" +
"user_name STRING, " +
"url STRING " +
") WITH (" +
" 'connector' = 'filesystem'," +
" 'path' = 'output'," +
" 'format' = 'csv'" +
")";
tableEnv.executeSql(createOutDDL);
// 使用输入表
Table clickTable = tableEnv.from("clickTable");
// 使用table api方式来查询
Table resultTable = clickTable.where($("user_name").isEqual("Bob"))
.select($("user_name"), $("url"));
// 将结果表注册到临时表中,这样就可以使用这张表了
tableEnv.createTemporaryView("result2",resultTable);
// 使用sql 方式来查询查询
Table resultTable2 = tableEnv.sqlQuery("select user_name,url from result2");
// 输出表,输出到文件
// resultTable.executeInsert("outTable");
// 创建一张控制台打印的一张表
String createPrintOutDDL = "CREATE TABLE printOutTable (" +
"user_name STRING, " +
"url STRING " +
") WITH (" +
" 'connector' = 'print' " +
")";
tableEnv.executeSql(createPrintOutDDL);
// 输出到控制台
resultTable2.executeInsert("printOutTable");
}
}