flink学习笔记

相关文档:

Doris官方文档

Flink v1.18 中文文档

Flink v1.18 官方文档

Flink SQL 系列文档

Flink相关拓展应用:

Dinky

StreamPark

相关概念

ETL(Extract-Transform-Load ):

用来描述将数据从来源迁移到目标的几个过程,提取、转换、加载。

常用的ETL工具有kettle、DataX、Canal等

ODS(Operational Data Store ):

作为数据库到数据仓库的一种过渡,ODS的数据结构一般与数据来源保持一致,便于减少ETL的工作复杂性。在源数据装入这一层时,要进行诸如去噪(例如有一条数据中人的年龄是 300 岁,这种属于异常数据,就需要提前做一些处理)、去重(例如在个人资料表中,同一 ID 却有两条重复数据,在接入的时候需要做一步去重)、字段命名规范等一系列操作。

DW (Data Warehouse ):

数据仓库,是数据的归宿,这里保持这所有的从ODS到来的数据,并长期保存,而且这些数据不会被修改。

DM(Data Mart):

数据集市,为了特定的应用目的或应用范围,而从数据仓库中独立出来的一部分数据,也可称为部门数据或主题数据。按照业务划分,如流量、订单、用户等,生成字段比较多的宽表,用于提供后续的业务查询,OLAP分析,数据分发等。

目前数据流转结构,如图:

简单总结:通过Flink收集MySql业务数据到Doris中的ODS层和DM层数据库中,应用读取从DM层读取数据。

Table Api/DataStream

StreamExecutionEnvironment

streamExecutionEnvironment-史上最通俗易懂的Flink源代码深入分析教程

StreamExecutionEnvironment是Flink中用于定义和执行流处理程序的主要类。它提供了一系列函数和方法来配置流处理程序的执行环境(例如并行度、checkpoint、时间特性),并将其部署到Flink集群中运行。

配置示例代码:

 // 创建一个 StreamExecutionEnvironment 对象 env,这是 Flink 流处理程序的入口点,用于配置和执行流处理任务。
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 // 启用检查点机制,并设置检查点的时间间隔为 300000 毫秒(即 30 秒)。这意味着 Flink 每 10 秒会自动触发一次检查点。
 env.enableCheckpointing(300000);
 // 配置检查点模式为 EXACTLY_ONCE,这意味着 Flink 会确保每个数据记录在发生故障后恰好被处理一次,从而保证数据的一致性和准确性。
 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 // 设置两次检查点之间的最小暂停时间间隔为 500 毫秒。这可以防止频繁的检查点操作对性能造成影响。
 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
 // 设置检查点超时时间为 60000 毫秒(即 60 秒)。如果一个检查点在 60 秒内没有完成,则会被视为失败。
 env.getCheckpointConfig().setCheckpointTimeout(60000);
 // 设置同时进行的最大检查点数为 1。这意味着在前一个检查点完成之前,不会开始新的检查点。
 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 // 设置可容忍的检查点失败次数为 5。这意味着在连续 5 次检查点失败后,Flink 才会停止尝试新的检查点。
 env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
 // 启用外部化检查点,并设置在取消作业时保留检查点。这样可以在作业取消后手动恢复到某个检查点。
 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup
 .RETAIN_ON_CANCELLATION);
 // 设置重启策略为基于失败率的重启策略。具体来说,如果在 5 分钟内发生 3 次失败,Flink 将尝试重启作业,每次重启之间等待 10 秒。
 env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));

关于CheckpointingMode

EXACTLY_ONCEAT_LEAST_ONCE 是 Flink 中两种不同的检查点模式,它们决定了在发生故障时数据处理的一致性级别。

以下是这两种模式的详细解释:

EXACTLY_ONCE

  • 定义:EXACTLY_ONCE 模式确保每个数据记录在发生故障后恰好被处理一次
  • 特点:
*   强一致性:即使在故障恢复过程中,也能保证数据不重复处理,也不会丢失。
    
    
*   复杂性:实现 EXACTLY_ONCE 需要更多的资源和更复杂的逻辑,可能会对性能有一定影响。
    
    
*   适用场景:适用于对数据一致性和准确性要求非常高的场景,例如金融交易、计费系统等。

AT_LEAST_ONCE

  • 定义:AT_LEAST_ONCE 模式确保每个数据记录在发生故障后至少被处理一次。
  • 特点:
*   弱一致性:在故障恢复过程中,可能会有数据重复处理,但不会丢失。
    
    
*   简单性:实现 AT_LEAST_ONCE 较为简单,对性能的影响较小。
    
    
*   适用场景:适用于对数据重复处理容忍度较高的场景,例如日志分析、监控数据处理等。

Doris Sql

Doris官方文档

建表语句示例

