Flink 1.14.3
package com.ctgu.flink.sql;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*;
/**
 * Table API demo that runs three windowed queries over the {@code windowTable} source:
 * a tumbling group window, a sliding group window and a time-based over window.
 *
 * <p>The source is a CSV file read through the filesystem connector. Its DDL derives the
 * {@code time_ltz} column from the {@code timestamp} column, declares a watermark on it,
 * and also exposes a processing-time column {@code pt}.
 */
public class Flink_Table_Window {

    /**
     * DDL of the source table. The fragments are joined without a separator, so every
     * fragment carries its own leading space.
     */
    private static final String CREATE_WINDOW_TABLE_SQL = String.join("",
            "CREATE TABLE windowTable ",
            " (",
            " `id` STRING,",
            " `timestamp` BIGINT,",
            " `value` DOUBLE,",
            " `time_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3),",
            " `pt` AS PROCTIME(),",
            " WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND",
            " )",
            " WITH (",
            " 'connector'='filesystem',",
            " 'format'='csv',",
            " 'csv.field-delimiter'=' ',",
            " 'path'='data/dataInfo.txt'",
            " )");

    /**
     * Entry point: registers the source table, builds the queries, attaches a printing
     * sink to each of them and submits the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);
        streamEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(streamEnv);

        tEnv.executeSql(CREATE_WINDOW_TABLE_SQL);
        Table source = tEnv.from("windowTable");
        source.printSchema();

        // 4-second tumbling window on time_ltz.
        GroupWindow everyFourSeconds =
                Tumble.over(lit(4).seconds()).on($("time_ltz")).as("tw");
        Table tumble = countValuesPerWindow(source, everyFourSeconds);

        // 20-second window on time_ltz that slides every 4 seconds.
        GroupWindow twentySecondsSlidingByFour =
                Slide.over(lit(20).seconds()).every(lit(4).seconds()).on($("time_ltz")).as("tw");
        Table slide = countValuesPerWindow(source, twentySecondsSlidingByFour);

        // Plain projection of the source rows, printed next to the windowed results.
        Table result = source.select($("id"), $("value"), $("time_ltz").toTime());

        tEnv.toDataStream(result, Row.class).print("result");
        tEnv.toDataStream(tumble, Row.class).print("tumble");
        tEnv.toDataStream(slide, Row.class).print("slide");

        // Over window per id, ordered by time_ltz, covering the preceding 4 seconds.
        // Alternative kept from the original author: a row-count window on processing time,
        // i.e. orderBy($("pt")).preceding(rowInterval(10L)).
        OverWindow trailingFourSeconds = Over.partitionBy($("id"))
                .orderBy($("time_ltz"))
                .preceding(lit(4).seconds())
                .as("ow");
        Table rank = source.window(trailingFourSeconds)
                .select($("id"),
                        $("value"),
                        $("value").max().over($("ow")).as("value_max"),
                        $("time_ltz"));
        rank.printSchema();

        // The second column is declared as "score" here although the query names it "value".
        // NOTE(review): presumably the declared schema is matched by position, which would
        // make this a rename - confirm against the Flink version in use.
        Schema rankSchema = Schema.newBuilder()
                .column("id", "STRING")
                .column("score", "DOUBLE")
                .column("value_max", "DOUBLE")
                .column("time_ltz", "TIMESTAMP_LTZ(3)")
                .build();
        tEnv.toChangelogStream(rank, rankSchema).print("rank");

        result.printSchema();
        streamEnv.execute("Table SQL");
    }

    /**
     * Groups {@code source} by the given window and by {@code id}, and selects the id,
     * the count of {@code value} and the window end converted with {@code toTime()}.
     *
     * @param source table to aggregate
     * @param window group window specification; it must be aliased as {@code "tw"}
     *               because the grouping and the projection refer to that alias
     * @return the aggregated table
     */
    private static Table countValuesPerWindow(Table source, GroupWindow window) {
        return source.window(window)
                .groupBy($("tw"), $("id"))
                .select($("id"), $("value").count(), $("tw").end().toTime());
    }
}
Maven 配置 (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the com.ctgu:flink_class project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ctgu</groupId>
    <artifactId>flink_class</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <!-- Single version shared by every org.apache.flink dependency below. -->
        <flink-version>1.14.3</flink-version>
    </properties>

    <dependencies>
        <!-- Flink artifacts, all pinned to ${flink-version}. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink-version}</version>
        </dependency>

        <!-- NOTE(review): "0.3.2-test3" looks like a pre-release build; confirm whether a
             stable release is intended before relying on it. -->
        <dependency>
            <groupId>com.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.3.2-test3</version>
        </dependency>

        <!-- Lombok is only needed while compiling (annotation processing), so it is scoped
             as "provided" to keep it off the runtime classpath and out of packaged jars. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>