相关文档:
Flink相关拓展应用:
相关概念
ETL(Extract-Transform-Load ):
用来描述将数据从来源迁移到目标的几个过程,提取、转换、加载。
常用的ETL工具有kettle、DataX、Canal等
ODS(Operational Data Store ):
作为数据库到数据仓库的一种过渡,ODS的数据结构一般与数据来源保持一致,便于减少ETL的工作复杂性。在源数据装入这一层时,要进行诸如去噪(例如有一条数据中人的年龄是 300 岁,这种属于异常数据,就需要提前做一些处理)、去重(例如在个人资料表中,同一 ID 却有两条重复数据,在接入的时候需要做一步去重)、字段命名规范等一系列操作。
DW (Data Warehouse ):
数据仓库,是数据的归宿,这里保持这所有的从ODS到来的数据,并长期保存,而且这些数据不会被修改。
DM(Data Mart):
数据集市,为了特定的应用目的或应用范围,而从数据仓库中独立出来的一部分数据,也可称为部门数据或主题数据。按照业务划分,如流量、订单、用户等,生成字段比较多的宽表,用于提供后续的业务查询,OLAP分析,数据分发等。
目前数据流转结构,如图:
简单总结:通过Flink收集MySql业务数据到Doris中的ODS层和DM层数据库中,应用读取从DM层读取数据。
Table Api/DataStream
StreamExecutionEnvironment
streamExecutionEnvironment-史上最通俗易懂的Flink源代码深入分析教程
StreamExecutionEnvironment是Flink中用于定义和执行流处理程序的主要类。它提供了一系列函数和方法来配置流处理程序的执行环境(例如并行度、checkpoint、时间特性),并将其部署到Flink集群中运行。
配置示例代码:
// 创建一个 StreamExecutionEnvironment 对象 env,这是 Flink 流处理程序的入口点,用于配置和执行流处理任务。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用检查点机制,并设置检查点的时间间隔为 300000 毫秒(即 30 秒)。这意味着 Flink 每 10 秒会自动触发一次检查点。
env.enableCheckpointing(300000);
// 配置检查点模式为 EXACTLY_ONCE,这意味着 Flink 会确保每个数据记录在发生故障后恰好被处理一次,从而保证数据的一致性和准确性。
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 设置两次检查点之间的最小暂停时间间隔为 500 毫秒。这可以防止频繁的检查点操作对性能造成影响。
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 设置检查点超时时间为 60000 毫秒(即 60 秒)。如果一个检查点在 60 秒内没有完成,则会被视为失败。
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 设置同时进行的最大检查点数为 1。这意味着在前一个检查点完成之前,不会开始新的检查点。
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 设置可容忍的检查点失败次数为 5。这意味着在连续 5 次检查点失败后,Flink 才会停止尝试新的检查点。
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
// 启用外部化检查点,并设置在取消作业时保留检查点。这样可以在作业取消后手动恢复到某个检查点。
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup
.RETAIN_ON_CANCELLATION);
// 设置重启策略为基于失败率的重启策略。具体来说,如果在 5 分钟内发生 3 次失败,Flink 将尝试重启作业,每次重启之间等待 10 秒。
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(5), Time.seconds(10)));
关于CheckpointingMode
EXACTLY_ONCE 和 AT_LEAST_ONCE 是 Flink 中两种不同的检查点模式,它们决定了在发生故障时数据处理的一致性级别。
以下是这两种模式的详细解释:
EXACTLY_ONCE
- 定义:EXACTLY_ONCE 模式确保每个数据记录在发生故障后恰好被处理一次
- 特点:
* 强一致性:即使在故障恢复过程中,也能保证数据不重复处理,也不会丢失。 * 复杂性:实现 EXACTLY_ONCE 需要更多的资源和更复杂的逻辑,可能会对性能有一定影响。 * 适用场景:适用于对数据一致性和准确性要求非常高的场景,例如金融交易、计费系统等。AT_LEAST_ONCE
- 定义:AT_LEAST_ONCE 模式确保每个数据记录在发生故障后至少被处理一次。
- 特点:
* 弱一致性:在故障恢复过程中,可能会有数据重复处理,但不会丢失。 * 简单性:实现 AT_LEAST_ONCE 较为简单,对性能的影响较小。 * 适用场景:适用于对数据重复处理容忍度较高的场景,例如日志分析、监控数据处理等。
Doris Sql
建表语句示例
DROP TABLE IF EXISTS testdb.d_table_a;
CREATE TABLE IF NOT EXISTS testdb.d_table_a
(
`id` BIGINT COMMENT '主键id',
`uid` BIGINT COMMENT '用户ID',
`address` varchar(512) COMMENT '地址',
)
UNIQUE KEY(id)
COMMENT "用户地址表"
DISTRIBUTED BY HASH(id) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"enable_unique_key_merge_on_write" = "true"
);
参数解释:
testdb.d_table_a
数据库.表名字;Doris 中表名默认是大小写敏感的,默认的表名最大长度为 64 字节。
可大小写敏感以在第一次初始化集群时配置lower_case_table_names为大小写不敏感的;
表名最大长度可以通过配置table_name_length_limit更改,不建议配置过大。
UNIQUE KEY(id)
指定表模型为主键模型(Unique Key Model)
DISTRIBUTED BY HASH(id) BUCKETS 1
指定表的数据分布方式为按id列的哈希值分布,并设置桶的数量为1。
PROPERTIES ("replication_allocation" = "tag.location.default: 1")
设置表的属性,指定副本分配策略,这里设置的是默认位置的一个副本。
"enable_unique_key_merge_on_write" = "true"
开启部分列更新,比如第一个job同步a,b,c列,第二个job同步x,y,z列;如果不开启,第二个job会覆盖掉a,b,c列
同时需要在Flink Connector中添加配置:
<pre spellcheck="false" class="md-fences md-end-block ty-contain-cm modeLoaded" lang="sql" cid="n85" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: normal; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 0px; margin-top: 15px; width: inherit;">'sink.properties.partial_columns' = 'true'</pre>
数据表模型
Doris 数据表模型上目前分为三类:DUPLICATE KEY, UNIQUE KEY, AGGREGATE KEY。
明细模型(Duplicate Key Model):允许指定的 Key 列重复,Doirs 存储层保留所有写入的数据,适用于必须保留所有原始数据记录的情况;
主键模型(Unique Key Model):每一行的 Key 值唯一,可确保给定的 Key 列不会存在重复行,Doris 存储层对每个 key 只保留最新写入的数据,适用于数据更新的情况;
聚合模型(Aggregate Key Model):可根据 Key 列聚合数据,Doris 存储层保留聚合后的数据,从而可以减少存储空间和提升查询性能;通常用于需要汇总或聚合信息(如总数或平均值)的情况。
在建表后,表模型的属性已经确认,无法修改。针对业务选择合适的模型至关重要:
Aggregate 模型可以通过预聚合,极大地降低聚合查询时所需扫描的数据量和查询的计算量,非常适合有固定模式的报表类查询场景。但是该模型对 count(*) 查询很不友好。同时因为固定了 Value 列上的聚合方式,在进行其他类型的聚合查询时,需要考虑语意正确性。
Unique 模型针对需要唯一主键约束的场景,可以保证主键唯一性约束。但是无法利用 ROLLUP 等预聚合带来的查询优势。对于聚合查询有较高性能需求的用户,推荐使用自 1.2 版本加入的写时合并实现。
Duplicate 适合任意维度的 Ad-hoc 查询。虽然同样无法利用预聚合的特性,但是不受聚合模型的约束,可以发挥列存模型的优势(只读取相关列,而不需要读取所有 Key 列)。
建表时列类型建议
- Key 列必须在所有 Value 列之前。
- 尽量选择整型类型。因为整型类型的计算和查找效率远高于字符串。
- 对于不同长度的整型类型的选择原则,遵循够用即可。
- 对于 VARCHAR 和 STRING 类型的长度,遵循够用即可。
索引
索引用于帮助快速过滤或查找数据。目前主要支持两类索引:
内建自动创建的智能索引,包括前缀索引和 ZoneMap 索引。
用户手动创建的二级索引,包括倒排索引、bloomfilter 索引、ngram bloomfilter 索引 和 bitmap 索引。
前缀索引
在 Aggregate、Unique 和 Duplicate 三种数据模型中。底层的数据存储,是按照各自建表语句中,AGGREGATE KEY、UNIQUE KEY 和 DUPLICATE KEY 中指定的列进行排序存储的。而前缀索引,即在排序的基础上,实现的一种根据给定前缀列,快速查询数据的索引方式。
前缀索引是稀疏索引,不能精确定位到 Key 所在的行,只能粗粒度地定位出 Key 可能存在的范围,然后使用二分查找算法精确地定位 Key 的位置。
推荐规约
- 建表时,正确的选择列顺序,能够极大地提高查询效率。
因为建表时已经指定了列顺序,所以一个表只有一种前缀索引。这对于使用其他不能命中前缀索引的列作为条件进行的查询来说,效率上可能无法满足需求,这种情况,我们可以通过创建 物化视图 来人为的调整列顺序。
- 前缀索引的第一个字段一定是最常查询的字段,并且需要是高基数字段:
a. 分桶字段注意事项:这个一般是数据分布比较均衡的,也是经常使用的字段,最好是高基数字段 b. Int(4)+ Int(4) + varchar(50),前缀索引长度只有 28 c. Int(4) + varchar(50) + Int(4),前缀索引长度只有 24 d. varchar(10) + varchar(50) ,前缀索引长度只有 30 e. 前缀索引(36 位):第一个字段查询性能最好,前缀索引碰见 varchar 类型的字段,会自动截断前 20 个字符 f. 最常用的查询字段如果能放到前缀索引里尽可能放到前前缀索引里,如果不能,可以放到分桶字段里
- 前缀索引中的字段长度尽可能明确,因为 Doris 只有前 36 个字节能走前缀索引。
- 如果某个范围数据在分区分桶和前缀索引中都不好设计,可以考虑引入倒排索引加速。
倒排索引(Inverted Index)
创建倒排索引
-- 创建示例:可以表创建时指定或者创建后新增,如下创建表时指定
CREATE TABLE table_name
(
columns_difinition,
-- 创建一个名为idx_name1的倒排索引,基于column_name1列。指定解析器类型,可以是english、unicode或chinese。
INDEX idx_name1(column_name1) USING INVERTED [PROPERTIES("parser" = "english|unicode|chinese")] [COMMENT 'your comment']
-- 创建一个名为idx_name2的倒排索引,基于column_name2列。指定解析器类型,可以是english、unicode或chinese。
INDEX idx_name2(column_name2) USING INVERTED [PROPERTIES("parser" = "english|unicode|chinese")] [COMMENT 'your comment']
-- 指定解析器类型为chinese,并指定解析模式为fine_grained或coarse_grained。
INDEX idx_name3(column_name3) USING INVERTED [PROPERTIES("parser" = "chinese", "parser_mode" = "fine_grained|coarse_grained")] [COMMENT 'your comment']
-- 指定解析器类型,并指定是否支持短语搜索。
INDEX idx_name4(column_name4) USING INVERTED [PROPERTIES("parser" = "english|unicode|chinese", "support_phrase" = "true|false")] [COMMENT 'your comment']
-- 指定字符过滤器类型为char_replace,过滤模式为"._",替换为(空格)。
INDEX idx_name5(column_name4) USING INVERTED [PROPERTIES("char_filter_type" = "char_replace", "char_filter_pattern" = "._"), "char_filter_replacement" = " "] [COMMENT 'your comment']
-- 指定字符过滤器类型为char_replace,过滤模式为"._"。
INDEX idx_name5(column_name4) USING INVERTED [PROPERTIES("char_filter_type" = "char_replace", "char_filter_pattern" = "._")] [COMMENT 'your comment']
)
-- 表的属性定义部分
table_properties;
-- 使用示例:全文检索关键词匹配,通过 MATCH_ANY MATCH_ALL 完成
SELECT * FROM table_name WHERE column_name MATCH_ANY | MATCH_ALL 'keyword1 ...';</pre>
PROPERTIES 参数详解
parser 指定分词器
默认不指定代表不分词
english是英文分词,适合被索引列是英文的情况,用空格和标点符号分词,性能高
chinese是中文分词,适合被索引列主要是中文的情况,性能比 English 分词低
unicode是多语言混合类型分词,适用于中英文混合、多语言混合的情况。它能够对邮箱前缀和后缀、IP 地址以及字符数字混合进行分词,并且可以对中文按字符分词。
parser_mode 指定分词的模式
用于指定分词的模式,目前 parser = chinese 时支持如下几种模式:
fine_grained:细粒度模式,倾向于分出比较短、较多的词,比如 '武汉市长江大桥' 会分成 '武汉', '武汉市', '市长', '长江', '长江大桥', '大桥' 6 个词
coarse_grained:粗粒度模式,倾向于分出比较长、较少的词,,比如 '武汉市长江大桥' 会分成 '武汉市' '长江大桥' 2 个词
默认 coarse_grained
support_phrase 短语查询加速
用于指定索引是否支持 MATCH_PHRASE 短语查询加速
true 为支持,但是索引需要更多的存储空间
false 为不支持,更省存储空间,可以用 MATCH_ALL 查询多个关键字
默认 false
例如下面的例子指定中文分词,粗粒度模式,支持短语查询加速。
INDEX idx_name(column_name) USING INVERTED PROPERTIES("parser" = "chinese", "parser_mode" = "coarse_grained", "support_phrase" = "true")
char_filter 在分词前对文本进行预处理
用于指定在分词前对文本进行预处理,通常用于影响分词行为
char_filter_type:指定使用不同功能的 char_filter(目前仅支持 char_replace)
char_replace 将 pattern 中每个 char 替换为一个 replacement 中的 char
char_filter_pattern:需要被替换掉的字符数
char_filter_replacement:替换后的字符数组,可以不用配置,默认为一个空格字符
例如下面的例子将点和下划线替换成空格,达到将点和下划线作为单词分隔符的目的,影响分词行为。
INDEX idx_name(column_name) USING INVERTED PROPERTIES("parser" = "unicode", "char_filter_type" = "char_replace", "char_filter_pattern" = "._", "char_filter_replacement" = " ")
ignore_above 不分词字符串索引的长度限制
用于指定不分词字符串索引(没有指定 parser)的长度限制
长度超过 ignore_above 设置的字符串不会被索引。对于字符串数组,ignore_above 将分别应用于每个数组元素,长度超过 ignore_above 的字符串元素将不被索引。
默认为 256,单位是字节
lower_case 将分词进行小写转换
true: 转换小写
false:不转换小写
从 2.1.2 版本开始默认为 true,自动转小写,之前的版本默认为 false
stopwords 停用词表
默认的内置停用词表包含一些无意义的词:'is'、'the'、'a' 等。在写入或者查询时,分词器会忽略停用词表中的词。
- none: 使用空的停用词表
增加倒排索引
-- 语法 1
CREATE INDEX idx_name ON table_name(column_name) USING INVERTED [PROPERTIES(...)] [COMMENT 'your comment'];
-- 语法 2
ALTER TABLE table_name ADD INDEX idx_name(column_name) USING INVERTED [PROPERTIES(...)] [COMMENT 'your comment'];
注意:CREATE / ADD INDEX 操作只是新增了索引定义,这个操作之后的新写入数据会生成倒排索引,而存量数据需要使用 BUILD INDEX 触发
-- 语法 1,默认给全表的所有分区 BUILD INDEX
BUILD INDEX index_name ON table_name;
-- 语法 2,可指定 Partition,可指定一个或多个
BUILD INDEX index_name ON table_name PARTITIONS(partition_name1, partition_name2);
通过
SHOW BUILD INDEX查看BUILD INDEX进度:<pre spellcheck="false" class="md-fences mock-cm md-end-block" lang="sql" cid="n196" mdtype="fences" style="box-sizing: border-box; overflow: visible; font-family: var(--monospace); font-size: 0.9em; display: block; break-inside: avoid; text-align: left; white-space: pre-wrap; background-image: inherit; background-position: inherit; background-size: inherit; background-repeat: inherit; background-attachment: inherit; background-origin: inherit; background-clip: inherit; background-color: rgb(248, 248, 248); position: relative !important; border: 1px solid rgb(231, 234, 237); border-radius: 3px; padding: 8px 4px 6px; margin-bottom: 15px; margin-top: 15px; width: inherit;">SHOW BUILD INDEX [FROM db_name];
-- 示例 1,查看所有的 BUILD INDEX 任务进展
SHOW BUILD INDEX;
-- 示例 2,查看指定 table 的 BUILD INDEX 任务进展
SHOW BUILD INDEX where TableName = "table1";</pre>通过
CANCEL BUILD INDEX取消BUILD INDEX:CANCEL BUILD INDEX ON table_name;
CANCEL BUILD INDEX ON table_name (job_id1,jobid_2,...);
提示
BUILD INDEX会生成一个异步任务执行,在每个 BE 上有多个线程执行索引构建任务,通过 BE 参数alter_index_worker_count可以设置,默认值是 3。2.1.4 之前的版本
BUILD INDEX会一直重试直到成功,从这个版本开始通过失败和超时机制避免一直重试。
- 一个 tablet 的多数副本
BUILD INDEX失败后,整个BUILD INDEX失败结束
- 时间超过
alter_table_timeout_second(),BUILD INDEX超时结束
- 用户可以多次触发
BUILD INDEX,已经 BUILD 成功的索引不会重复 BUILD
删除倒排索引
-- 语法 1
DROP INDEX idx_name ON table_name;
-- 语法 2
ALTER TABLE table_name DROP INDEX idx_name;
提示
DROP INDEX会删除索引定义,新写入数据不会再写索引,同时会生成一个异步任务执行索引删除操作,在每个 BE 上有多个线程执行索引删除任务,通过 BE 参数alter_index_worker_count可以设置,默认值是 3。
数据类型
分区分桶
Doris 的分区(Partition)和分桶(Bucket)是优化数据存储和查询性能的核心机制,其设计逻辑如下:
分区(Partition)
将表数据按指定规则(如时间、地域等)划分为逻辑块,每个分区独立存储和管理。
核心作用:
- 减少扫描范围:查询时仅读取相关分区,避免全表扫描。
- 管理数据生命周期:可便捷删除或归档过期分区(如删除 3 年前的数据)。
- 按需加载:结合冷热存储策略,将热分区存于 SSD,冷分区存于 HDD。
分区策略:
- 范围分区(Range):常用时间字段(
DAY/MONTH),如按天划分。
- 列表分区(List):按枚举值划分(如按国家
CN、US分区)。
- 动态分区:自动创建新分区(如在写入时按日期自动生成分区)。
示例:
-- 按天分区(时间范围)
PARTITION BY RANGE (dt) (
PARTITION p202301 VALUES [('2023-01-01'), ('2023-01-02')),
PARTITION p202302 VALUES [('2023-01-02'), ('2023-01-03'))
);
-- 动态分区(每天自动创建)
PROPERTIES (
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.end" = "3" -- 保留最近3天分区
);
分桶(Bucket)
将分区内的数据进一步划分为物理块(桶),每个桶对应一个数据分片(Tablet),分布式存储在节点上。
核心作用:
- 并行计算:查询时多个桶并发处理,提升性能。
- 数据均衡:通过哈希均匀分布数据,避免节点热点。
- 优化 JOIN:相同分桶规则的表可进行 Colocate Join,减少数据 Shuffle。
分桶策略:
- 哈希分桶(Hash):基于分桶键的哈希值分配数据(最常用)。
- 随机分桶(Random ):无明确分桶键,数据随机分配(适用于无明确查询条件字段)。
分桶键选择原则:
- 高基数:选区分度高的字段(如
user_id而非gender)。
- 常用查询条件:优先选 WHERE/JOIN/GROUP BY 的字段。
- 数据均匀性:确保数据均匀分布,避免倾斜。
示例:
-- 按 user_id 哈希分桶,分10个桶
DISTRIBUTED BY HASH(user_id) BUCKETS 10;
分区分桶组合策略模板
CREATE TABLE dm.user_behavior (
dt DATE COMMENT '数据日期分区',
user_id BIGINT COMMENT '用户ID',
device VARCHAR(32) COMMENT '设备类型',
event_type STRING COMMENT '事件类型'
)
ENGINE=OLAP
PARTITION BY RANGE(dt) ()
DISTRIBUTED BY HASH(user_id) BUCKETS 32
PROPERTIES (
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-7", -- 保留7天前的分区
"dynamic_partition.end" = "3", -- 提前创建3天分区
"dynamic_partition.prefix" = "p", -- 分区名前缀
"replication_num" = "3" -- 生产环境必须3副本
);
警告
强制规约
- 数据库字符集指定 UTF-8,并且只支持 UTF-8。
- 表的副本数必须为 3(未指定副本数时,默认为 3)。
- 5 亿以上的数据必须设置分区分桶策略
视图
逻辑视图
没有物理存储,所有在视图上的查询相当于在视图对应的子查询上进行。
视图的创建和删除不会影响底层表的数据。
-- 逻辑视图
CREATE VIEW xxx_dm.dm_view_award
(
`id` COMMENT '主键id',
`artisan_id` COMMENT '工匠uid',
`name` COMMENT '奖项名称',
`images` COMMENT '奖项图片',
`status` COMMENT '奖项状态0待审核1审核通过2审核不通过',
`shelf_state` COMMENT '上下架状态0下架1上架;',
`reason` COMMENT '审核原因'
) COMMENT "工匠奖项视图" AS
select
`id`,
`artisan_id`,
`name`,
`images`,
`status`,
`shelf_state`,
`reason`
from xxx_ods.ods_award
物化视图
物化视图是既包含计算逻辑也包含数据的实体。
物化视图分同步和异步
- 同步物化视图需要与基表的数据保持强一致性。
- 异步物化视图与基表的数据保持最终一致性,可能会有一定的延迟。它通常用于对数据时效性要求不高的场景,一般使用 T+1 或小时级别的数据来构建物化视图。如果时效性要求高,则考虑使用同步物化视图。
目前,同步物化视图不支持直接查询,而异步物化视图支持直接查询。
-- 创建同步物化视图
create materialized view store_amt as
select store_id, sum(sale_amt) from sales_records group by store_id;
-- 删除物化视图
drop materialized view store_amt on sales_records;
其他
ENGINE
ENGINE=OLAP
ENGINE 的类型是 olap,即默认的 ENGINE 类型。在 Doris 中,只有这个ENGINE 类型是由 Doris 负责数据管理和存储的。其他 ENGINE 类型,如 mysql、broker、es 等等,本质上只是对外部其他数据库或系统中的表的映射,以保证 Doris 可以读取这些数据。而 Doris 本身并不创建、管理和存储任何非 olap ENGINE 类型的表和数据。
Flink Sql
连接器
Doris 连接
Flink Doris Connector - Apache Doris
'connector' = 'doris',
'fenodes' = 'doris-fe1:8030,doris-fe2:8030',
'table.identifier' = 'db_name.table_name',
'username' = 'username',
'password' = 'password',
Mysql-CDC 连接
'connector' = 'mysql-cdc',
'hostname' = ${biz.mysql.hostname},
'port' = ${biz.mysql.port},
'username' = ${biz.mysql.username},
'password' = ${biz.mysql.password},
'database-name' = 'database_name',
'table-name' = 'table_name'
JDBC连接
'connector' = 'jdbc',
'url' = ${dm.mysql.url},
'table-name' = 'table_name',
'username' = ${dm.mysql.username},
'password' = ${dm.mysql.password},
'driver' = 'com.mysql.cj.jdbc.Driver'
elasticsearch-7
必填参数
'connector' = 'elasticsearch-7',
'hosts' = 'http://es-node1:9200;http://es-node2:9200',
'index' = 'test-flink',
DDL 操作示例
-- 创建 MySQL 源表(支持增量同步)
CREATE TABLE mysql_source_table (
id BIGINT COMMENT '主键',
name STRING,
create_time TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED -- 必须指定主键用于 UPDATE/DELETE
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql-host',
'port' = '3306',
'username' = 'flink',
'password' = '******',
'database-name' = 'source_db',
'table-name' = 'source_table',
'server-time-zone' = 'Asia/Shanghai' -- 避免时区问题
);
-- 创建 Doris 目标表(需对齐 MySQL 结构)
CREATE TABLE doris_sink_table (
id BIGINT,
name STRING,
create_time TIMESTAMP,
PRIMARY KEY (id) -- Doris 主键模型
) WITH (
'connector' = 'doris',
'fenodes' = 'doris-fe:8030',
'table.identifier' = 'dm.target_table',
'username' = 'flink',
'password' = '******',
'sink.properties.partial_columns' = 'true', -- 是否启用部分更新
'sink.properties.column_separator' = '$^&@#/pre>, -- 列换行符
'sink.properties.line_delimiter' = '!^*^!' -- 行换行符
);
关于换行符号
flink sql 读取出来的数据类似如下 : 1,张三,湖南 2,李四,湖北 3,王五,北京 西城区 4,赵六,广东
其中列分隔符为',',行分隔符为'\n',其中'西城区'一行被当成了一条单独的数据,但是又与doris中表字段不匹配,所以设置行分隔符为'!*!'(写成自己数据中不容易出现的组合符号就行),否则写入失败
函数
并行度与资源分配
-- 单独调整算子并行度(如窗口计算)
SELECT
TUMBLE_START(create_time, INTERVAL '5' MINUTE) AS window_start,
COUNT(*) AS cnt
FROM mysql_source_table
GROUP BY TUMBLE(create_time, INTERVAL '5' MINUTE)
/*+ OPTIONS('source.parallelism' = '4') */; -- 指定源表并行度
故障排查与监控
| 错误场景 | 解决方案 |
|---|---|
| 主键冲突写入 Doris 失败 | 检查 Doris 表是否为 UNIQUE KEY 模型,若为聚合模型需配置 sink.enable-delete = 'true' |
| CDC 数据延迟高 | 增加 Source 并行度 |
| server-id重复 | 调整 MySQL server-id 范围避免冲突,默认情况下是5400-6400;建议设置一个明确的值 |
| Checkpoint 超时 | 增大 execution.checkpointing.timeout,优化 State Backend 性能 |
| JSON 解析异常 | 添加 'sink.ignore-parse-errors' = 'true' 临时跳过错误数据 |
异常:
Cannot have more than one execute() or executeAsync() call in a single environment.
原因:
Flink 1.18明确要求:多个INSERT语句必须通过StatementSet提交为单个作业。直接循环执行executeSql(insert)会触发多个作业,违反“同一个环境只能提交一次作业”的限制。本地测试可能在批处理模式或特殊配置下允许多作业提交,而生产环境(通常是流模式)会严格限制。
解决方案:
多条 INSERT 语句,使用 TableEnvironment 中的 createStatementSet 创建一个 StatementSet 对象,然后使用 StatementSet 中的 addInsertSql() 方法添加多条 INSERT 语句,最后通过 StatementSet 中的 execute() 方法来执行。
实战场景范例
场景 1:实时数据清洗
-- 清洗并过滤无效数据
INSERT INTO doris_cleaned_data
SELECT
user_id,
TRIM(username) AS username, -- 去除前后空格
REGEXP_REPLACE(phone, '[^0-9]', '') AS phone -- 过滤非数字字符
FROM mysql_raw_data
WHERE LENGTH(phone) = 11; -- 只保留有效手机号
场景 2:跨表 JOIN 计算
-- 用户订单宽表构建
INSERT INTO doris_user_orders
SELECT
u.user_id,
u.name,
COUNT(o.order_id) AS order_count,
SUM(o.amount) AS total_amount
FROM mysql_users u
LEFT JOIN mysql_orders o
ON u.user_id = o.user_id
GROUP BY u.user_id, u.name;
场景 3:窗口聚合统计
-- 每 5 分钟统计订单金额
INSERT INTO doris_order_stats
SELECT
TUMBLE_START(order_time, INTERVAL '5' MINUTE) AS window_start,
COUNT(*) AS order_num,
SUM(amount) AS total_amount
FROM mysql_orders
GROUP BY TUMBLE(order_time, INTERVAL '5' MINUTE);
基础类型对照表
| Doris 类型 | Flink SQL 类型 | MySQL 类型 | 关键注意事项 |
|---|---|---|---|
BOOLEAN |
BOOLEAN |
TINYINT(1) |
MySQL 用 TINYINT(1) 模拟布尔值,0=false,非0=true |
TINYINT |
TINYINT |
TINYINT |
范围一致(-128 ~ 127) |
SMALLINT |
SMALLINT |
SMALLINT |
范围一致 |
INT |
INT |
INT |
范围一致 |
BIGINT |
BIGINT |
BIGINT |
范围一致 |
LARGEINT |
DECIMAL(38,0) |
不支持 |
Doris 的 LARGEINT 是128位整数,Flink可用字符串绕过 |
FLOAT |
FLOAT |
FLOAT |
单精度浮点数 |
DOUBLE |
DOUBLE |
DOUBLE |
双精度浮点数 |
DECIMAL(p,s) |
DECIMAL(p,s) |
DECIMAL(p,s) / NUMERIC
|
精度需对齐(Doris最大精度38位,Flink同) |
VARCHAR(n) |
VARCHAR(n) |
VARCHAR(n) |
Doris 的 VARCHAR 最大长度 65533,MySQL允许更长 |
STRING |
STRING |
TEXT / LONGTEXT
|
Doris STRING 是别名,相当于 VARCHAR(65533)
|
DATE |
DATE |
DATE |
日期格式一致('YYYY-MM-DD') |
DATETIME |
TIMESTAMP |
DATETIME |
Doris DATETIME 不存储时区,Flink用 TIMESTAMP_LTZ 处理时区 |
TIMESTAMP |
TIMESTAMP |
TIMESTAMP |
MySQL TIMESTAMP自动转UTC,写入时注意时区 |
JSON |
STRING / ROW<...>
|
JSON |
Flink需用STRING处理原始JSON字符串 |
特殊类型处理
| 类型 | 处理建议 |
|---|---|
| 时间戳(跨时区) | 确保 Flink 作业时区配置一致,建议使用 TIMESTAMP_LTZ 并在写入Doris时转为字符串。 |
| 精度超限(如DECIMAL) | 如果MySQL的精度超过Doris定义,写入时会直接截断(可能丢数据)。 |
| 超长字符串(如TEXT) | MySQL的TEXT写入Doris时若长度超限会截断,建议用Doris的STRING。 |
| 枚举类型(ENUM) | MySQL的ENUM建议转成Flink的STRING,再写入Doris。 |
| 无符号整数(UNSIGNED) | MySQL的UNSIGNED INT需在Flink转为BIGINT(Doris无无符号类型)。 |
优化
状态存储(State Backend)
状态过期(State TTL):
在 Apache Flink 中,状态 TTL(Time-To-Live) 是一种用于控制状态生命周期的机制。通过配置状态 TTL,你可以指定状态数据在多长时间后自动过期并被清理,从而避免状态无限增长、节省内存资源,并提升作业性能。
Flink 在做流式聚合(如 COUNT(DISTINCT)、SUM()、GROUP BY 等)时会将中间结果保存在状态中(State),这些状态默认不会自动删除。如果不加限制: 状态体积会不断增长,影响性能; 可能导致 OOM(内存溢出); 增加检查点(Checkpoint)和保存点(Savepoint)的大小和时间。 配置 TTL 后,Flink 会在状态过期后自动清除它们,实现“自动回收”。
维表(Lookup Table)
JOIN 类型 Regular Join❌ 否
简单测试、小数据量
Lookup Join✅ 是
维度补全、查库兜底
Interval Join✅ 是
有明确时间窗口
Temporal Join✅ 是
查最新维度信息 Broadcast Join✅ 是
小表广播给大流
FLINK SQL模板
-- 启用检查点机制
SET 'execution.checkpointing.interval' = '5000';
-- 检查点模式
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
-- 检查点模式
SET 'execution.checkpointing.timeout' = '600000';
-- 允许同时进行的 Checkpoint 数量
SET 'execution.checkpointing.max-concurrent-checkpoints' = '1';
-- Checkpoint 之间的最小间隔(避免频繁触发)
SET 'execution.checkpointing.min-pause-between-checkpoints' = '1000';
-- Checkpoint 容忍失败次数
SET 'execution.checkpointing.tolerable-failed-checkpoints' = '3';
-- 启用增量 Checkpoint
SET 'state.backend.incremental' = 'true';
SET 'state.backend' = 'rocksdb';
-- 设置保存点的目标目录
SET 'state.checkpoints.dir' = 'file:///xxx/checkpoints';
-- 设置保存点的目标目录
SET 'state.savepoints.dir' = 'file:///xxx/savepoints';
-- 是一个用于指定JobManager存储已完成作业档案的目录配置项
set 'jobmanager.archive.fs.dir' = 'file:///xxx/flink-job-archive';
-- 状态过期时间
set 'table.exec.state.ttl' = '1 d';
-- 开启mini-batch
-- set 'table.exec.mini-batch.enabled' = 'true';
-- 最大缓存时间
-- set 'table.exec.mini-batch.allow-latency' = '5 s';
-- 批次大小
-- set 'table.exec.mini-batch.size' = '10000';
-- 开启预聚合
-- set 'table.optimizer.agg-phase-strategy' = 'TWO_PHASE';
-- taskmanager总内存
-- set 'taskmanager.memory.process.size' = '2 g';
-- -----[source]-----
-- -----[Mysql-CDC]-----
CREATE TABLE source_table (
`id` bigint,
`name` varchar(12),
`age` int,
`birthday` timestamp,
PRIMARY KEY (id) NOT ENFORCED
)
WITH
(
'connector' = 'mysql-cdc',
'hostname' = ${mysql.hostname},
'port' = ${mysql.port},
'username' = ${mysql.username},
'password' = ${mysql.password},
'database-name' = 'dbName',
'table-name' = 'tableName',
-- 尽量配置的参数
-- 'parallelism' = '4', -- 并行度(server-id有几个就设置多少)
-- 'server-id' = 'test-server_id' -- MySQL CDC 的服务器 ID (范围),全局唯一
-- 可选参数
-- 'debezium.snapshot.mode' = 'initial', -- 快照模式
-- 'debezium.snapshot.locking.mode' = 'none', -- 快照锁模式
-- 'scan.incremental.snapshot.enabled' = 'true', -- 增量快照 (适用于大表)
-- 'connect.timeout' = '30s', -- 连接超时时间
-- 'connect.max-retries' = '3', -- 最大重试次数
-- 'heartbeat.interval' = '30000', -- 心跳间隔(毫秒)
-- 'scan.startup.mode' = 'initial', -- 启动模式
-- scan.startup.timestamp-millis = ‘1754647055002’ -- 从binlog指定时间后开始读取
);
-- -----[JDBC]-----
CREATE TABLE source_table2 (
`id` bigint,
`name` varchar(12),
`age` int,
`birthday` timestamp,
PRIMARY KEY (id) NOT ENFORCED
)
WITH
(
'connector' = 'jdbc',
'url' = ${mysql.url}, -- 对应数据库url链接变量,如果没有请新增
'table-name' = 'tableName',
'username' = ${mysql.username},
'password' = ${mysql.password},
'driver' = 'com.mysql.cj.jdbc.Driver',
-- 可选参数
-- 'lookup.cache.ttl' = '10min', -- 查找缓存的过期时间
-- 'connection.max-retry-timeout' = '30s', -- 连接最大重试超时时间
-- 'scan.fetch-size' = '1000', -- 每次读取的行数
-- 'lookup.cache.max-rows' = '10000', -- 查找缓存的最大行数
-- 'lookup.max-retries' = '3', -- 查找最大重试次数
-- 'sink.buffer-flush.max-rows' = '1000', -- 写入前缓存的最大行数
-- 'sink.buffer-flush.interval' = '2s', -- 写入前的最大缓存时间
-- 'sink.max-retries' = '3', -- 写入最大重试次数
-- 'sink.parallelism' = '4' -- 写入并行度
);
-- -----[sink]-----
-- -----[Doris]-----
CREATE TABLE sink_table (
`id` bigint,
`name` varchar(12),
`age` int,
`birthday` timestamp,
PRIMARY KEY (id) NOT ENFORCED
)
WITH
(
'connector' = 'doris',
'fenodes' = ${doris.fenodes},
'table.identifier' = 'dbName.tableName',
'username' = ${doris.username},
'password' = ${doris.password},
'sink.properties.column_separator' = '$^&@#$',
'sink.properties.line_delimiter' = '!^*^!',
'sink.label-prefix' = 'job名称' -- 修改为job名称,如果该job中有多个sink,添加后缀以区分(该值所有任务唯一)
);
-- -----[INSERT]-----
insert into
source_table
select
`id`,
`name`,
`age`,
`birthday`
from
sink_table
where
age > 18;