-- 1. 处理时间（Processing Time）场景
-- Job-level settings for the processing-time scenario.
-- Every option uses the quoted SET 'key' = 'value' form documented for Flink SQL,
-- instead of mixing quoted and unquoted values.
SET 'execution.checkpointing.interval' = '4min';  -- start a checkpoint every 4 minutes
SET 'execution.checkpointing.timeout' = '2min';   -- give up on a checkpoint that runs longer than 2 minutes
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.tolerable-failed-checkpoints' = '5';  -- allow up to 5 checkpoint failures
SET 'table.local-time-zone' = 'Asia/Shanghai';    -- session time zone used when formatting window timestamps
SET 'table.exec.state.ttl' = '1day';              -- idle state (e.g. per-key ROW_NUMBER state) expires after one day
-- Source table for the processing-time scenario.
-- NOTE(review): the original snippet declared the physical columns without data types,
-- which Flink DDL does not accept. STRING is an assumption; confirm against the real
-- topic schema before use.
CREATE TABLE `kafka_source` (
    did STRING                -- identifier counted for PV and de-duplicated for UV downstream
    ,dt STRING                -- first part of the UV de-duplication key (presumably a day value; TODO confirm)
    ,proc_time AS PROCTIME()  -- processing-time attribute used for ordering and windowing
) WITH (
    -- connector options omitted
);
-- Per-minute PV / UV computed on processing time.
--   `date`   : window start formatted as yyyyMMdd, cast to INT
--   `hour`   : window start formatted as 'yyyy-MM-dd HH'
--   `minute` : window start formatted as 'yyyy-MM-dd HH:mm'
--   pv       : number of rows with a non-NULL did in the one-minute window
--   uv       : number of rows in the window that are the first row seen for their
--              (dt, did) pair, i.e. newly seen identifiers in that minute
CREATE VIEW base AS
SELECT
    CAST(DATE_FORMAT(TUMBLE_START(proc_time, INTERVAL '1' MINUTE), 'yyyyMMdd') AS INT) AS `date`
    ,DATE_FORMAT(TUMBLE_START(proc_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH') AS `hour`
    ,DATE_FORMAT(TUMBLE_START(proc_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH:mm') AS `minute`
    ,COUNT(did) AS pv
    ,SUM(CASE WHEN rn = 1 THEN 1 ELSE 0 END) AS uv
FROM (
    -- Number each row within its (dt, did) partition in arrival order, so rn = 1
    -- marks the first occurrence of that identifier.
    SELECT
        proc_time
        ,did
        ,ROW_NUMBER() OVER (PARTITION BY dt, did ORDER BY proc_time) AS rn
    FROM `kafka_source`
) AS ranked
GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE);  -- terminator was missing, which merged this statement with the next one
-- Write every per-minute row of the base view to the sink table.
-- Columns are listed in the order the view defines them.
INSERT INTO target
SELECT
    `date`
    ,`hour`
    ,`minute`
    ,pv
    ,uv
FROM base;
-- 2. 事件时间（Event Time）场景
-- Job-level settings for the event-time scenario.
-- Every option uses the quoted SET 'key' = 'value' form documented for Flink SQL,
-- instead of mixing quoted and unquoted values.
SET 'execution.checkpointing.interval' = '4min';  -- start a checkpoint every 4 minutes
SET 'execution.checkpointing.timeout' = '2min';   -- give up on a checkpoint that runs longer than 2 minutes
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.tolerable-failed-checkpoints' = '5';  -- allow up to 5 checkpoint failures
SET 'table.local-time-zone' = 'Asia/Shanghai';    -- session time zone used when formatting window timestamps
SET 'table.exec.state.ttl' = '1day';              -- idle state (e.g. per-key ROW_NUMBER state) expires after one day
-- Source table for the event-time scenario.
-- NOTE(review): the original snippet declared the physical columns without data types
-- and never declared deal_time, although the computed column reads it.
-- STRING for did/dt is an assumption; confirm against the real topic schema.
CREATE TABLE `kafka_source` (
    did STRING        -- identifier counted for PV and de-duplicated for UV downstream
    ,dt STRING        -- first part of the UV de-duplication key (presumably a day value; TODO confirm)
    ,deal_time BIGINT -- epoch milliseconds, as implied by TO_TIMESTAMP_LTZ(deal_time, 3)
    ,event_time AS TO_TIMESTAMP_LTZ(deal_time, 3)  -- event-time attribute derived from deal_time
    ,WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND  -- tolerate 60 seconds of out-of-order data
) WITH (
    -- connector options omitted
);
-- Per-minute PV / UV computed on event time.
--   `date`   : window start formatted as yyyyMMdd, cast to INT
--   `hour`   : window start formatted as 'yyyy-MM-dd HH'
--   `minute` : window start formatted as 'yyyy-MM-dd HH:mm'
--   pv       : number of rows with a non-NULL did in the one-minute window
--   uv       : number of rows in the window that are the first row (by event_time)
--              for their (dt, did) pair, i.e. newly seen identifiers in that minute
CREATE VIEW base AS
SELECT
    CAST(DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' MINUTE), 'yyyyMMdd') AS INT) AS `date`
    ,DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH') AS `hour`
    ,DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH:mm') AS `minute`
    ,COUNT(did) AS pv
    ,SUM(CASE WHEN rn = 1 THEN 1 ELSE 0 END) AS uv
FROM (
    -- Number each row within its (dt, did) partition in event-time order, so rn = 1
    -- marks the first occurrence of that identifier.
    SELECT
        event_time
        ,did
        ,ROW_NUMBER() OVER (PARTITION BY dt, did ORDER BY event_time) AS rn
    FROM `kafka_source`
) AS ranked
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);  -- terminator was missing, which merged this statement with the next one
-- Write every per-minute row of the base view to the sink table.
-- Columns are listed in the order the view defines them.
INSERT INTO target
SELECT
    `date`
    ,`hour`
    ,`minute`
    ,pv
    ,uv
FROM base;
-- 注：Flink 版本为 1.14；低版本中时间函数的使用方式有所不同。