Hudi Hive sync 使用

背景

Spark/Flink可以使用Hive的metastore,但是Hive无法通过Hive metastore中的Spark/Flink表直接查询数据。为了解决这个问题,可以配置使用Hive sync。在Spark/Flink操作表的时候,自动同步Hive的元数据。这样就可以通过Hive查询Hudi表的内容。

Hive metastore通过目录结构的来维护元数据,数据的更新是通过覆盖来保证事务。但是数据湖是通过追踪文件来管理元数据,一个目录中可以包含多个版本的文件。这一点和Hive元数据管理是不同的。所以说为了兼容Hive metastore,Hudi需要实时从Timeline同步元数据到Hive metastore。

环境信息

  • Flink 1.17.2
  • Hudi 0.15.0
  • Spark 3.3.2
  • Hive 3.1.2

Hive 兼容Hudi格式

复制编译后的packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.11.1.jar到各节点Hive安装目录的auxlib目录中。

进入beeline后执行:

set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

或者

set hive.input.format = org.apache.hadoop.hive.ql.io.HiveInputFormat; 
set hive.stats.autogather = false;

然后再查询Hive表。

Hive查询遇到的错误

向量化执行问题

在Hive中执行有聚合函数的Hive SQL会发生问题。例如select count(*) from t1。报错内容为:

java.lang.ClassCastException: org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch cannot be cast to org.apache.hadoop.io.ArrayWritable

原因是Hive启用了向量化查询。要解决这个问题需要关闭向量化查询。方法为查询前执行:

set hive.vectorized.execution.enabled = false;
set hive.vectorized.execution.reduce.enabled = false;

Flink Hudi Hive Sync

Hudi Flink Bundle的编译要求:如果要使用Hive Sync功能,编译时候需要激活flink-bundle-shade-hive3profile。编译命令如下所示:

mvn clean package -Dflink1.15 -Dscala2.12 -DskipTests -Pflink-bundle-shade-hive3

Flink的配置要求:Hive Sync只有在Flink checkpoint的时候才会触发。因此必须要配置checkpoint。经验证state.backend.type默认的hashmap无法触发Hive Sync。需要按照如下方式配置:

execution.checkpointing.interval: 3s
state.backend.type: rocksdb
state.checkpoints.dir: hdfs:///flink-checkpoints

为了演示方便取用较短的checkpoint时间间隔。

Flink Hive Sync支持两种模式连接Hive:

  • Hive Metastore(hms): 连接Hive Metastore 9083端口。
  • JDBC: 连接HiveServer 10000端口。

两种使用方式如下所示:

-- hms 模式模板
CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '${db_path}/t1',
  'table.type' = 'COPY_ON_WRITE',  -- 如果使用MERGE_ON_READ,只有生成parquet文件之后,Hive才能查出数据
  'hive_sync.enable' = 'true',     -- 必须。启用Hive sync
  'hive_sync.mode' = 'hms',        -- 必须。设置模式未hms,默认为jdbc
  'hive_sync.metastore.uris' = 'thrift://${ip}:9083' -- 必须。端口需要在 hive-site.xml上配置
);


-- jdbc 模式模板
CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '${db_path}/t1',
  'table.type' = 'COPY_ON_WRITE',  -- 如果使用MERGE_ON_READ,只有生成parquet文件之后,Hive才能查出数据
  'hive_sync.enable' = 'true',     -- 必须。启用Hive sync
  'hive_sync.mode' = 'jdbc',       -- 必须。设置模式未hms,默认为jdbc
  'hive_sync.metastore.uris' = 'thrift://${ip}:9083', -- 必须。端口需要在hive-site.xml上配置
  'hive_sync.jdbc_url'='jdbc:hive2://${ip}:10000',    -- 必须。hiveServer端口
  'hive_sync.table'='${table_name}',                  -- 必须。同步过去的hive表名
  'hive_sync.db'='${db_name}',                        -- 必须。同步过去的hive表所在数据库名
  'hive_sync.username'='${user_name}',                -- 必须。JDBC 用户名
  'hive_sync.password'='${password}'                  -- 必须。JDBC 密码
);

我们可以使用hive_sync.db指定同步到Hive中表所在的database,hive_sync.table指定同步到Hive中的表名。如果没有配置,它们的默认值参见org/apache/hudi/table/catalog/HoodieHiveCatalog.java

newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_DB.key(), k -> tablePath.getDatabaseName());  
newOptions.computeIfAbsent(FlinkOptions.HIVE_SYNC_TABLE.key(), k -> tablePath.getObjectName());

