Hudi 使用之Flink读写

前言

本篇从Hudi角度介绍Flink常用的几种读写操作。内容主要来源于官网例子和个人理解。

读者如果想了解从编译、部署到使用的步骤,请参考:Flink 使用之 Hudi 编译部署、配置和使用

Flink读

参考链接:
Table & Query Types | Apache Hudi
SQL Queries | Apache Hudi

控制读类型的配置项为hoodie.datasource.query.type,具有如下三个值:

  • snapshot:快照读,是默认的行为,获取最新版本的全量数据。从行存储和列存储中获取最新的数据版本返回给用户。
  • incremental:增量读。读取某一段时间范围内的数据。用户如果配置了commit时间范围(起或者止,也可以两者均有),自动启动增量读模式,无需用户显式去配置此配置项。
  • read_optimized:读优化。仅基于列存储获取最新版本的数据。对于COW表而言获取的是最新数据,对于MOR表而言,行存储(log文件)中的数据会被忽略掉。直到compaction将log中的数据合并到列存储的时候,新变更的数据才能够被查询到。读优化模式读取数据的耗时要比快照读模式少。

Snapshot Query

默认的查询方式,查询表中已存在的全量数据。如果是MOR表,查询的时候不仅考虑parquet列存储的数据,还会合并avro log中的增量数据。可以启用data skipping加快读取速度。启用data skipping需要配置如下参数:

metadata.enabled = true
hoodie.metadata.index.column.stats.enable = true
read.data.skipping.enabled = true

此外还有一个配置项hoodie.metadata.index.column.stats.column.list,表示哪些字段的统计信息需要索引。如果不配置,默认为索引所有的字段。

Streaming Query

流模式增量读取。和批模式读取不同的是,流模式读取会一直持续不断的返回实时更新的结果。而批模式则在查询结束之后退出,往后再更新的数据,只有再次执行批模式读取的时候才能够获取到。
启用流模式读取需要配置read.streaming.enabledtrue
和流模式读取相关的配置项还有:

  • read.start-commit:从哪个commit开始读取。需要配置commit的时间戳,格式为yyyyMMddHHmmss。还可以配置为earliest,意为最早的commit。默认值为最近一次的commit(即lastest commit)。
  • read.streaming.skip_compaction:是否跳过compaction instant。跳过compaction instant的目的是防止数据重复和在开启changelog模式的情况下,保持其语义的正确性。

增量读取

增量读取字面上理解是只读取部分时间范围内的数据。可配合流模式读取使用,读取新摄入的数据。也可以配合批模式查询,指定起(read.start-commit)止(read.end-commit)时间范围,查询某时间范围内的数据历史值。也可以只指定read.end-commit,查询某个时间点之前的历史值,实现时间穿梭查询。需要注意的是时间范围是闭区间。
前面提及的配置项的解释如下:

  • read.start-commit:从哪个commit开始读取。需要配置commit的时间戳,格式为yyyyMMddHHmmss。还可以配置为earliest,意为最早的commit。默认值为最近一次的commit(即lastest commit)。
  • read.end-commit:读取截止到哪个commit,配置格式,默认值与read.start-commit相同。

CDC Query

Hudi 0.14.1版本开始支持。下游可以查询到数据的变更信息,更新前的数据和更新之后的数据。适用于Flink流式增量读取。CDC读取需要配置'cdc.enabled' = 'true'
使用例子:

