package com.ctgu.flink.sql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.*;
/**
 * Demonstrates Flink SQL windowing table-valued functions (TUMBLE / HOP /
 * CUMULATE) combined with multi-dimensional aggregation clauses
 * (GROUPING SETS / ROLLUP / CUBE) over a CSV file source that declares an
 * event-time column and watermark.
 *
 * <p>Expects an input file at {@code data/data.txt} with rows of
 * {@code id,timestamp,address,value} (timestamp in epoch milliseconds).
 * Results are printed to stdout via {@code DataStream.print}.
 */
public class Flink_Sql_Window {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the printed output in a single, readable sequence.
        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Source table: time_ltz is a computed event-time column derived from the
        // epoch-millis `timestamp` field; pt is a processing-time column; the
        // watermark allows events up to 5 seconds out of order.
        String createSql =
                "CREATE TABLE windowTable " +
                        " (" +
                        " `id` STRING," +
                        " `timestamp` BIGINT," +
                        " `address` STRING," +
                        " `value` DOUBLE," +
                        " `time_ltz` AS TO_TIMESTAMP_LTZ(`timestamp`, 3)," +
                        " `pt` AS PROCTIME()," +
                        " WATERMARK FOR time_ltz AS time_ltz - INTERVAL '5' SECOND" +
                        " )" +
                        " WITH (" +
                        " 'connector'='filesystem'," +
                        " 'format'='csv'," +
                        " 'csv.field-delimiter'=','," +
                        " 'path'='data/data.txt'" +
                        " )";
        tableEnv.executeSql(createSql);

        // TUMBLE (fixed 4s windows) + GROUPING SETS: emits one row per (window, id)
        // plus one total row per window where id is NULL (the `()` grouping set).
        // Renamed from "slide*": TUMBLE is a fixed window, not a sliding one —
        // the HOP query below is the sliding window.
        String tumbleWindowSql =
                " select window_start, window_end, id, count(id) " +
                        "from Table(" +
                        " TUMBLE(Table windowTable, DESCRIPTOR(time_ltz), INTERVAL '4' seconds)) " +
                        "group by window_start, window_end, GROUPING SETS ((id), ())";
        Table tumbleTable = tableEnv.sqlQuery(tumbleWindowSql);
        tableEnv.toDataStream(tumbleTable, Row.class).print("tumbleTable");

        // HOP (sliding windows: 4s slide, 20s size) + ROLLUP(id), which is
        // equivalent to GROUPING SETS ((id), ()).
        String hopWindowSql =
                " select window_start, window_end, id, count(id) " +
                        "from Table(" +
                        " HOP(Table windowTable, DESCRIPTOR(time_ltz), INTERVAL '4' seconds, INTERVAL '20' seconds)) " +
                        "group by window_start, window_end, ROLLUP (id)";
        Table hopTable = tableEnv.sqlQuery(hopWindowSql);
        tableEnv.toDataStream(hopTable, Row.class).print("hopTable");

        // CUMULATE (4s step, 20s max size) + CUBE(id, address): all four grouping
        // combinations of (id, address) per expanding window.
        String cumulateWindowSql =
                " select window_start, window_end, id, address, count(id) " +
                        "from Table(" +
                        " CUMULATE(Table windowTable, DESCRIPTOR(time_ltz), INTERVAL '4' seconds, INTERVAL '20' seconds)) " +
                        "group by window_start, window_end, CUBE (id, address)";
        Table cumulateTable = tableEnv.sqlQuery(cumulateWindowSql);
        tableEnv.toDataStream(cumulateTable, Row.class).print("cumulateTable");

        // Required: toDataStream(...) registers DataStream sinks, so the job must
        // be launched through the StreamExecutionEnvironment.
        env.execute("Table SQL");
    }
}
// Source article: "Flink-3. Flink SQL API" (copyright remains with the author;
// contact the author before reproducing or reusing the content).
//
// NOTE(review): the lines below are scrape artifacts from the original blog
// page (unrelated fiction excerpts and a "recommended reading" sidebar). They
// are commented out so the file compiles; they are not part of this program.
// - Fiction excerpt (unrelated to Flink)
// - Fiction excerpt (unrelated to Flink)
// - Fiction excerpt (unrelated to Flink)
// Recommended reading (sidebar links):
// - Flink time semantics: processing time, event time, ingestion time
// - Table API / SQL program structure basics
// - Overview of Table API and Flink SQL in the unified batch/stream framework
// - Built-in functions in Flink Table and SQL; user-defined functions
// - Special streaming concepts in Table API and SQL (dynamic tables, etc.)