由上面源代码可知hive_sync.db默认为hudi表在Flink中的database名称,Flink中默认为default_databasehive_sync.table默认为Flink中的Hudi表名称。

接下来举一个例子。我们使用HMS方式配置Hive Sync:

CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///zy/hudi/',
  'table.type' = 'COPY_ON_WRITE',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://manager127:9083',
  'hive_sync.table'='t1', 
  'hive_sync.db'='default'
);

-- 插入测试数据

INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

稍等一会儿查看Flink Web Dashboard的jobmanager日志,如果发现类似如下:

INFO  org.apache.hudi.sink.StreamWriteOperatorCoordinator          [] - Executor executes action [sync hive metadata for instant 20240710084456072] success!
INFO  org.apache.hudi.hive.HiveSyncTool                            [] - Sync complete for t1

说明Hive sync已经触发。前面Hive Sync的配置没有问题。

然后我们进入beeline,执行:

use default_database;
show tables;

我们可以看到同步过来的t1表。

然后执行:

select * from t1;

可以从Hive中查出Hudi表数据。

上面例子中的Hive Sync表是COW类型。如果Hive Sync的是MOR类型表。同步之后会出现t1t1_rot1_rt三个表。它们的区别和用途参见读优化表和实时表一节。

FAQ

NoSuchMethodError问题

如果执行Flink的时候遇到如下错误(Hudi 0.11.1和Flink 1.13.2曾经遇到):

java.lang.NoSuchMethodError: org.apache.parquet.schema.Types$PrimitiveBuilder.as(Lorg/apache/parquet/schema/LogicalTypeAnnotation;)Lorg/apache/parquet/schema/Types$Builder

需要修改packaging/hudi-flink-bundle/pom.xml,在relocations标签中加入:

<relocation>
  <pattern>org.apache.parquet</pattern>
  <shadedPattern>${flink.bundle.shade.prefix}org.apache.parquet</shadedPattern>
</relocation>

然后重新编译。

参考链接:
https://github.com/apache/hudi/issues/3042

Hive Sync时候的报错

近期使用Flink配合Hudi发现表数据可以成功插入但是没有同步到hive表。

Flink Web Dashboard查看JobManager日志,发现HMSDDLExecutor第86行执行遇到错误:

java.lang.NoClassDefFoundError: org/apache/calcite/plan/RelOptRule

解决方法:

复制$FLINK_HOME/opt/flink-table-planner_2.12-1.17.2.jar$FLINK_HOME/lib,然后删除或移走$FLINK_HOME/lib/flink-table-planner-loader-1.17.2.jar。再重新启动Yarn session和SQL client。插入数据后可以看到如下日志:

org.apache.hudi.hive.HiveSyncTool                            [] - Syncing target hoodie table with hive table(default_database.t1). Hive metastore URL from HiveConf:thrift://ip:9083). Hive metastore URL from HiveSyncConfig:thrift://ip:9083, basePath :/hudi/t1

说明Hive sync成功。可在beeline中使用:

select * from default_database.t1;

查询表中的内容。

java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf

复制如下Hadoop依赖到$FLINK_HOME/lib目录中:

hadoop-common-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-common-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-core-3.1.1.3.0.1.0-187.jar
hadoop-mapreduce-client-jobclient-3.1.1.3.0.1.0-187.jar

以jar包名字为准,版本号可能和实际环境不同。

Spark Hudi Hive Sync

Spark Hive Sync目前只支持DataFrame API。下面使用官网的例子插入数据到hudi_cow表:

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row


val tableName = "hudi_cow"
val basePath = "/user/hive/warehouse/hudi_cow"

val schema = StructType(Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("toBeDeletedStr", StringType,true),
StructField("intToLong", IntegerType,true),
StructField("longToInt", LongType,true)
))

val data0 = Seq(Row("row_1", "2021/01/01",0L,"bob","v_0","toBeDel0",0,1000000L), 
               Row("row_2", "2021/01/01",0L,"john","v_0","toBeDel0",0,1000000L), 
               Row("row_3", "2021/01/02",0L,"tom","v_0","toBeDel0",0,1000000L))

var dfFromData0 = spark.createDataFrame(data0,schema)

