Flink SQL 实时计算 UV 和 PV

1.处理时间处理场景

-- Session configuration for the processing-time job.
-- NOTE(review): the Flink 1.14 SQL Client requires quoted keys/values:
-- SET 'key' = 'value'; — the original unquoted form is rejected by the client.
SET 'execution.checkpointing.interval' = '4min';
SET 'execution.checkpointing.timeout' = '2min';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.tolerable-failed-checkpoints' = '5';
SET 'table.local-time-zone' = 'Asia/Shanghai';
-- State TTL bounds the dedup state kept per (dt, did); '1d' = one day.
SET 'table.exec.state.ttl' = '1d';

-- Kafka source for the processing-time job.
-- Original DDL declared `did` and `dt` with no data types, which is invalid;
-- STRING is assumed here — confirm against the actual topic schema.
CREATE TABLE `kafka_source` (
    did STRING,               -- device id (UV dedup key)
    dt  STRING,               -- event date string, used to scope dedup to one day
    proc_time AS PROCTIME()   -- processing-time attribute used for windowing
) WITH (
-- 省略配置信息 (connector configuration omitted)
);

-- Per-minute PV/UV on processing time:
--   pv = every event in the 1-minute tumble window;
--   uv = events that are the FIRST occurrence of (dt, did) that day (rn = 1).
CREATE VIEW base AS
SELECT  CAST(DATE_FORMAT(TUMBLE_START(proc_time, INTERVAL '1' MINUTE), 'yyyyMMdd') AS INT) AS `date`
       ,DATE_FORMAT(TUMBLE_START(proc_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH')         AS `hour`
       ,DATE_FORMAT(TUMBLE_START(proc_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH:mm')      AS `minute`
       ,COUNT(did)                                                                         AS pv
       ,SUM(IF(rn = 1, 1, 0))                                                              AS uv
FROM
(
    -- rn = 1 marks the first event of a device within a day (dedup for UV).
    -- The original's extra `SELECT * FROM kafka_source` wrapper added nothing.
    SELECT  proc_time
           ,did
           ,ROW_NUMBER() OVER (PARTITION BY dt, did ORDER BY proc_time) AS rn
    FROM `kafka_source`
) agg
-- Interval qualifier must be singular (MINUTE); plural MINUTES is not valid Calcite SQL.
GROUP BY TUMBLE(proc_time, INTERVAL '1' MINUTE);
-- The original was missing the ';' above, fusing the INSERT into the view definition.

INSERT INTO target SELECT * FROM base;

2.事件时间处理场景

-- Session configuration for the event-time job.
-- NOTE(review): the Flink 1.14 SQL Client requires quoted keys/values:
-- SET 'key' = 'value'; — the original unquoted form is rejected by the client.
SET 'execution.checkpointing.interval' = '4min';
SET 'execution.checkpointing.timeout' = '2min';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.tolerable-failed-checkpoints' = '5';
SET 'table.local-time-zone' = 'Asia/Shanghai';
-- State TTL bounds the dedup state kept per (dt, did); '1d' = one day.
SET 'table.exec.state.ttl' = '1d';

-- Kafka source for the event-time job.
-- Original DDL referenced `deal_time` in the computed column without declaring
-- it, and gave `did`/`dt` no types — both are fixed below.
CREATE TABLE `kafka_source` (
    did STRING,                                    -- device id (UV dedup key)
    dt  STRING,                                    -- event date string, scopes dedup to one day
    deal_time BIGINT,                              -- epoch milliseconds; confirm unit against producer
    event_time AS TO_TIMESTAMP_LTZ(deal_time, 3),  -- event-time attribute (precision 3 = millis)
    WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND  -- tolerate 60s out-of-orderness
) WITH (
-- 省略配置信息 (connector configuration omitted)
);

-- Per-minute PV/UV on event time:
--   pv = every event in the 1-minute tumble window;
--   uv = events that are the FIRST occurrence of (dt, did) that day (rn = 1).
CREATE VIEW base AS
SELECT  CAST(DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' MINUTE), 'yyyyMMdd') AS INT) AS `date`
       ,DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH')         AS `hour`
       ,DATE_FORMAT(TUMBLE_START(event_time, INTERVAL '1' MINUTE), 'yyyy-MM-dd HH:mm')      AS `minute`
       ,COUNT(did)                                                                          AS pv
       ,SUM(IF(rn = 1, 1, 0))                                                               AS uv
FROM
(
    -- rn = 1 marks the first event of a device within a day (dedup for UV).
    -- The original's extra `SELECT * FROM kafka_source` wrapper added nothing.
    SELECT  event_time
           ,did
           ,ROW_NUMBER() OVER (PARTITION BY dt, did ORDER BY event_time) AS rn
    FROM `kafka_source`
) agg
-- Interval qualifier must be singular (MINUTE); plural MINUTES is not valid Calcite SQL.
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);
-- The original was missing the ';' above, fusing the INSERT into the view definition.

INSERT INTO target SELECT * FROM base;
  • 注:本文基于 Flink 1.14;低版本中时间函数(如 TO_TIMESTAMP_LTZ)的使用方式不同。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容