Flink 实践教程-进阶(5):排序(乱序调整)

作者:腾讯云流计算 Oceanus 团队

流计算 Oceanus 简介

流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点的企业级实时大数据分析平台。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。本文将为您详细介绍如何使用 Windowing TVF 配合聚合函数,实时调整乱序数据,经过聚合分析后存入 MySQL 中。
操作视频→

前置准备

创建流计算 Oceanus 集群

进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。

创建消息队列 CKafka

进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。创建 Topic: 进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。数据准备: 进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。

# 启动 Kafka 生产者命令
// 按顺序插入如下数据,注意这里数据时间是乱序的

创建 MySQL 实例

进入 MySQL 控制台 [7],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [8]。

-- 建表语句

流计算 Oceanus 作业

1. 创建 Source

CREATE TABLE `kafka_json_source_table` (

2. 创建 Sink

CREATE TABLE `jdbc_upsert_sink_table` (

3. 编写业务 SQL

INSERT INTO `jdbc_upsert_sink_table`

4. 查询数据

进入 MySQL 控制台 [7],单击右侧【登陆】快速登陆数据库,选择相应的库表查询数据。

image

笔者这里设置的 10s 的延迟水印,可以看到在 29~3030~31时间段的数据统计是正确,并没有因为数据延时而出现漏统计的现象。31~32时间段的数据并没有统计出来,这是因为我们最后一条数据时间是2021-12-22 14:31:15,其水印时间为2021-12-22 14:31:05,小于窗口关闭时间,导致这段时间窗口还未关闭、未计算。

总结

  • WARTERMARK是跟随在每条数据上的一条特殊标签,而且只增不减(可以相等)。WARTERMARK并不能影响数据出现在哪个窗口(本例中由event_time决定),其主要决定窗口是否关闭(当水印时间大于窗口结束时间时,窗口关闭并计算)。

  • 如果数据延时过大,例如小时级别,可以配合allowedLateness算子合理性使用WARTERMARK,当达到水印结束时间时,窗口并不关闭,只进行计算操作,当时间到达allowedLateness算子设置的时间后,窗口才真正关闭,并在原先的基础上再次进行计算。如在allowedLateness算子设置的时间后才达到的数据,我们可以使用sideOutputLateData算子将迟到的数据输出到侧输出流进行计算。这里需要注意allowedLatenesssideOutputLateData算子目前只能使用 Stream API 实现。

  • 目前 flink 1.13 的 Windowing TVF 函数并不能单独使用,需配合AGGREGATEJOINTOPN使用。建议优先使用 Windowing TVF 实现窗口聚合等功能,因为 Windowing TVF 更符合 SQL 书写规范,底层优化逻辑也更好。

参考链接

[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview [2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298 [3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1 [4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839 [5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854 [6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840 [7] MySQL 控制台:https://console.cloud.tencent.com/cdb [8] 创建 MySQL 实例:https://cloud.tencent.com/document/product/236/46433

流计算 Oceanus ****限量秒杀专享活动火爆进行中→

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容