一. 去重参数
如果需要去重,可以在创建hudi table的时候,指定主键,这样重复的数据只会保留最新的一行。
CREATE TABLE test_hudi_flink41 (
id int PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
price int,
ts int,
dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
'connector' = 'hudi',
'hoodie.datasource.write.recordkey.field' = 'id',
......
);
如上,hudi表以id为主键。
当然同其它SQL一样,我们也可以设置联合主键。
二. 并发参数
常用的并发参数:
在Flink SQL中操作Hudi表,并调整并发参数
三. 压缩参数
在线压缩的参数,通过设置 compaction.async.enabled = false
关闭在线压缩执行,但是调度 compaction.schedule.enabled
仍然建议开启,之后通过离线压缩直接执行。
四. 文件大小参数
Hudi会自管理文件大小, 避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用。 在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小。
目前只有 log 文件的写入大小可以做到精确控制, parquet 文件大小按照估算值。
五. Hadoop 参数
从 0.12.0 开始支持,如果有跨集群提交执行的需求,可以通过 sql 的 ddl指定 per-job级别的 hadoop 配置。
六. 内存参数
七. MOR表相关参数
stage backend 换成 rocksdb (默认的 in-memory state-backend 非常吃内存)
内存够的话,
compaction.max_memory
调大些(默认是 100MB 可以调到 1GB)关注 TM 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的大小, 比如 TM 的 内存是 4GB跑了 2个 StreamWriteFunction 那每个 write function 能分到 2GB, 尽量预留一些 buffer,因为网络buffer, TM上其它类型task(比如 BucketAssignFunction 也会吃些内存)
需要关注 compaciton 的 内存变化, compaction.max_memory 控制每个 compaction task 读 log 时 可以利用的内存大小, compaction.tasks 控制了 compaction task的并发。
八. COW表相关参数
stage backend 换成 rocksdb (默认的 in-memory state-backend 非常吃内存)
write.task.max.size 和 write.merge.max_memory 同时调大 (默认是 1GB 和 100MB 可以调到 2GB和1GB)。
关注 TM 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的大小, 比如 TM 的 内存是 4GB跑了 2个 StreamWriteFunction 那每个 write function 能分到 2GB, 尽量预留一些 buffer,因为网络buffer, TM上其它类型task(比如 BucketAssignFunction 也会吃些内存)
九. Hudi读取方式参数
Hudi读取方式参数:
开启 流读:
下面这个参数是每隔4秒去拉去数据(默认60秒)
增量读取:
限流:
如果将全量数据(百亿数量级) 和 增量数据先同步到kafka, 再通过flink流式 消费的方式将 库表 数据直接导成 hoodie 表,因为直接消费全量部分数据: 量大(吞吐高)、乱序严重(写入的partition随机),会导致写入性能退化,出现吞吐毛刺,这时候可以开启限速参数,保证流量平稳写入。