DROP TABLE IF EXISTS testdb.d_table_a;
CREATE TABLE IF NOT EXISTS testdb.d_table_a
(
 `id`                  BIGINT COMMENT '主键id',
 `uid`                 BIGINT COMMENT '用户ID',
 `address`             varchar(512) COMMENT '地址',
)
UNIQUE KEY(id)
COMMENT "用户地址表"
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
 "replication_allocation" = "tag.location.default: 1",
 "enable_unique_key_merge_on_write" = "true"
);

参数解释:

testdb.d_table_a

数据库.表名字;Doris 中表名默认是大小写敏感的,默认的表名最大长度为 64 字节。

可大小写敏感以在第一次初始化集群时配置lower_case_table_names为大小写不敏感的;

表名最大长度可以通过配置table_name_length_limit更改,不建议配置过大。

UNIQUE KEY(id)

指定表模型为主键模型(Unique Key Model)

DISTRIBUTED BY HASH(id) BUCKETS 1

指定表的数据分布方式为按id列的哈希值分布,并设置桶的数量为1。

PROPERTIES ("replication_allocation" = "tag.location.default: 1")

设置表的属性,指定副本分配策略,这里设置的是默认位置的一个副本。

"enable_unique_key_merge_on_write" = "true"

开启部分列更新,比如第一个job同步a,b,c列,第二个job同步x,y,z列;如果不开启,第二个job会覆盖掉a,b,c列

同时需要在Flink Connector中添加配置:

<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="sql" cid="n85" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 0px; margin-top: 15px; width: inherit;">'sink.properties.partial_columns' = 'true'</pre>

数据表模型

Doris 数据表模型上目前分为三类:DUPLICATE KEY, UNIQUE KEY, AGGREGATE KEY。

  • 明细模型(Duplicate Key Model):允许指定的 Key 列重复,Doirs 存储层保留所有写入的数据,适用于必须保留所有原始数据记录的情况;

  • 主键模型(Unique Key Model):每一行的 Key 值唯一,可确保给定的 Key 列不会存在重复行,Doris 存储层对每个 key 只保留最新写入的数据,适用于数据更新的情况;

  • 聚合模型(Aggregate Key Model):可根据 Key 列聚合数据,Doris 存储层保留聚合后的数据,从而可以减少存储空间和提升查询性能;通常用于需要汇总或聚合信息(如总数或平均值)的情况。

在建表后,表模型的属性已经确认,无法修改。针对业务选择合适的模型至关重要:

  • Aggregate 模型可以通过预聚合,极大地降低聚合查询时所需扫描的数据量和查询的计算量,非常适合有固定模式的报表类查询场景。但是该模型对 count(*) 查询很不友好。同时因为固定了 Value 列上的聚合方式,在进行其他类型的聚合查询时,需要考虑语意正确性。

  • Unique 模型针对需要唯一主键约束的场景,可以保证主键唯一性约束。但是无法利用 ROLLUP 等预聚合带来的查询优势。对于聚合查询有较高性能需求的用户,推荐使用自 1.2 版本加入的写时合并实现。

  • Duplicate 适合任意维度的 Ad-hoc 查询。虽然同样无法利用预聚合的特性,但是不受聚合模型的约束,可以发挥列存模型的优势(只读取相关列,而不需要读取所有 Key 列)。

建表时列类型建议

  1. Key 列必须在所有 Value 列之前。
  1. 尽量选择整型类型。因为整型类型的计算和查找效率远高于字符串。
  1. 对于不同长度的整型类型的选择原则,遵循够用即可。
  1. 对于 VARCHAR 和 STRING 类型的长度,遵循够用即可。

索引

索引用于帮助快速过滤或查找数据。目前主要支持两类索引:

  1. 内建自动创建的智能索引,包括前缀索引ZoneMap 索引

  2. 用户手动创建的二级索引,包括倒排索引bloomfilter 索引ngram bloomfilter 索引bitmap 索引

前缀索引

在 Aggregate、Unique 和 Duplicate 三种数据模型中。底层的数据存储,是按照各自建表语句中,AGGREGATE KEY、UNIQUE KEY 和 DUPLICATE KEY 中指定的列进行排序存储的。而前缀索引,即在排序的基础上,实现的一种根据给定前缀列,快速查询数据的索引方式。

前缀索引是稀疏索引,不能精确定位到 Key 所在的行,只能粗粒度地定位出 Key 可能存在的范围,然后使用二分查找算法精确地定位 Key 的位置。

推荐规约

  1. 建表时,正确的选择列顺序,能够极大地提高查询效率
因为建表时已经指定了列顺序,所以一个表只有一种前缀索引。这对于使用其他不能命中前缀索引的列作为条件进行的查询来说,效率上可能无法满足需求,这种情况,我们可以通过创建 物化视图 来人为的调整列顺序。
  1. 前缀索引的第一个字段一定是最常查询的字段,并且需要是高基数字段:
