背景
本篇为Hudi概念和特性相关介绍。依据于官网和相关博客资料,融入了个人理解。内容可能会有疏漏,欢迎大家指正和补充。
Hudi概念
Apache Hudi是一个Data Lakes的开源方案,Hudi是Hadoop Updates and Incrementals的简写,它是由Uber开发并开源的Data Lakes解决方案。Hudi具有如下基本特性/能力:
- Hudi能够摄入(Ingest)和管理(Manage)基于HDFS之上的大型分析数据集,主要目的是高效的减少入库延时。
- Hudi基于Spark/Flink/Hive来对HDFS上的数据进行更新、插入、删除等。
- Hudi在HDFS数据集上提供如下流原语:插入更新(如何改变数据集);增量拉取(如何获取变更的数据)。
- Hudi可以对HDFS上的parquet格式数据进行插入/更新操作。
- Hudi通过自定义InputFormat与Hadoop生态系统(Spark、Hive、Parquet)集成。
- Hudi通过Savepoint来实现数据恢复。
来源:Hudi数据湖简介_阿福Chris的博客-CSDN博客_hudi 数据湖
时间线
https://hudi.apache.org/docs/next/timeline/
Hudi内部维护了时间线,支持按照数据的到达时间顺序来获取数据。Hudi确保时间线上的动作是原子性的。
Hudi 表类型
https://hudi.apache.org/docs/next/table_types/#table-and-query-types
Copy on Write
数据使用列存储形式存放,每次提交都会产生新版本的列存储文件。将原有数据文件copy一份,合并入变更的数据后保存一份新的数据。因此写入放大很高,读取放大基本为0。适用于读取多写入少的场景。写入存在延迟,Flink每次checkpoint或者Spark每次微批处理才会commit新数据。读取延迟很低。可以通过hoodie.cleaner.commits.retained
配置项确定需要保存的最近commit个数,防止存储空间占用无限放大。可以提供较好的并发控制,因为读取数据的时候,无法读取到正在写入但尚未commit或者是写入失败的数据。
Merge on Read
数据使用行存储(avro)和列存储(parquet)共同存放。其中新变更的数据使用行存储,历史数据采用列存储。每当满足一定条件的时候(经过n个commit或者经过特定时间)Hudi开始compact操作,将行存储的数据和列存储的合并,生成新的列存储文件。适用于写多读少的场景。写入延迟很小,读取的时候需要合并新文件avro和parquet,存在一定延迟。需要使用定期的online compaction或者是手工执行的offline compaction将avro格式和parquet格式文件合并。
通过配置项table.type
指定表类型。
两种表类型的特性对比:
Trade-off | CopyOnWrite | MergeOnRead |
---|---|---|
Data Latency | Higher | Lower |
Query Latency | Lower | Higher |
Update cost (I/O) | Higher (rewrite entire parquet) | Lower (append to delta log) |
Parquet File Size | Smaller (high update(I/0) cost) | Larger (low update cost) |
Write Amplification | Higher | Lower (depending on compaction strategy) |
个人理解:MOR表在compaction之后也会生成包含最新版本数据的parquet文件。也就是说它的最终数据保存形式和COW表完全相同。所以MOR表可以理解成是一种每提交N次才Copy on write的COW表。MOR表的中间结果使用log日志文件(行存储)保存。log中只保存变更的数据。新增的数据使用parquet保存。由于Hudi支持像文件优化策略,如果原来的parquet被判定为小文件,新增的数据和原有数据合并后生成新的parquet文件“替换”原有旧parquet文件。如果原来parquet不是小文件,或者说合并之后的数据量超过小文件判定标准,超出小文件判定标准的部分会分裂成新的parquet文件保存。
查询类型
https://hudi.apache.org/docs/next/table_types/#table-and-query-types
Snapshot Queries : Queries see the latest snapshot of the table as of a given commit or compaction action. In case of merge on read table, it exposes near-real time data(few mins) by merging the base and delta files of the latest file slice on-the-fly. For copy on write table, it provides a drop-in replacement for existing parquet tables, while providing upsert/delete and other write side features.
Incremental Queries : Queries only see new data written to the table, since a given commit/compaction. This effectively provides change streams to enable incremental data pipelines.
Read Optimized Queries : Queries see the latest snapshot of table as of a given commit/compaction action. Exposes only the base/columnar files in latest file slices and guarantees the same columnar query performance compared to a non-hudi columnar table.
总结:
- Snapshot Queries:MOR表合并avro和parquet文件之后输出结果,存在较大的查询延迟,但是数据没有延迟。COW表直接读取现有的新版本parquet文件,查询延迟低,但是有数据延迟(最新的数据可能没有写入,还没有commit)。
- Incremental Queries:只查询某次commit或compaction之后新插入或者修改的数据。
- Read Optimized Queries:仅适用于MOR表,为了优化MOR表的读取性能,此方式读取的时候不会强制合并avro和parquet文件。而是只读取parquet文件。这样查询延迟很低,但是数据存在延迟(在avro但是还没有compact的数据无法被查询到)。
Table Type | Supported Query types |
---|---|
Copy On Write | Snapshot Queries + Incremental Queries |
Merge On Read | Snapshot Queries + Incremental Queries + Read Optimized Queries |
通过hoodie.datasource.query.type
参数控制查询类型。配置项对应为:
- snapshot(默认值)
- read_optimized
- incremental
Trade-off | Snapshot | Read Optimized |
---|---|---|
Data Latency | Lower | Higher |
Query Latency | Higher (merge base / columnar file + row based delta / log files) | Lower (raw base / columnar file performance) |
索引
Hudi通过HoodieKey(recordKey和poartition path)和file id的对应关系来加速upsert操作。这正是Hudi的索引机制。这种对应关系在初始记录写入之后不会在改变。
对于COW表插入数据的场景,索引可以快速的过滤掉不涉及数据修改的file。对于MOR表插入数据的场景,索引能够很快定位到需要合并的文件。不需要像Hive ACID一样,合并所有的base file。
Hudi支持下面4种Index选项:
- Bloom Index(默认值):基于recordkey构建布隆过滤器,可以快速定位record key位于哪个file中。
- Simple:将更新/删除的数据同表中存储的数据的key做简单的join操作。
- HBase index:将索引映射存储到HBase中。
- 自己实现索引。
所有具有GLOBAL和非GLOBAL两种(HBase本来就是global的)。其中:
- global index确保一个表的所有分区都不会有重复的数据。但是更新的删除的代价会随着表中数据量的增长而加大。对于较小的表可以接受。
- 非global index只能确保一个表的同一分区之内不会存在重复数据。需要writer去确保某条记录的更新/删除操作对应的partition path始终一致。在查找index的时候性能更好。
使用场景:
- 事实表数据迟到场景:Bloom
- event表去重(通常只append数据):Bloom
- 维度表(通常不会分区)变更(变更量较少):SIMPLE或HBase
hoodie.index.type
用来修改Index选项。
注意:使用GLOBAL_BLOOM的时候需要留意hoodie.bloom.index.update.partition.path
配置。源码给出的解释如下:
/**
* Only applies if index type is GLOBAL_BLOOM.
* <p>
* When set to true, an update to a record with a different partition from its existing one
* will insert the record to the new partition and delete it from the old partition.
* <p>
* When set to false, a record will be updated to the old partition.
*/
设置为true的话,如果record更新操作修改了partition,则会在新partition插入这条数据,然后在旧partition删除这条数据。
如果设置为false,会在就parititon更新这条数据。
hoodie.simple.index.update.partition.path
对于GLOBAL_SIMPLE也同理。
文件结构
https://hudi.apache.org/docs/next/file_layouts/
- Hudi在分布式文件系统base path下的目录结构来保存表数据。
- 表被分解为一个个分区。
- 在每个分区中,文件以file group形式组织,每个都由独特的file id标识。
- 每个file group包含多个file slice。
- 每个slice包含一个base file(commit或者compact的时候创建)还有一系列的log file(MOR表的插入或修改)。
元数据表(Metadata Table)
https://hudi.apache.org/docs/next/metadata/
用于提高读写性能,避免使用list file操作。
0.10.1以后metadata table默认启用(hoodie.metadata.enable
)。
为了确保metadata table保持最新,针对同一张Hudi表的写操作需要根据不同的场景增加相应配置。
单个writer同步表服务(清理,聚簇,压缩),只需配置hoodie.metadata.enable=true
,重启writer。
单个writer异步表服务(同一进程内),需要配置乐观并发访问控制:
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
多个writer(不同进程)异步表服务,需要配置乐观并发访问控制:
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<distributed-lock-provider-classname>
外部分布式锁提供方有:ZookeeperBasedLockProvider
, HiveMetastoreBasedLockProvider
和DynamoDBBasedLockProvider
。
Hudi 写类型
https://hudi.apache.org/docs/next/write_operations/
- UPSERT : This is the default operation where the input records are first tagged as inserts or updates by looking up the index. The records are ultimately written after heuristics are run to determine how best to pack them on storage to optimize for things like file sizing. This operation is recommended for use-cases like database change capture where the input almost certainly contains updates. The target table will never show duplicates.
- INSERT : This operation is very similar to upsert in terms of heuristics/file sizing but completely skips the index lookup step. Thus, it can be a lot faster than upserts for use-cases like log de-duplication (in conjunction with options to filter duplicates mentioned below). This is also suitable for use-cases where the table can tolerate duplicates, but just need the transactional writes/incremental pull/storage management capabilities of Hudi.
- BULK_INSERT : Both upsert and insert operations keep input records in memory to speed up storage heuristics computations faster (among other things) and thus can be cumbersome for initial loading/bootstrapping a Hudi table at first. Bulk insert provides the same semantics as insert, while implementing a sort-based data writing algorithm, which can scale very well for several hundred TBs of initial load. However, this just does a best-effort job at sizing files vs guaranteeing file sizes like inserts/upserts do.
总结:
- UPSERT: 通过索引来区别数据是insert还是update。不会存在重复数据。
- INSERT: 跳过index查找过程。比upsert执行要快,但无法保证数据不重复。
- BULK_INSERT: upsert和insert使用内存缓存。upsert和insert操作对加载原始(存量)数据到Hudi中不是很合适。加载初始数据适合使用bulk insert方式。此方式使用基于排序的写入算法,适合TB级别数据的导入操作。减少序列化和数据合并操作。不能够提供数据去重功能。
- DELETE: 删除操作。Hudi支持软删除和硬删除。软删除指的是保留Hoodie key的同时将所有其他字段设置为null。需要表的schema允许其他所有字段为null,然后将其他所有字段upsert为null。硬删除指的是物理删除。
通过write.operation
配置项指定。
写入步骤
The following is an inside look on the Hudi write path and the sequence of events that occur during a write.
- Deduping(去重)
同一批中的数据先去重,需要combine或者reduce by key。 - Index Lookup(查询索引)
查找插入的数据位于哪些file slice中。 - File Sizing(文件大小控制)
Hudi基于之前提交数据的平均大小,制定计划将小文件中插入足够的数据,使它的大小接近配置的容量上限限制。 - Partitioning(分区)
确定update和insert的数据属于哪个file group,可能伴随新file group的创建。 - Write I/O(写入数据)
写入数据过程。根据表类型写入base file或者是log file。 - Update Index(更新索引)
Now that the write is performed, we will go back and update the index.
写入操作已完成,更新索引。 - Commit(提交)
原子提交所有的更改。 - Clean (if needed)(清理,如果需要的话)
如果需要,运行清理步骤。 - Compaction(压缩)
如果使用MOR表,压缩会同步运行,或者是异步调度。 - Archive(归档)
最后归档步骤将老旧的timeline项目移动到归档目录。
Schema变更
https://hudi.apache.org/docs/next/schema_evolution
试验功能,Spark 3.1.x和3.2.x支持Schema变更。
Key Generation
https://hudi.apache.org/docs/next/key_generation
Primary key由RecordKey和Partition path组成。
RecordKey由hoodie.datasource.write.recordkey.field
决定。Partition path由hoodie.datasource.write.partitionpath.field
决定。
并发控制
https://hudi.apache.org/docs/next/concurrency_control
Hudi支持MVCC和乐观并发访问两种方式。MVCC方式所有的table service都使用同一个writer来保证没有冲突,避免竟态条件。新版本的Hudi增加了乐观并发访问控制(OCC)。支持文件级别的乐观锁。需要依赖外部组件实现乐观锁,例如Zookeeper,Hive metastore等。
启用并发控制:
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<lock-provider-classname>
使用Zookeeper分布式锁:
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url
hoodie.write.lock.zookeeper.port
hoodie.write.lock.zookeeper.lock_key
hoodie.write.lock.zookeeper.base_path
使用HiveMetastore分布式锁:
hoodie.write.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider
hoodie.write.lock.hivemetastore.database
hoodie.write.lock.hivemetastore.table
禁用并发写入:
hoodie.write.concurrency.mode=single_writer
hoodie.cleaner.policy.failed.writes=EAGER