前言
本篇从Hudi角度介绍Flink常用的几种读写操作。内容主要来源于官网例子和个人理解。
读者如果想了解从编译、部署到使用的步骤,请参考:Flink 使用之 Hudi 编译部署、配置和使用
Flink读
参考链接:
Table & Query Types | Apache Hudi
SQL Queries | Apache Hudi
控制读类型的配置项为hoodie.datasource.query.type
,具有如下三个值:
- snapshot:快照读,是默认的行为,获取最新版本的全量数据。从行存储和列存储中获取最新的数据版本返回给用户。
- incremental:增量读。读取某一段时间范围内的数据。用户如果配置了commit时间范围(起或者止,也可以两者均有),自动启动增量读模式,无需用户显式去配置此配置项。
- read_optimized:读优化。仅基于列存储获取最新版本的数据。对于COW表而言获取的是最新数据,对于MOR表而言,行存储(log文件)中的数据会被忽略掉。直到compaction将log中的数据合并到列存储的时候,新变更的数据才能够被查询到。读优化模式读取数据的耗时要比快照读模式少。
Snapshot Query
默认的查询方式,查询表中已存在的全量数据。如果是MOR表,查询的时候不仅考虑parquet列存储的数据,还会合并avro log中的增量数据。可以启用data skipping加快读取速度。启用data skipping需要配置如下参数:
metadata.enabled = true
hoodie.metadata.index.column.stats.enable = true
read.data.skipping.enabled = true
此外还有一个配置项hoodie.metadata.index.column.stats.column.list
,表示哪些字段的统计信息需要索引。如果不配置,默认为索引所有的字段。
Streaming Query
流模式增量读取。和批模式读取不同的是,流模式读取会一直持续不断的返回实时更新的结果。而批模式则在查询结束之后退出,往后再更新的数据,只有再次执行批模式读取的时候才能够获取到。
启用流模式读取需要配置read.streaming.enabled
为true
。
和流模式读取相关的配置项还有:
- read.start-commit:从哪个commit开始读取。需要配置commit的时间戳,格式为
yyyyMMddHHmmss
。还可以配置为earliest
,意为最早的commit。默认值为最近一次的commit(即lastest commit)。 - read.streaming.skip_compaction:是否跳过compaction instant。跳过compaction instant的目的是防止数据重复和在开启changelog模式的情况下,保持其语义的正确性。
增量读取
增量读取字面上理解是只读取部分时间范围内的数据。可配合流模式读取使用,读取新摄入的数据。也可以配合批模式查询,指定起(read.start-commit
)止(read.end-commit
)时间范围,查询某时间范围内的数据历史值。也可以只指定read.end-commit
,查询某个时间点之前的历史值,实现时间穿梭查询。需要注意的是时间范围是闭区间。
前面提及的配置项的解释如下:
- read.start-commit:从哪个commit开始读取。需要配置commit的时间戳,格式为
yyyyMMddHHmmss
。还可以配置为earliest
,意为最早的commit。默认值为最近一次的commit(即lastest commit)。 - read.end-commit:读取截止到哪个commit,配置格式,默认值与
read.start-commit
相同。
CDC Query
Hudi 0.14.1版本开始支持。下游可以查询到数据的变更信息,更新前的数据和更新之后的数据。适用于Flink流式增量读取。CDC读取需要配置'cdc.enabled' = 'true'
。
使用例子:
CREATE TABLE hudi_table(
ts BIGINT,
uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
rider VARCHAR(20),
driver VARCHAR(20),
fare DOUBLE,
city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
'connector' = 'hudi',
'path' = 'file:///tmp/hudi_table',
'table.type' = 'COPY_ON_WRITE',
'cdc.enabled' = 'true' -- this option enable the cdc log enabled
);
-- 插入初始数据
INSERT INTO hudi_table
VALUES
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'),
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'),
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
-- 设置为批运行模式
SET 'execution.runtime-mode' = 'batch';
此时打开另一个Flink SQL client,执行:
CREATE TABLE hudi_table(
ts BIGINT,
uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
rider VARCHAR(20),
driver VARCHAR(20),
fare DOUBLE,
city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs:///hudi_table',
'table.type' = 'COPY_ON_WRITE',
'cdc.enabled' = 'true',
'read.streaming.enabled'='true'
);
-- 设置执行模式为流模式
SET 'execution.runtime-mode' = 'streaming';
-- 使用流模式查询hudi_table表
select * from hudi_table;
在此窗口等待执行结果。
然后在第一个窗口中执行:
UPDATE hudi_table SET fare = 25.0 WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';
在第二个窗口中可观察到CDC查询结果。
Flink写
参考链接:Using Flink | Apache Hudi
Bulk insert
Bulk insert用户将来自其他数据源的初始数据导入到Hudi表中。避免index bootstrap步骤。适合将初始数据导入到Hudi中,是效率最高的写入方式。
需要注意的是:
bulk_insert可以减少数据序列化以及合并操作,于此同时,该数据写入方式会跳过数据去重,所以用户需要保证数据的唯一性。
bulk_insert在批量写入模式中更有效率。默认情况下,批量执行模式按照partition path对输入记录进行排序,然后将这些记录写入Hudi,该方式可以避免频繁切换文件句柄,导致写入性能下降。
bulk_insert通过write.tasks
参数指定并行度,并行度影响小文件的数量。
和bulk insert相关的配置项有:
参数名称 | 是否必须 | 默认值 | 参数说明 |
---|---|---|---|
write.operation |
true |
upsert |
设置为 bulk_insert 开启bulk insert |
write.tasks |
false |
4 |
bulk_insert 并行度对应写入的文件数,但是有最大文件大小限制。实际写入的文件个数 >= write.bucket_assign.tasks
|
write.bulk_insert.shuffle_input |
false |
true |
写入前是否根据分区字段进行数据重分布。 启用此选项将减少小文件的数量,可能导致数据倾斜 |
write.bulk_insert.sort_input |
false |
true |
写入前是否根据分区字段对数据进行排序。 启用此选项将在写任务写多个分区时减少小文件的数量 |
write.bulk_insert.sort_input.by_record_key |
false |
false |
写入前是否根据record_key字段对数据进行排序。 |
write.sort.memory |
false |
128 |
排序算子可用托管内存。 默认为 128 MB |
使用示例:
create table hudi.call_center
with (
'connector' = 'hudi',
'path' = 'hdfs:///call_center',
'table.type' = 'COPY_ON_WRITE',
'write.operation' = 'bulk_insert',
'hoodie.datasource.write.recordkey.field' = 'cc_call_center_sk'
)
like call_center;
Index bootstrap
载入表中存量数据的索引到Flink的state中,保存于checkpoint。通俗来说是为现有的Hudi表建立起索引,加快后面upsert的速度。
相关的配置参数:
参数名称 | 是否必须 | 默认值 | 参数说明 |
---|---|---|---|
index.bootstrap.enabled |
true |
false |
当启用index bootstrap功能时,将Hudi表中的记录全部加载到Flink state中 |
index.partition.regex |
false |
* |
优化参数,设置正则表达式来过滤分区。 默认所有分区都加载到Flink状态 |
write.index_bootstrap.tasks |
false |
和write.tasks 值相同 |
决定index bootstrap过程的task数量,影响index bootstrap的速度 |
使用方法:
-
CREATE TABLE
创建与Hudi表,table.type
必须正确配置。 - 设置
index.bootstrap.enabled = true
启用index bootstrap功能。 - 在
flink-conf.yaml
文件中启用 checkpoint,设置execution.checkpointing.tolerable-failed-checkpoints = n
(取决于Flink checkpoint执行时间)。因为index bootstrap时间可能会很长,只有在index bootstrap完成的时候,checkpoint才能够成功。 - 等待checkpoint第一次成功,预示着index bootstrap完成。
- 在index bootstrap完成后,用户可以退出并保存savepoint。
- 重启任务,设置
index.bootstrap.enable
为false
。
注意事项:
- Index bootstrap过程是阻塞的,期间无法完成checkpoint。
- Index bootstrap由输入数据触发。 用户需要确保每个分区中至少有一条记录。
- Index bootstrap并发执行。用户可以在日志文件中查找
finish loading the index under partition
以及Load record form file
相关字眼,跟踪index bootstrap的进度。 - Checkpoint第一次成功表明index bootstrap已完成。 如果从checkpoint恢复,不需要重复索引过程。
Changelog模式
Hudi可以记录数据的中间状态(I / -U / U / D) ,类似于Flink的changelog stream。Hudi MOR表以行的形式存储,支持保留变更状态信息。
启用changelog模式需要在表中开启changelog.enabled=true
配置项。开启之后数据变更的中间结果都会被保留下来。
注意:
- 批量读方式任然会合并中间结果,无论是否启用changelog。
- 启用changelog模式Hudi也只是尽力去保留中间变更数据。异步压缩会将changelog数据合并为最终结果。所以说如果数据没有被及时消费掉,那么这条数据只能读取到它的最终状态。为了缓解这种情况,可以配合设置
compaction.delta_commits
和/或compaction.delta_seconds
,让compaction间隔时间加大,从而增加中间变更数据的保留时间。clean.retain_commits
可以控制clean操作保留最近多少个commit。保留的commit个数越多,changelog记录保留的时间越长,容许下游延迟消费的能力越强。
Upsert和Insert
Upsert和Insert都是插入数据操作。不同之处在于upsert比insert多了数据去重的功能。Upsert在插入数据之前会查询已存在的数据在哪个数据文件(根据表schema定义的primary key字段去确认数据是否存在),然后去更新这条数据。Insert操作没有查询已存在数据和更新数据的行为,直接将新数据插入,因此会出现重复数据。Insert相比upsert操作速度更快。
Upsert和Insert使用write.operation
配置项控制。默认为upsert。
下面的例子我们修改默认的数据写入操作为insert:
CREATE TABLE hudi_table(
ts BIGINT,
uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
rider VARCHAR(20),
driver VARCHAR(20),
fare DOUBLE,
city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs:///hudi_table',
'table.type' = 'COPY_ON_WRITE',
'write.operation' = 'insert'
);
-- 使用insert模式插入uuid字段重复的数据,不会更新已有数据,uuid重复的数据会插入
insert into hudi_table values(...);
-- 查询时发现新插入的uuid重复的数据和原有数据可以共存
select * from hudi_table;
对于Insert模式,MOR表使用小文件合并策略,新数据优先追加在最小的log文件中;COW表会直接写入parquet文件,不适用任何的小文件合并策略。如果我们需要在COW insert模式启动小文件和并策略的话,需要设置write.insert.cluster
为true
,启用inline clustering。