a. 分桶字段注意事项:这个一般是数据分布比较均衡的,也是经常使用的字段,最好是高基数字段



b. Int(4)+ Int(4) + varchar(50),前缀索引长度只有 28



c. Int(4) + varchar(50) + Int(4),前缀索引长度只有 24



d. varchar(10) + varchar(50) ,前缀索引长度只有 30



e. 前缀索引(36 位):第一个字段查询性能最好,前缀索引碰见 varchar 类型的字段,会自动截断前 20 个字符



f. 最常用的查询字段如果能放到前缀索引里尽可能放到前前缀索引里,如果不能,可以放到分桶字段里
  1. 前缀索引中的字段长度尽可能明确,因为 Doris 只有前 36 个字节能走前缀索引。
  1. 如果某个范围数据在分区分桶和前缀索引中都不好设计,可以考虑引入倒排索引加速。

倒排索引(Inverted Index)

倒排索引 - Apache Doris

创建倒排索引
-- 创建示例:可以表创建时指定或者创建后新增,如下创建表时指定
CREATE TABLE table_name
(
 columns_difinition,
 -- 创建一个名为idx_name1的倒排索引,基于column_name1列。指定解析器类型,可以是english、unicode或chinese。
 INDEX idx_name1(column_name1) USING INVERTED [PROPERTIES("parser" = "english|unicode|chinese")] [COMMENT 'your comment']
 -- 创建一个名为idx_name2的倒排索引,基于column_name2列。指定解析器类型,可以是english、unicode或chinese。
 INDEX idx_name2(column_name2) USING INVERTED [PROPERTIES("parser" = "english|unicode|chinese")] [COMMENT 'your comment']
 -- 指定解析器类型为chinese,并指定解析模式为fine_grained或coarse_grained。
 INDEX idx_name3(column_name3) USING INVERTED [PROPERTIES("parser" = "chinese", "parser_mode" = "fine_grained|coarse_grained")] [COMMENT 'your comment']
 -- 指定解析器类型,并指定是否支持短语搜索。
 INDEX idx_name4(column_name4) USING INVERTED [PROPERTIES("parser" = "english|unicode|chinese", "support_phrase" = "true|false")] [COMMENT 'your comment']
 -- 指定字符过滤器类型为char_replace,过滤模式为"._",替换为(空格)。
 INDEX idx_name5(column_name4) USING INVERTED [PROPERTIES("char_filter_type" = "char_replace", "char_filter_pattern" = "._"), "char_filter_replacement" = " "] [COMMENT 'your comment']
 -- 指定字符过滤器类型为char_replace,过滤模式为"._"。
 INDEX idx_name5(column_name4) USING INVERTED [PROPERTIES("char_filter_type" = "char_replace", "char_filter_pattern" = "._")] [COMMENT 'your comment']
)

-- 表的属性定义部分
table_properties;

-- 使用示例:全文检索关键词匹配,通过 MATCH_ANY MATCH_ALL 完成
SELECT * FROM table_name WHERE column_name MATCH_ANY | MATCH_ALL 'keyword1 ...';</pre>
PROPERTIES 参数详解
parser 指定分词器
  • 默认不指定代表不分词

  • english 是英文分词,适合被索引列是英文的情况,用空格和标点符号分词,性能高

  • chinese 是中文分词,适合被索引列主要是中文的情况,性能比 English 分词低

  • unicode 是多语言混合类型分词,适用于中英文混合、多语言混合的情况。它能够对邮箱前缀和后缀、IP 地址以及字符数字混合进行分词,并且可以对中文按字符分词。

parser_mode 指定分词的模式

用于指定分词的模式,目前 parser = chinese 时支持如下几种模式:

  • fine_grained:细粒度模式,倾向于分出比较短、较多的词,比如 '武汉市长江大桥' 会分成 '武汉', '武汉市', '市长', '长江', '长江大桥', '大桥' 6 个词

  • coarse_grained:粗粒度模式,倾向于分出比较长、较少的词,,比如 '武汉市长江大桥' 会分成 '武汉市' '长江大桥' 2 个词

  • 默认 coarse_grained

support_phrase 短语查询加速

用于指定索引是否支持 MATCH_PHRASE 短语查询加速

  • true 为支持,但是索引需要更多的存储空间

  • false 为不支持,更省存储空间,可以用 MATCH_ALL 查询多个关键字

  • 默认 false

例如下面的例子指定中文分词,粗粒度模式,支持短语查询加速。

INDEX idx_name(column_name) USING INVERTED PROPERTIES("parser" = "chinese", "parser_mode" = "coarse_grained", "support_phrase" = "true")