CREATE TABLE hudi_table(  
ts BIGINT,  
uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,  
rider VARCHAR(20),  
driver VARCHAR(20),  
fare DOUBLE,  
city VARCHAR(20)  
)  
PARTITIONED BY (`city`)  
WITH (  
'connector' = 'hudi',  
'path' = 'file:///tmp/hudi_table',  
'table.type' = 'COPY_ON_WRITE',  
'cdc.enabled' = 'true' -- this option enable the cdc log enabled  
);
-- 插入初始数据
INSERT INTO hudi_table  
VALUES  
(1695159649087,'334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco'),  
(1695091554788,'e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco'),  
(1695046462179,'9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco'),  
(1695332066204,'1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco'),  
(1695516137016,'e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo'),  
(1695376420876,'7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo'),  
(1695173887231,'3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai'),  
(1695115999911,'c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
-- 设置为批运行模式
SET 'execution.runtime-mode' = 'batch';  

此时打开另一个Flink SQL client,执行:

CREATE TABLE hudi_table(
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///hudi_table',
  'table.type' = 'COPY_ON_WRITE',
  'cdc.enabled' = 'true',
  'read.streaming.enabled'='true'
);
-- 设置执行模式为流模式
SET 'execution.runtime-mode' = 'streaming';  
-- 使用流模式查询hudi_table表
select * from hudi_table;

在此窗口等待执行结果。
然后在第一个窗口中执行:

UPDATE hudi_table SET fare = 25.0 WHERE uuid = '334e26e9-8355-45cc-97c6-c31daf0df330';

在第二个窗口中可观察到CDC查询结果。

Flink写

参考链接:Using Flink | Apache Hudi

Bulk insert

Bulk insert用户将来自其他数据源的初始数据导入到Hudi表中。避免index bootstrap步骤。适合将初始数据导入到Hudi中,是效率最高的写入方式。
需要注意的是:
bulk_insert可以减少数据序列化以及合并操作,于此同时,该数据写入方式会跳过数据去重,所以用户需要保证数据的唯一性。
bulk_insert在批量写入模式中更有效率。默认情况下,批量执行模式按照partition path对输入记录进行排序,然后将这些记录写入Hudi,该方式可以避免频繁切换文件句柄,导致写入性能下降。
bulk_insert通过write.tasks参数指定并行度,并行度影响小文件的数量。

和bulk insert相关的配置项有:

参数名称 是否必须 默认值 参数说明
write.operation true upsert 设置为 bulk_insert 开启bulk insert
write.tasks false 4 bulk_insert 并行度对应写入的文件数,但是有最大文件大小限制。实际写入的文件个数 >= write.bucket_assign.tasks
write.bulk_insert.shuffle_input false true 写入前是否根据分区字段进行数据重分布。 启用此选项将减少小文件的数量,可能导致数据倾斜
write.bulk_insert.sort_input false true 写入前是否根据分区字段对数据进行排序。 启用此选项将在写任务写多个分区时减少小文件的数量
write.bulk_insert.sort_input.by_record_key false false 写入前是否根据record_key字段对数据进行排序。
write.sort.memory false 128 排序算子可用托管内存。 默认为 128 MB

使用示例:

create table hudi.call_center  
    with (  
        'connector' = 'hudi',  
        'path' = 'hdfs:///call_center',  
        'table.type' = 'COPY_ON_WRITE',  
        'write.operation' = 'bulk_insert',  
        'hoodie.datasource.write.recordkey.field' = 'cc_call_center_sk'  
        )  
    like call_center;

Index bootstrap

载入表中存量数据的索引到Flink的state中,保存于checkpoint。通俗来说是为现有的Hudi表建立起索引,加快后面upsert的速度。

相关的配置参数:

参数名称 是否必须 默认值 参数说明
index.bootstrap.enabled true false 当启用index bootstrap功能时,将Hudi表中的记录全部加载到Flink state中
index.partition.regex false * 优化参数,设置正则表达式来过滤分区。 默认所有分区都加载到Flink状态
write.index_bootstrap.tasks false write.tasks值相同 决定index bootstrap过程的task数量,影响index bootstrap的速度

使用方法:

  1. CREATE TABLE创建与Hudi表,table.type必须正确配置。
  2. 设置index.bootstrap.enabled = true启用index bootstrap功能。
  3. flink-conf.yaml文件中启用 checkpoint,设置execution.checkpointing.tolerable-failed-checkpoints = n(取决于Flink checkpoint执行时间)。因为index bootstrap时间可能会很长,只有在index bootstrap完成的时候,checkpoint才能够成功。
  4. 等待checkpoint第一次成功,预示着index bootstrap完成。
  5. 在index bootstrap完成后,用户可以退出并保存savepoint。
  6. 重启任务,设置index.bootstrap.enablefalse

注意事项:

  1. Index bootstrap过程是阻塞的,期间无法完成checkpoint。
  2. Index bootstrap由输入数据触发。 用户需要确保每个分区中至少有一条记录。
  3. Index bootstrap并发执行。用户可以在日志文件中查找finish loading the index under partition以及Load record form file相关字眼,跟踪index bootstrap的进度。
  4. Checkpoint第一次成功表明index bootstrap已完成。 如果从checkpoint恢复,不需要重复索引过程。

Changelog模式

Hudi可以记录数据的中间状态(I / -U / U / D) ,类似于Flink的changelog stream。Hudi MOR表以行的形式存储,支持保留变更状态信息。
启用changelog模式需要在表中开启changelog.enabled=true配置项。开启之后数据变更的中间结果都会被保留下来。
注意:

  1. 批量读方式任然会合并中间结果,无论是否启用changelog。
  2. 启用changelog模式Hudi也只是尽力去保留中间变更数据。异步压缩会将changelog数据合并为最终结果。所以说如果数据没有被及时消费掉,那么这条数据只能读取到它的最终状态。为了缓解这种情况,可以配合设置compaction.delta_commits 和/或 compaction.delta_seconds,让compaction间隔时间加大,从而增加中间变更数据的保留时间。clean.retain_commits可以控制clean操作保留最近多少个commit。保留的commit个数越多,changelog记录保留的时间越长,容许下游延迟消费的能力越强。

Upsert和Insert

Upsert和Insert都是插入数据操作。不同之处在于upsert比insert多了数据去重的功能。Upsert在插入数据之前会查询已存在的数据在哪个数据文件(根据表schema定义的primary key字段去确认数据是否存在),然后去更新这条数据。Insert操作没有查询已存在数据和更新数据的行为,直接将新数据插入,因此会出现重复数据。Insert相比upsert操作速度更快。
Upsert和Insert使用write.operation配置项控制。默认为upsert。
下面的例子我们修改默认的数据写入操作为insert:

CREATE TABLE hudi_table(
    ts BIGINT,
    uuid VARCHAR(40) PRIMARY KEY NOT ENFORCED,
    rider VARCHAR(20),
    driver VARCHAR(20),
    fare DOUBLE,
    city VARCHAR(20)
)
PARTITIONED BY (`city`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///hudi_table',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert'
);
-- 使用insert模式插入uuid字段重复的数据,不会更新已有数据,uuid重复的数据会插入
insert into hudi_table values(...);
-- 查询时发现新插入的uuid重复的数据和原有数据可以共存
select * from hudi_table;

对于Insert模式,MOR表使用小文件合并策略,新数据优先追加在最小的log文件中;COW表会直接写入parquet文件,不适用任何的小文件合并策略。如果我们需要在COW insert模式启动小文件和并策略的话,需要设置write.insert.clustertrue,启用inline clustering。

参考链接:Using Flink | Apache Hudi

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容