背景
Spark/Flink可以使用Hive的metastore,但是Hive无法通过Hive metastore中的Spark/Flink表直接查询数据。为了解决这个问题,可以配置使用Hive sync。在Spark/Flink操作表的时候,自动同步Hive的元数据。这样就可以通过Hive查询Hudi表的内容。
Hive metastore通过目录结构的来维护元数据,数据的更新是通过覆盖来保证事务。但是数据湖是通过追踪文件来管理元数据,一个目录中可以包含多个版本的文件。这一点和Hive元数据管理是不同的。所以说为了兼容Hive metastore,Hudi需要实时从Timeline同步元数据到Hive metastore。
环境信息
- Flink 1.17.2
- Hudi 0.15.0
- Spark 3.3.2
- Hive 3.1.2
Hive 兼容Hudi格式
复制编译后的packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.11.1.jar
到各节点Hive安装目录的auxlib
目录中。
进入beeline
后执行:
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
或者
set hive.input.format = org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.stats.autogather = false;
然后再查询Hive表。
Hive查询遇到的错误
向量化执行问题
在Hive中执行有聚合函数的Hive SQL会发生问题。例如select count(*) from t1
。报错内容为:
java.lang.ClassCastException: org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch cannot be cast to org.apache.hadoop.io.ArrayWritable
原因是Hive启用了向量化查询。要解决这个问题需要关闭向量化查询。方法为查询前执行:
set hive.vectorized.execution.enabled = false;
set hive.vectorized.execution.reduce.enabled = false;
Flink Hudi Hive Sync
Hudi Flink Bundle的编译要求:如果要使用Hive Sync功能,编译时候需要激活flink-bundle-shade-hive3
profile。编译命令如下所示:
mvn clean package -Dflink1.15 -Dscala2.12 -DskipTests -Pflink-bundle-shade-hive3
Flink的配置要求:Hive Sync只有在Flink checkpoint的时候才会触发。因此必须要配置checkpoint。经验证state.backend.type
默认的hashmap
无法触发Hive Sync。需要按照如下方式配置:
execution.checkpointing.interval: 3s
state.backend.type: rocksdb
state.checkpoints.dir: hdfs:///flink-checkpoints
为了演示方便取用较短的checkpoint时间间隔。
Flink Hive Sync支持两种模式连接Hive:
- Hive Metastore(hms): 连接Hive Metastore 9083端口。
- JDBC: 连接HiveServer 10000端口。
两种使用方式如下所示:
-- hms 模式模板
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${db_path}/t1',
'table.type' = 'COPY_ON_WRITE', -- 如果使用MERGE_ON_READ,只有生成parquet文件之后,Hive才能查出数据
'hive_sync.enable' = 'true', -- 必须。启用Hive sync
'hive_sync.mode' = 'hms', -- 必须。设置模式未hms,默认为jdbc
'hive_sync.metastore.uris' = 'thrift://${ip}:9083' -- 必须。端口需要在 hive-site.xml上配置
);
-- jdbc 模式模板
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${db_path}/t1',
'table.type' = 'COPY_ON_WRITE', -- 如果使用MERGE_ON_READ,只有生成parquet文件之后,Hive才能查出数据
'hive_sync.enable' = 'true', -- 必须。启用Hive sync
'hive_sync.mode' = 'jdbc', -- 必须。设置模式未hms,默认为jdbc
'hive_sync.metastore.uris' = 'thrift://${ip}:9083', -- 必须。端口需要在hive-site.xml上配置
'hive_sync.jdbc_url'='jdbc:hive2://${ip}:10000', -- 必须。hiveServer端口
'hive_sync.table'='${table_name}', -- 必须。同步过去的hive表名
'hive_sync.db'='${db_name}', -- 必须。同步过去的hive表所在数据库名
'hive_sync.username'='${user_name}', -- 必须。JDBC 用户名
'hive_sync.password'='${password}' -- 必须。JDBC 密码
);
我们可以使用hive_sync.db
指定同步到Hive中表所在的database,hive_sync.table
指定同步到Hive中的表名。如果没有配置,它们的默认值参见org/apache/hudi/table/catalog/HoodieHiveCatalog.java
:
newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_DB.key(), k -> tablePath.getDatabaseName());
newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_TABLE.key(), k -> tablePath.getObjectName());
由上面源代码可知hive_sync.db
默认为hudi表在Flink中的database名称,Flink中默认为default_database
,hive_sync.table
默认为Flink中的Hudi表名称。
接下来举一个例子。我们使用HMS方式配置Hive Sync:
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs:///zy/hudi/',
'table.type' = 'COPY_ON_WRITE',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://manager127:9083',
'hive_sync.table'='t1',
'hive_sync.db'='default'
);
-- 插入测试数据
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
稍等一会儿查看Flink Web Dashboard的jobmanager日志,如果发现类似如下:
INFO org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [sync hive metadata for instant 20240710084456072] success!
INFO org.apache.hudi.hive.HiveSyncTool [] - Sync complete for t1
说明Hive sync已经触发。前面Hive Sync的配置没有问题。
然后我们进入beeline,执行:
use default_database;
show tables;
我们可以看到同步过来的t1
表。
然后执行:
select * from t1;
可以从Hive中查出Hudi表数据。
上面例子中的Hive Sync表是COW类型。如果Hive Sync的是MOR类型表。同步之后会出现t1
,t1_ro
和t1_rt
三个表。它们的区别和用途参见读优化表和实时表一节。
FAQ
NoSuchMethodError问题
如果执行Flink的时候遇到如下错误(Hudi 0.11.1和Flink 1.13.2曾经遇到):
java.lang.NoSuchMethodError: org.apache.parquet.schema.Types$PrimitiveBuilder.as(Lorg/apache/parquet/schema/LogicalTypeAnnotation;)Lorg/apache/parquet/schema/Types$Builder
需要修改packaging/hudi-flink-bundle/pom.xml
,在relocations
标签中加入:
<relocation>
<pattern>org.apache.parquet</pattern>
<shadedPattern>${flink.bundle.shade.prefix}org.apache.parquet</shadedPattern>
</relocation>
然后重新编译。
参考链接:
https://github.com/apache/hudi/issues/3042
Hive Sync时候的报错
近期使用Flink配合Hudi发现表数据可以成功插入但是没有同步到hive表。
Flink Web Dashboard查看JobManager日志,发现HMSDDLExecutor
第86行执行遇到错误:
java.lang.NoClassDefFoundError: org/apache/calcite/plan/RelOptRule
解决方法:
复制$FLINK_HOME/opt/flink-table-planner_2.12-1.17.2.jar
到$FLINK_HOME/lib
,然后删除或移走$FLINK_HOME/lib/flink-table-planner-loader-1.17.2.jar
。再重新启动Yarn session和SQL client。插入数据后可以看到如下日志:
org.apache.hudi.hive.HiveSyncTool [] - Syncing target hoodie table with hive table(default_database.t1). Hive metastore URL from HiveConf:thrift://ip:9083). Hive metastore URL from HiveSyncConfig:thrift://ip:9083, basePath :/hudi/t1
说明Hive sync成功。可在beeline中使用:
select * from default_database.t1;
查询表中的内容。
java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf
复制如下Hadoop依赖到$FLINK_HOME/lib
目录中:
hadoop-common-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-common-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-core-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-jobclient-3.1.1.3.0.1.0-187.jar
以jar包名字为准,版本号可能和实际环境不同。
Spark Hudi Hive Sync
Spark Hive Sync目前只支持DataFrame API。下面使用官网的例子插入数据到hudi_cow
表:
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_cow"
val basePath = "/user/hive/warehouse/hudi_cow"
val schema = StructType(Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("toBeDeletedStr", StringType,true),
StructField("intToLong", IntegerType,true),
StructField("longToInt", LongType,true)
))
val data0 = Seq(Row("row_1", "2021/01/01",0L,"bob","v_0","toBeDel0",0,1000000L),
Row("row_2", "2021/01/01",0L,"john","v_0","toBeDel0",0,1000000L),
Row("row_3", "2021/01/02",0L,"tom","v_0","toBeDel0",0,1000000L))
var dfFromData0 = spark.createDataFrame(data0,schema)
dfFromData0.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option(TABLE_NAME, tableName).
option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.index.type","SIMPLE").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.datasource.hive_sync.jdbcurl","jdbc:hive2://manager127:10000/").
option("hoodie.datasource.hive_sync.database","default").
option("hoodie.datasource.hive_sync.table","hudi_cow").
option("hoodie.datasource.hive_sync.partition_fields","partitionId").
option("hoodie.datasource.hive_sync.enable","true").
option("hoodie.datasource.hive_sync.username","hdfs").
mode(Overwrite).
save(basePath)
Spark Hudi Hive Sync配置项含义如下:
- hoodie.datasource.hive_sync.jdbcurl: Hive metastore连接JDBC URL。
- hoodie.datasource.hive_sync.database: Hive database名称。
- hoodie.datasource.hive_sync.table: Hive table名称。
- hoodie.datasource.hive_sync.partition_fields: Hive分区字段名。
- hoodie.datasource.hive_sync.enable: 是否启动Hive sync。
- hoodie.datasource.hive_sync.username:访问Hive时使用的用户名。
- hoodie.datasource.hive_sync.password:访问Hive使用的用户对应的密码。
和Flink一样,执行成功后可以使用Hive通过beeline
查询Hudi表数据。
读优化表和实时表
MOR表元数据同步到Hive会出现一个ro表和一个rt表(RT表只有MOR表同步到Hive之后才会生成)。例如原Hudi表的名字为test
,同步到Hive之后会出现如下3个表:
- test
- test_ro
- test_rt
其中ro表为读优化表。在读取的时候忽略log文件,只读取base file(parquet)文件的内容,相当于忽略了上次compaction之后的更新数据(和snapshot query)类似。读取速度较快。
rt为实时表,在读取的时候会合并log文件和base file,返回最新版本的数据。读取速度较慢。
RO表(Read Optimized Table)特点:
- 压缩后的Parquet文件:RO表只暴露压缩后的Parquet文件,这意味着查询性能被优化,因为Parquet是一种列式存储格式,适合分析型查询。
- 查询性能:由于RO表优化了读取操作,它适合用于批量查询和分析场景。
-
查询方式:RO表的查询方式与普通Hive表类似,需要设置
hive.input.format
为org.apache.hadoop.hive.ql.io.HiveInputFormat
或者org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat
来正确查询数据。
RT表(Real-Time Table)特点:
- 实时性:RT表提供了对Hudi表最新数据的实时视图,可以立即看到最近的写入操作。
- 增量查询:RT表支持增量查询,允许用户查询自上次查询以来的数据变更,这对于需要实时监控数据变化的场景非常有用。
-
查询模式:在查询RT表时,用户可以设置
hive.input.format
为org.apache.hadoop.hive.ql.io.HiveInputFormat
,并且需要设置增量查询相关的参数,如hoodie.mytableName.consume.mode
、hoodie.mytableName.consume.start.timestamp
和hoodie.mytableName.consume.max.commits
。 - 适用场景:RT表适合用于低延迟的数据访问和实时数据分析。
Hive增量查询RT表时候有如下配置项:
# 默认为SNAPSHOT,非增量,设置为INCREMENTAL模式支持增量读取
set hoodie.mytableName.consume.mode=INCREMENTAL;
# 指定增量查询范围是下面timestamp之后的n次commit的数据
# 如果设置为-1,则增量范围是下面timestamp之后的所有数据
set hoodie.mytableName.consume.max.commits=3;
# 指定从哪个timestamp开始增量查询
set hoodie.mytableName.consume.start.timestamp=commitTime;
需要注意的是,仅设置hoodie.mytableName.consume.start.timestamp
是不够的,查询的时候需要指定where
子句,限制_hoodie_commit_time
的范围。例如:
select * from mytableName where _hoodie_commit_time > someTimestamp;
因为Hudi会定期将新老数据合并(compaction和小文件合并机制),hoodie.mytableName.consume.start.timestamp
配置项是文件级别的过滤,对于合并之后的数据,文件级别的过滤无法精确过滤出满足条件的数据(文件级别过滤的作用是粗筛出不符合条件的文件,减少Hive的map数量,加快计算速度),因此SQL中仍需要加入where条件判断。
Kerberos环境Hive Sync
对于Flink引擎,需要额外配置属性hive_sync.conf.dir
,指向hive-site.xml
配置文件所在的目录。例如:
'hive_sync.conf.dir' = '/path/to/hive/conf'
Spark引擎目前没有找到类似的配置项。