char_filter 在分词前对文本进行预处理

用于指定在分词前对文本进行预处理,通常用于影响分词行为

char_filter_type:指定使用不同功能的 char_filter(目前仅支持 char_replace)

char_replace 将 pattern 中每个 char 替换为一个 replacement 中的 char

  • char_filter_pattern:需要被替换掉的字符数

  • char_filter_replacement:替换后的字符数组,可以不用配置,默认为一个空格字符

例如下面的例子将点和下划线替换成空格,达到将点和下划线作为单词分隔符的目的,影响分词行为。

INDEX idx_name(column_name) USING INVERTED PROPERTIES("parser" = "unicode", "char_filter_type" = "char_replace", "char_filter_pattern" = "._", "char_filter_replacement" = " ")

ignore_above 不分词字符串索引的长度限制

用于指定不分词字符串索引(没有指定 parser)的长度限制

  • 长度超过 ignore_above 设置的字符串不会被索引。对于字符串数组,ignore_above 将分别应用于每个数组元素,长度超过 ignore_above 的字符串元素将不被索引。

  • 默认为 256,单位是字节

lower_case 将分词进行小写转换
  • true: 转换小写

  • false:不转换小写

  • 从 2.1.2 版本开始默认为 true,自动转小写,之前的版本默认为 false

stopwords 停用词表

默认的内置停用词表包含一些无意义的词:'is'、'the'、'a' 等。在写入或者查询时,分词器会忽略停用词表中的词。

  • none: 使用空的停用词表
增加倒排索引
-- 语法 1
CREATE INDEX idx_name ON table_name(column_name) USING INVERTED [PROPERTIES(...)] [COMMENT 'your comment'];
-- 语法 2
ALTER TABLE table_name ADD INDEX idx_name(column_name) USING INVERTED [PROPERTIES(...)] [COMMENT 'your comment'];

注意:CREATE / ADD INDEX 操作只是新增了索引定义,这个操作之后的新写入数据会生成倒排索引,而存量数据需要使用 BUILD INDEX 触发

-- 语法 1,默认给全表的所有分区 BUILD INDEX
BUILD INDEX index_name ON table_name;
-- 语法 2,可指定 Partition,可指定一个或多个
BUILD INDEX index_name ON table_name PARTITIONS(partition_name1, partition_name2);

通过 SHOW BUILD INDEX 查看 BUILD INDEX 进度:

<pre spellcheck="false" class="md-fences mock-cm md-end-block" lang="sql" cid="n196" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit;">SHOW BUILD INDEX [FROM db_name];
-- 示例 1,查看所有的 BUILD INDEX 任务进展
SHOW BUILD INDEX;
-- 示例 2,查看指定 table 的 BUILD INDEX 任务进展
SHOW BUILD INDEX where TableName = "table1";</pre>

通过 CANCEL BUILD INDEX 取消 BUILD INDEX

CANCEL BUILD INDEX ON table_name;
CANCEL BUILD INDEX ON table_name (job_id1,jobid_2,...);

提示

BUILD INDEX 会生成一个异步任务执行,在每个 BE 上有多个线程执行索引构建任务,通过 BE 参数 alter_index_worker_count 可以设置,默认值是 3。

2.1.4 之前的版本 BUILD INDEX 会一直重试直到成功,从这个版本开始通过失败和超时机制避免一直重试。

  1. 一个 tablet 的多数副本 BUILD INDEX 失败后,整个 BUILD INDEX 失败结束
  1. 时间超过 alter_table_timeout_second (),BUILD INDEX 超时结束
  1. 用户可以多次触发 BUILD INDEX,已经 BUILD 成功的索引不会重复 BUILD
删除倒排索引
-- 语法 1
DROP INDEX idx_name ON table_name;
-- 语法 2
ALTER TABLE table_name DROP INDEX idx_name;

提示

DROP INDEX 会删除索引定义,新写入数据不会再写索引,同时会生成一个异步任务执行索引删除操作,在每个 BE 上有多个线程执行索引删除任务,通过 BE 参数 alter_index_worker_count 可以设置,默认值是 3。

数据类型

数据类型 - Apache Doris

分区分桶

Doris 的分区(Partition)和分桶(Bucket)是优化数据存储和查询性能的核心机制,其设计逻辑如下:

分区(Partition)

将表数据按指定规则(如时间、地域等)划分为逻辑块,每个分区独立存储和管理。

核心作用

  • 减少扫描范围:查询时仅读取相关分区,避免全表扫描。
  • 管理数据生命周期:可便捷删除或归档过期分区(如删除 3 年前的数据)。
  • 按需加载:结合冷热存储策略,将热分区存于 SSD,冷分区存于 HDD。