dfFromData0.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
  option(RECORDKEY_FIELD_OPT_KEY, "rowId").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
  option(TABLE_NAME, tableName).
  option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL).
  option(OPERATION_OPT_KEY, "upsert").
  option("hoodie.index.type","SIMPLE").
  option("hoodie.datasource.write.hive_style_partitioning","true").
  option("hoodie.datasource.hive_sync.jdbcurl","jdbc:hive2://manager127:10000/").
  option("hoodie.datasource.hive_sync.database","default").
  option("hoodie.datasource.hive_sync.table","hudi_cow").
  option("hoodie.datasource.hive_sync.partition_fields","partitionId").
  option("hoodie.datasource.hive_sync.enable","true").
  option("hoodie.datasource.hive_sync.username","hdfs").
  mode(Overwrite).
  save(basePath)

Spark Hudi Hive Sync配置项含义如下:

  • hoodie.datasource.hive_sync.jdbcurl: Hive metastore连接JDBC URL。
  • hoodie.datasource.hive_sync.database: Hive database名称。
  • hoodie.datasource.hive_sync.table: Hive table名称。
  • hoodie.datasource.hive_sync.partition_fields: Hive分区字段名。
  • hoodie.datasource.hive_sync.enable: 是否启动Hive sync。
  • hoodie.datasource.hive_sync.username:访问Hive时使用的用户名。
  • hoodie.datasource.hive_sync.password:访问Hive使用的用户对应的密码。

和Flink一样,执行成功后可以使用Hive通过beeline查询Hudi表数据。

读优化表和实时表

MOR表元数据同步到Hive会出现一个ro表和一个rt表(RT表只有MOR表同步到Hive之后才会生成)。例如原Hudi表的名字为test,同步到Hive之后会出现如下3个表:

  • test
  • test_ro
  • test_rt

其中ro表为读优化表。在读取的时候忽略log文件,只读取base file(parquet)文件的内容,相当于忽略了上次compaction之后的更新数据(和snapshot query)类似。读取速度较快。

rt为实时表,在读取的时候会合并log文件和base file,返回最新版本的数据。读取速度较慢。

RO表(Read Optimized Table)特点:

  1. 压缩后的Parquet文件:RO表只暴露压缩后的Parquet文件,这意味着查询性能被优化,因为Parquet是一种列式存储格式,适合分析型查询。
  2. 查询性能:由于RO表优化了读取操作,它适合用于批量查询和分析场景。
  3. 查询方式:RO表的查询方式与普通Hive表类似,需要设置hive.input.formatorg.apache.hadoop.hive.ql.io.HiveInputFormat或者org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat来正确查询数据。

RT表(Real-Time Table)特点:

  1. 实时性:RT表提供了对Hudi表最新数据的实时视图,可以立即看到最近的写入操作。
  2. 增量查询:RT表支持增量查询,允许用户查询自上次查询以来的数据变更,这对于需要实时监控数据变化的场景非常有用。
  3. 查询模式:在查询RT表时,用户可以设置hive.input.formatorg.apache.hadoop.hive.ql.io.HiveInputFormat,并且需要设置增量查询相关的参数,如hoodie.mytableName.consume.modehoodie.mytableName.consume.start.timestamphoodie.mytableName.consume.max.commits
  4. 适用场景:RT表适合用于低延迟的数据访问和实时数据分析。

Hive增量查询RT表时候有如下配置项:

# 默认为SNAPSHOT,非增量,设置为INCREMENTAL模式支持增量读取
set hoodie.mytableName.consume.mode=INCREMENTAL; 
# 指定增量查询范围是下面timestamp之后的n次commit的数据
# 如果设置为-1,则增量范围是下面timestamp之后的所有数据
set hoodie.mytableName.consume.max.commits=3; 
# 指定从哪个timestamp开始增量查询
set hoodie.mytableName.consume.start.timestamp=commitTime;

需要注意的是,仅设置hoodie.mytableName.consume.start.timestamp是不够的,查询的时候需要指定where子句,限制_hoodie_commit_time的范围。例如:

select * from mytableName where _hoodie_commit_time > someTimestamp;

因为Hudi会定期将新老数据合并(compaction和小文件合并机制),hoodie.mytableName.consume.start.timestamp配置项是文件级别的过滤,对于合并之后的数据,文件级别的过滤无法精确过滤出满足条件的数据(文件级别过滤的作用是粗筛出不符合条件的文件,减少Hive的map数量,加快计算速度),因此SQL中仍需要加入where条件判断。

Kerberos环境Hive Sync

对于Flink引擎,需要额外配置属性hive_sync.conf.dir,指向hive-site.xml配置文件所在的目录。例如:

'hive_sync.conf.dir' = '/path/to/hive/conf'

Spark引擎目前没有找到类似的配置项。

参考链接

https://hudi.apache.org/docs/syncing_metastore

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354

推荐阅读更多精彩内容