package com.ctgu.flink.sql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*;
/**
 * Demonstrates Flink SQL windowing table-valued functions (TUMBLE, HOP, CUMULATE)
 * combined with multi-dimensional aggregations (GROUPING SETS, ROLLUP, CUBE)
 * over a CSV filesystem source, printing each windowed result stream.
 */
public class Flink_Sql_Window {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallelism keeps the printed output ordered and easy to read.
        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Source table backed by a CSV file. `time_ltz` is an event-time column derived
        // from the epoch-millis `timestamp` field, with a 5-second
        // bounded-out-of-orderness watermark; `pt` exposes processing time.
        String createSql =
                "CREATE TABLE windowTable " +
                " (" +
                " `id` STRING," +
                " `timestamp` BIGINT," +
                " `address` STRING," +
                " `value` DOUBLE," +
                " `time_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3)," +
                " `pt` AS PROCTIME()," +
                " WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND" +
                " )" +
                " WITH (" +
                " 'connector'='filesystem'," +
                " 'format'='csv'," +
                " 'csv.field-delimiter'=','," +
                " 'path'='data/data.txt'" +
                " )";
        tableEnv.executeSql(createSql);

        // Tumbling window (4 s) + GROUPING SETS: per-id counts plus a per-window grand
        // total (the empty grouping set). Previously misnamed "slide"; TUMBLE windows
        // do not slide.
        String tumbleWindowSql =
                " select window_start, window_end, id, count(id) " +
                "from Table(" +
                " TUMBLE(Table windowTable, DESCRIPTOR(time_ltz), INTERVAL '4' seconds)) " +
                "group by window_start, window_end, GROUPING SETS ((id), ())";
        Table tumbleTable = tableEnv.sqlQuery(tumbleWindowSql);
        tableEnv.toDataStream(tumbleTable, Row.class).print("tumbleTable");

        // Hopping (sliding) window: slide 4 s, size 20 s, with ROLLUP(id)
        // (equivalent to GROUPING SETS ((id), ())).
        String hopWindowSql =
                " select window_start, window_end, id, count(id) " +
                "from Table(" +
                " HOP(Table windowTable, DESCRIPTOR(time_ltz), INTERVAL '4' seconds, INTERVAL '20' seconds)) " +
                "group by window_start, window_end, ROLLUP (id)";
        Table hopTable = tableEnv.sqlQuery(hopWindowSql);
        tableEnv.toDataStream(hopTable, Row.class).print("hopTable");

        // Cumulating window: step 4 s, max size 20 s, with CUBE(id, address)
        // (all four grouping combinations of id and address).
        String cumulateWindowSql =
                " select window_start, window_end, id, address, count(id) " +
                "from Table(" +
                " CUMULATE(Table windowTable, DESCRIPTOR(time_ltz), INTERVAL '4' seconds, INTERVAL '20' seconds)) " +
                "group by window_start, window_end, CUBE (id, address)";
        Table cumulateTable = tableEnv.sqlQuery(cumulateWindowSql);
        tableEnv.toDataStream(cumulateTable, Row.class).print("cumulateTable");

        // The toDataStream(...) sinks are DataStream operators, so an explicit
        // execute() is required to launch the job.
        env.execute("Table SQL");
    }
}
Flink-3.Flink SQL API
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
推荐阅读更多精彩内容
- Flink总共有三种时间语义:Processing time(处理时间)、Event time(事件时间)以及In...
- 一.基本程序结构 Table API和SQL的程序结构,与流式处理的程序结构十分类似; 也可以近似的认为有这么几步...
- 一.整体概述 1.1 什么是 Table API 和 Flink SQL Flink本身是批流统一的处理框架,...
- 一. 函数 Flink Table 和 SQL 内置了很多 SQL 中支持的函数;如果有无法满足的需要,则可以...
- 一. 流处理中的特殊概念 Table API 和 SQL,本质上还是基于关系型表的操作方式;而关系型表、关系代...