分区策略

  • 范围分区(Range):常用时间字段(DAY/MONTH),如按天划分。
  • 列表分区(List):按枚举值划分(如按国家 CNUS 分区)。
  • 动态分区:自动创建新分区(如在写入时按日期自动生成分区)。

示例:

-- 按天分区(时间范围)
PARTITION BY RANGE (dt) (
    PARTITION p202301 VALUES [('2023-01-01'), ('2023-01-02')),
    PARTITION p202302 VALUES [('2023-01-02'), ('2023-01-03'))
);

-- 动态分区(每天自动创建)
PROPERTIES (
    "dynamic_partition.enable" = "true", 
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.end" = "3"  -- 保留最近3天分区
);

分桶(Bucket)

将分区内的数据进一步划分为物理块(桶),每个桶对应一个数据分片(Tablet),分布式存储在节点上。

核心作用

  • 并行计算:查询时多个桶并发处理,提升性能。
  • 数据均衡:通过哈希均匀分布数据,避免节点热点。
  • 优化 JOIN:相同分桶规则的表可进行 Colocate Join,减少数据 Shuffle。

分桶策略

  • 哈希分桶(Hash):基于分桶键的哈希值分配数据(最常用)。
  • 随机分桶(Random ):无明确分桶键,数据随机分配(适用于无明确查询条件字段)。

分桶键选择原则

  • 高基数:选区分度高的字段(如 user_id 而非 gender)。
  • 常用查询条件:优先选 WHERE/JOIN/GROUP BY 的字段。
  • 数据均匀性:确保数据均匀分布,避免倾斜。

示例

-- 按 user_id 哈希分桶,分10个桶
DISTRIBUTED BY HASH(user_id) BUCKETS 10;

分区分桶组合策略模板

CREATE TABLE dm.user_behavior (
    dt DATE COMMENT '数据日期分区', 
    user_id BIGINT COMMENT '用户ID',
    device VARCHAR(32) COMMENT '设备类型',
    event_type STRING COMMENT '事件类型'
)
ENGINE=OLAP
PARTITION BY RANGE(dt) ()
DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES (
    "dynamic_partition.enable" = "true",   
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-7",    -- 保留7天前的分区
    "dynamic_partition.end" = "3",       -- 提前创建3天分区
    "dynamic_partition.prefix" = "p",    -- 分区名前缀
    "replication_num" = "3"              -- 生产环境必须3副本
);

警告

强制规约

  1. 数据库字符集指定 UTF-8,并且只支持 UTF-8。
  1. 表的副本数必须为 3(未指定副本数时,默认为 3)。
  1. 5 亿以上的数据必须设置分区分桶策略

视图

逻辑视图

没有物理存储,所有在视图上的查询相当于在视图对应的子查询上进行。

视图的创建和删除不会影响底层表的数据。

-- 逻辑视图
CREATE VIEW xxx_dm.dm_view_award
(
   `id`             COMMENT '主键id',
   `artisan_id`     COMMENT '工匠uid',
   `name`           COMMENT '奖项名称',
   `images`         COMMENT '奖项图片',
   `status`         COMMENT '奖项状态0待审核1审核通过2审核不通过',
   `shelf_state`    COMMENT '上下架状态0下架1上架;',
   `reason`         COMMENT '审核原因'
) COMMENT "工匠奖项视图" AS
select
    `id`,
    `artisan_id`,
    `name`,
    `images`,
    `status`,
    `shelf_state`,
    `reason`
from xxx_ods.ods_award

物化视图

物化视图是既包含计算逻辑也包含数据的实体。

物化视图分同步和异步

  • 同步物化视图需要与基表的数据保持强一致性。
  • 异步物化视图与基表的数据保持最终一致性,可能会有一定的延迟。它通常用于对数据时效性要求不高的场景,一般使用 T+1 或小时级别的数据来构建物化视图。如果时效性要求高,则考虑使用同步物化视图。

目前,同步物化视图不支持直接查询,而异步物化视图支持直接查询。

-- 创建同步物化视图
create materialized view store_amt as 
select store_id, sum(sale_amt) from sales_records group by store_id;

-- 删除物化视图
drop materialized view store_amt on sales_records;

其他

ENGINE

ENGINE=OLAP

ENGINE 的类型是 olap,即默认的 ENGINE 类型。在 Doris 中,只有这个ENGINE 类型是由 Doris 负责数据管理和存储的。其他 ENGINE 类型,如 mysql、broker、es 等等,本质上只是对外部其他数据库或系统中的表的映射,以保证 Doris 可以读取这些数据。而 Doris 本身并不创建、管理和存储任何非 olap ENGINE 类型的表和数据。

Flink Sql

Flink v1.18 中文文档

Flink v1.18 官方文档

Flink SQL 系列文档

连接器

Doris 连接

Flink Doris Connector - Apache Doris

'connector' = 'doris',
'fenodes' = 'doris-fe1:8030,doris-fe2:8030',
'table.identifier' = 'db_name.table_name',
'username' = 'username',
'password' = 'password',

Mysql-CDC 连接

MySQL | Apache Flink CDC

'connector' = 'mysql-cdc',
'hostname' = ${biz.mysql.hostname},
'port' = ${biz.mysql.port},
'username' = ${biz.mysql.username},
'password' = ${biz.mysql.password},
'database-name' = 'database_name',
'table-name' = 'table_name'

JDBC连接

JDBC | Apache Flink

'connector' = 'jdbc',
'url' = ${dm.mysql.url},
'table-name' = 'table_name',
'username' = ${dm.mysql.username},
'password' = ${dm.mysql.password},
'driver' = 'com.mysql.cj.jdbc.Driver'

elasticsearch-7

Elasticsearch | Apache Flink

必填参数

'connector' = 'elasticsearch-7',
'hosts' = 'http://es-node1:9200;http://es-node2:9200',
'index' = 'test-flink',

DDL 操作示例

-- 创建 MySQL 源表(支持增量同步)
CREATE TABLE mysql_source_table (
    id BIGINT COMMENT '主键',
    name STRING,
    create_time TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED  -- 必须指定主键用于 UPDATE/DELETE
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'mysql-host',
    'port' = '3306',
    'username' = 'flink',
    'password' = '******',
    'database-name' = 'source_db',
    'table-name' = 'source_table',
    'server-time-zone' = 'Asia/Shanghai'  -- 避免时区问题
);

-- 创建 Doris 目标表(需对齐 MySQL 结构) 
CREATE TABLE doris_sink_table (
    id BIGINT,
    name STRING,
    create_time TIMESTAMP,
    PRIMARY KEY (id)  -- Doris 主键模型
) WITH (
    'connector' = 'doris',
    'fenodes' = 'doris-fe:8030',
    'table.identifier' = 'dm.target_table',
    'username' = 'flink',
    'password' = '******',
    'sink.properties.partial_columns' = 'true', -- 是否启用部分更新
    'sink.properties.column_separator' = '$^&@#/pre>, -- 列换行符
    'sink.properties.line_delimiter' = '!^*^!'      -- 行换行符
);

关于换行符号

flink sql 读取出来的数据类似如下 : 1,张三,湖南 2,李四,湖北 3,王五,北京 西城区 4,赵六,广东

其中列分隔符为',',行分隔符为'\n',其中'西城区'一行被当成了一条单独的数据,但是又与doris中表字段不匹配,所以设置行分隔符为'!*!'(写成自己数据中不容易出现的组合符号就行),否则写入失败

函数

系统内置函数

并行度与资源分配

-- 单独调整算子并行度(如窗口计算)
SELECT 
    TUMBLE_START(create_time, INTERVAL '5' MINUTE) AS window_start,
    COUNT(*) AS cnt 
FROM mysql_source_table
GROUP BY TUMBLE(create_time, INTERVAL '5' MINUTE)
/*+ OPTIONS('source.parallelism' = '4') */;  -- 指定源表并行度

故障排查与监控

错误场景 解决方案
主键冲突写入 Doris 失败 检查 Doris 表是否为 UNIQUE KEY 模型,若为聚合模型需配置 sink.enable-delete = 'true'
CDC 数据延迟高 增加 Source 并行度
server-id重复 调整 MySQL server-id 范围避免冲突,默认情况下是5400-6400;建议设置一个明确的值
Checkpoint 超时 增大 execution.checkpointing.timeout,优化 State Backend 性能
JSON 解析异常 添加 'sink.ignore-parse-errors' = 'true' 临时跳过错误数据

异常:

Cannot have more than one execute() or executeAsync() call in a single environment.

原因:

Flink 1.18明确要求:多个INSERT语句必须通过StatementSet提交为单个作业。直接循环执行executeSql(insert)会触发多个作业,违反“同一个环境只能提交一次作业”的限制。本地测试可能在批处理模式或特殊配置下允许多作业提交,而生产环境(通常是流模式)会严格限制。

解决方案:

多条 INSERT 语句,使用 TableEnvironment 中的 createStatementSet 创建一个 StatementSet 对象,然后使用 StatementSet 中的 addInsertSql() 方法添加多条 INSERT 语句,最后通过 StatementSet 中的 execute() 方法来执行。

实战场景范例

场景 1:实时数据清洗

-- 清洗并过滤无效数据
INSERT INTO doris_cleaned_data
SELECT 
    user_id,
    TRIM(username) AS username,  -- 去除前后空格
    REGEXP_REPLACE(phone, '[^0-9]', '') AS phone  -- 过滤非数字字符
FROM mysql_raw_data
WHERE LENGTH(phone) = 11;  -- 只保留有效手机号

场景 2:跨表 JOIN 计算

-- 用户订单宽表构建
INSERT INTO doris_user_orders
SELECT 
    u.user_id,
    u.name,
    COUNT(o.order_id) AS order_count,
    SUM(o.amount) AS total_amount 
FROM mysql_users u
LEFT JOIN mysql_orders o 
ON u.user_id = o.user_id
GROUP BY u.user_id, u.name;

场景 3:窗口聚合统计

-- 每 5 分钟统计订单金额
INSERT INTO doris_order_stats
SELECT 
    TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
    COUNT(*) AS order_num,
    SUM(amount) AS total_amount 
FROM mysql_orders
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);

基础类型对照表

Doris 类型 Flink SQL 类型 MySQL 类型 关键注意事项
BOOLEAN BOOLEAN TINYINT(1) MySQL 用 TINYINT(1) 模拟布尔值,0=false,非0=true
TINYINT TINYINT TINYINT 范围一致(-128 ~ 127)
SMALLINT SMALLINT SMALLINT 范围一致
INT INT INT 范围一致
BIGINT BIGINT BIGINT 范围一致
LARGEINT DECIMAL(38,0) 不支持 Doris 的 LARGEINT 是128位整数,Flink可用字符串绕过
FLOAT FLOAT FLOAT 单精度浮点数
DOUBLE DOUBLE DOUBLE 双精度浮点数
DECIMAL(p,s) DECIMAL(p,s) DECIMAL(p,s) / NUMERIC 精度需对齐(Doris最大精度38位,Flink同)
VARCHAR(n) VARCHAR(n) VARCHAR(n) Doris 的 VARCHAR 最大长度 65533,MySQL允许更长
STRING STRING TEXT / LONGTEXT Doris STRING 是别名,相当于 VARCHAR(65533)
DATE DATE DATE 日期格式一致('YYYY-MM-DD')
DATETIME TIMESTAMP DATETIME Doris DATETIME 不存储时区,Flink用 TIMESTAMP_LTZ 处理时区
TIMESTAMP TIMESTAMP TIMESTAMP MySQL TIMESTAMP自动转UTC,写入时注意时区
JSON STRING / ROW<...> JSON Flink需用STRING处理原始JSON字符串

特殊类型处理

类型 处理建议
时间戳(跨时区) 确保 Flink 作业时区配置一致,建议使用 TIMESTAMP_LTZ 并在写入Doris时转为字符串。
精度超限(如DECIMAL) 如果MySQL的精度超过Doris定义,写入时会直接截断(可能丢数据)。
超长字符串(如TEXT) MySQL的TEXT写入Doris时若长度超限会截断,建议用Doris的STRING
枚举类型(ENUM) MySQL的ENUM建议转成Flink的STRING,再写入Doris。
无符号整数(UNSIGNED) MySQL的UNSIGNED INT需在Flink转为BIGINT(Doris无无符号类型)。

优化

状态存储(State Backend)

状态过期(State TTL):

在 Apache Flink 中,状态 TTL(Time-To-Live) 是一种用于控制状态生命周期的机制。通过配置状态 TTL,你可以指定状态数据在多长时间后自动过期并被清理,从而避免状态无限增长、节省内存资源,并提升作业性能。

Flink 在做流式聚合(如 COUNT(DISTINCT)、SUM()、GROUP BY 等)时会将中间结果保存在状态中(State),这些状态默认不会自动删除。如果不加限制: 状态体积会不断增长,影响性能; 可能导致 OOM(内存溢出); 增加检查点(Checkpoint)和保存点(Savepoint)的大小和时间。 配置 TTL 后,Flink 会在状态过期后自动清除它们,实现“自动回收”。

维表(Lookup Table

JOIN 类型 Regular Join❌ 否

简单测试、小数据量

Lookup Join✅ 是

维度补全、查库兜底

Interval Join✅ 是

有明确时间窗口

Temporal Join✅ 是

查最新维度信息 Broadcast Join✅ 是

小表广播给大流

FLINK SQL模板

-- 启用检查点机制
SET 'execution.checkpointing.interval' = '5000';
-- 检查点模式
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
-- 检查点模式
SET 'execution.checkpointing.timeout' = '600000';
-- 允许同时进行的 Checkpoint 数量
SET 'execution.checkpointing.max-concurrent-checkpoints' = '1';
-- Checkpoint 之间的最小间隔(避免频繁触发)
SET 'execution.checkpointing.min-pause-between-checkpoints' = '1000';
-- Checkpoint 容忍失败次数
SET 'execution.checkpointing.tolerable-failed-checkpoints' = '3';
-- 启用增量 Checkpoint
SET 'state.backend.incremental' = 'true';
SET 'state.backend' = 'rocksdb';
--  设置保存点的目标目录
SET 'state.checkpoints.dir' = 'file:///xxx/checkpoints';
--  设置保存点的目标目录
SET 'state.savepoints.dir' = 'file:///xxx/savepoints';
--  是一个用于指定JobManager存储已完成作业档案的目录配置项
set 'jobmanager.archive.fs.dir' = 'file:///xxx/flink-job-archive';
-- 状态过期时间
set 'table.exec.state.ttl' = '1 d';
-- 开启mini-batch
--  set 'table.exec.mini-batch.enabled' = 'true';
-- 最大缓存时间
--  set 'table.exec.mini-batch.allow-latency' = '5 s';
-- 批次大小
--  set 'table.exec.mini-batch.size' = '10000';
-- 开启预聚合
--  set 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
-- taskmanager总内存
--  set 'taskmanager.memory.process.size' = '2 g';

-- -----[source]-----
-- -----[Mysql-CDC]-----
CREATE TABLE source_table (
  `id` bigint,
  `name` varchar(12),
  `age` int,
  `birthday` timestamp,
  PRIMARY KEY (id) NOT ENFORCED
)
WITH
  (
    'connector' = 'mysql-cdc',
    'hostname' = ${mysql.hostname},
    'port' = ${mysql.port},
    'username' = ${mysql.username},
    'password' = ${mysql.password},
    'database-name' = 'dbName',
    'table-name' = 'tableName',
    -- 尽量配置的参数
    --  'parallelism' = '4',                    -- 并行度(server-id有几个就设置多少)
    --  'server-id' = 'test-server_id'          -- MySQL CDC 的服务器 ID (范围),全局唯一
    -- 可选参数
    -- 'debezium.snapshot.mode' = 'initial',         -- 快照模式
    -- 'debezium.snapshot.locking.mode' = 'none',    -- 快照锁模式
    -- 'scan.incremental.snapshot.enabled' = 'true', -- 增量快照 (适用于大表)
    --  'connect.timeout' = '30s',                    -- 连接超时时间
    --  'connect.max-retries' = '3',                  -- 最大重试次数
    --  'heartbeat.interval' = '30000',               -- 心跳间隔(毫秒)
    --  'scan.startup.mode' = 'initial',              -- 启动模式
    --  scan.startup.timestamp-millis = ‘1754647055002’ -- 从binlog指定时间后开始读取
  );

-- -----[JDBC]-----
CREATE TABLE source_table2 (
  `id` bigint,
  `name` varchar(12),
  `age` int,
  `birthday` timestamp,
  PRIMARY KEY (id) NOT ENFORCED
)
WITH
  (
    'connector' = 'jdbc',
    'url' = ${mysql.url},       -- 对应数据库url链接变量,如果没有请新增
    'table-name' = 'tableName',
    'username' = ${mysql.username},
    'password' = ${mysql.password},
    'driver' = 'com.mysql.cj.jdbc.Driver',
    -- 可选参数
    --  'lookup.cache.ttl' = '10min',           -- 查找缓存的过期时间
    --  'connection.max-retry-timeout' = '30s', -- 连接最大重试超时时间
    --  'scan.fetch-size' = '1000',             -- 每次读取的行数
    --  'lookup.cache.max-rows' = '10000',      -- 查找缓存的最大行数
    --  'lookup.max-retries' = '3',             -- 查找最大重试次数

    --  'sink.buffer-flush.max-rows' = '1000',  -- 写入前缓存的最大行数
    --  'sink.buffer-flush.interval' = '2s',    -- 写入前的最大缓存时间
    --  'sink.max-retries' = '3',               -- 写入最大重试次数
    --  'sink.parallelism' = '4'                -- 写入并行度
  );

-- -----[sink]-----
-- -----[Doris]-----
CREATE TABLE sink_table (
  `id` bigint,
  `name` varchar(12),
  `age` int,
  `birthday` timestamp,
  PRIMARY KEY (id) NOT ENFORCED
)
WITH
  (
    'connector' = 'doris',
    'fenodes' = ${doris.fenodes},
    'table.identifier' = 'dbName.tableName',
    'username' = ${doris.username},
    'password' = ${doris.password},
    'sink.properties.column_separator' = '$^&@#$',
    'sink.properties.line_delimiter' = '!^*^!',
    'sink.label-prefix' = 'job名称'          -- 修改为job名称,如果该job中有多个sink,添加后缀以区分(该值所有任务唯一)
  );

-- -----[INSERT]-----
insert into
  source_table
select
  `id`,
  `name`,
  `age`,
  `birthday`
from
  sink_table
where
  age > 18;
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容