Flink集成Hive之快速入门--以Flink1.12为例

使用Hive构建数据仓库已经成为了比较普遍的一种解决方案。目前,一些比较常见的大数据处理引擎,都无一例外兼容Hive。Flink从1.9开始支持集成Hive,不过1.9版本为beta版,不推荐在生产环境中使用。在Flink1.10版本中,标志着对 Blink的整合宣告完成,对 Hive 的集成也达到了生产级别的要求。值得注意的是,不同版本的Flink对于Hive的集成有所差异,本文将以最新的Flink1.12版本为例,阐述Flink集成Hive的简单步骤,以下是全文,希望对你有所帮助。

公众号『大数据技术与数仓』,回复『资料』领取大数据资料包

Flink集成Hive的基本方式

Flink 与 Hive 的集成主要体现在以下两个方面:

  • 持久化元数据

Flink利用 Hive 的 MetaStore 作为持久化的 Catalog,我们可通过HiveCatalog将不同会话中的 Flink 元数据存储到 Hive Metastore 中。 例如,我们可以使用HiveCatalog将其 Kafka的数据源表存储在 Hive Metastore 中,这样该表的元数据信息会被持久化到Hive的MetaStore对应的元数据库中,在后续的 SQL 查询中,我们可以重复使用它们。

  • 利用 Flink 来读写 Hive 的表。

Flink打通了与Hive的集成,如同使用SparkSQL或者Impala操作Hive中的数据一样,我们可以使用Flink直接读写Hive中的表。

HiveCatalog的设计提供了与 Hive 良好的兼容性,用户可以”开箱即用”的访问其已有的 Hive表。 不需要修改现有的 Hive Metastore,也不需要更改表的数据位置或分区。

Flink集成Hive的步骤

Flink支持的Hive版本

大版本 V1 V2 V3 V4 V5 V6 V7
1.0 1.0.0 1.0.1
1.1 1.1.0 1.1.1
1.2 1.2.0 1.2.1 1.2.2
2.0 2.0.0 2.0.1
2.1 2.1.0 2.1.1
2.2 2.2.0
2.3 2.3.0 2.3.1 2.3.2 2.3.3 2.3.4 2.3.5 2.3.6
3.1 3.1.0 3.1.1 3.1.2

值得注意的是,对于不同的Hive版本,可能在功能方面有所差异,这些差异取决于你使用的Hive版本,而不取决于Flink,一些版本的功能差异如下:

  • Hive 内置函数在使用 Hive-1.2.0 及更高版本时支持。
  • 列约束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本时支持。
  • 更改表的统计信息,在使用 Hive-1.2.0 及更高版本时支持。
  • DATE列统计信息,在使用 Hive-1.2.0 及更高版时支持。
  • 使用 Hive-2.0.x 版本时不支持写入 ORC 表。

依赖项

本文以Flink1.12为例,集成的Hive版本为Hive2.3.4。集成Hive需要额外添加一些依赖jar包,并将其放置在Flink安装目录下的lib文件夹下,这样我们才能通过 Table API 或 SQL Client 与 Hive 进行交互。

另外,Apache Hive 是基于 Hadoop 之上构建的, 所以还需要 Hadoop 的依赖,配置好HADOOP_CLASSPATH即可。这一点非常重要,否则在使用FlinkSQL Cli查询Hive中的表时,会报如下错误:

java.lang.ClassNotFoundException: org.apache.hadoop.mapred.JobConf

配置HADOOP_CLASSPATH,需要在/etc/profile文件中配置如下的环境变量

export HADOOP_CLASSPATH=`hadoop classpath`

Flink官网提供了两种方式添加Hive的依赖项。第一种是使用 Flink 提供的 Hive Jar包(根据使用的 Metastore 的版本来选择对应的 Hive jar),建议优先使用Flink提供的Hive jar包,这种方式比较简单方便。本文使用的就是此种方式。当然,如果你使用的Hive版本与Flink提供的Hive jar包兼容的版本不一致,你可以选择第二种方式,即别添加每个所需的 jar 包。

下面列举了可用的jar包及其适用的Hive版本,我们可以根据使用的Hive版本,下载对应的jar包即可。比如本文使用的Hive版本为Hive2.3.4,所以只需要下载flink-sql-connector-hive-2.3.6即可,并将其放置在Flink安装目录的lib文件夹下。

Metastore version Maven dependency SQL Client JAR
1.0.0 ~ 1.2.2 flink-sql-connector-hive-1.2.2 Download
2.0.0 ~2.2.0 flink-sql-connector-hive-2.2.0 Download
2.3.0 ~2.3.6 flink-sql-connector-hive-2.3.6 Download
3.0.0 ~ 3.1.2 flink-sql-connector-hive-3.1.2 Download

上面列举的jar包,是我们在使用Flink SQL Cli所需要的jar包,除此之外,根据不同的Hive版本,还需要添加如下jar包。以Hive2.3.4为例,除了上面的一个jar包之外,还需要添加下面两个jar包:

flink-connector-hive_2.11-1.12.0.jarhive-exec-2.3.4.jar。其中hive-exec-2.3.4.jar包存在于Hive安装路径下的lib文件夹。flink-connector-hive_2.11-1.12.0.jar的下载地址为:

https://repo1.maven.org/maven2/org/apache/flink/flink-connector-hive_2.11/1.12.0/

NOTE:black_nib::Flink1.12集成Hive只需要添加如下三个jar包,以Hive2.3.4为例,分别为:

flink-sql-connector-hive-2.3.6

flink-connector-hive_2.11-1.12.0.jar

hive-exec-2.3.4.jar

Flink SQL Cli集成Hive

将上面的三个jar包添加至Flink的lib目录下之后,就可以使用Flink操作Hive的数据表了。以FlinkSQL Cli为例:

配置sql-client-defaults.yaml

该文件时Flink SQL Cli启动时使用的配置文件,该文件位于Flink安装目录的conf/文件夹下,具体的配置如下,主要是配置catalog:

image

除了上面的一些配置参数,Flink还提供了下面的一些其他配置参数:

参数 必选 默认值 类型 描述
type (无) String Catalog 的类型。 创建 HiveCatalog 时,该参数必须设置为'hive'
name (无) String Catalog 的名字。仅在使用 YAML file 时需要指定。
hive-conf-dir (无) String 指向包含 hive-site.xml 目录的 URI。 该 URI 必须是 Hadoop 文件系统所支持的类型。 如果指定一个相对 URI,即不包含 scheme,则默认为本地文件系统。如果该参数没有指定,我们会在 class path 下查找hive-site.xml。
default-database default String 当一个catalog被设为当前catalog时,所使用的默认当前database。
hive-version (无) String HiveCatalog 能够自动检测使用的 Hive 版本。我们建议不要手动设置 Hive 版本,除非自动检测机制失败。
hadoop-conf-dir (无) String Hadoop 配置文件目录的路径。目前仅支持本地文件系统路径。我们推荐使用 HADOOP_CONF_DIR 环境变量来指定 Hadoop 配置。因此仅在环境变量不满足您的需求时再考虑使用该参数,例如当您希望为每个 HiveCatalog 单独设置 Hadoop 配置时。

操作Hive中的表

首先启动FlinkSQL Cli,命令如下:

./bin/sql-client.sh embedded

接下来,我们可以查看注册的catalog

Flink SQL> show catalogs;
default_catalog
myhive

使用注册的myhive catalog

Flink SQL> use catalog myhive;

假设Hive中有一张users表,在Hive中查询该表:

hive (default)> select * from users;
OK
users.id        users.mame
1       jack
2       tom
3       robin
4       haha
5       haha

查看对应的数据库表,我们可以看到Hive中已经存在的表,这样就可以使用FlinkSQL操作Hive中的表,比如查询,写入数据。

Flink SQL> show tables;
Flink SQL> select * from users;
image

向Hive表users中插入一条数据:

Flink SQL> insert into users select 6,'bob';

再次使用Hive客户端去查询该表的数据,会发现写入了一条数据。

接下来,我们再在FlinkSQL Cli中创建一张kafka的数据源表:

CREATE TABLE user_behavior ( 
    `user_id` BIGINT, -- 用户id
    `item_id` BIGINT, -- 商品id
    `cat_id` BIGINT, -- 品类id
    `action` STRING, -- 用户行为
    `province` INT, -- 用户所在的省份
    `ts` BIGINT, -- 用户行为发生的时间戳
    `proctime` AS PROCTIME(), -- 通过计算列产生一个处理时间列
    `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
     WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND  -- 定义watermark
 ) WITH ( 
    'connector' = 'kafka', -- 使用 kafka connector
    'topic' = 'user_behavior', -- kafka主题
    'scan.startup.mode' = 'earliest-offset', -- 偏移量
    'properties.group.id' = 'group1', -- 消费者组
    'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 
    'format' = 'json', -- 数据源格式为json
    'json.fail-on-missing-field' = 'true',
    'json.ignore-parse-errors' = 'false'
);

查看表结构

Flink SQL> DESCRIBE user_behavior;
image

我们可以在Hive的客户端中执行下面命令查看刚刚在Flink SQLCli中创建的表

hive (default)> desc formatted  user_behavior;
# Detailed Table Information             
Database:               default                  
Owner:                  null                     
CreateTime:             Sun Dec 20 16:04:59 CST 2020     
LastAccessTime:         UNKNOWN                  
Retention:              0                        
Location:               hdfs://kms-1.apache.com:8020/user/hive/warehouse/user_behavior   
Table Type:             MANAGED_TABLE            
Table Parameters:                
        flink.connector         kafka               
        flink.format            json                
        flink.json.fail-on-missing-field        true                
        flink.json.ignore-parse-errors  false               
        flink.properties.bootstrap.servers      kms-2:9092,kms-3:9092,kms-4:9092
        flink.properties.group.id       group1              
        flink.scan.startup.mode earliest-offset     
        flink.schema.0.data-type        BIGINT              
        flink.schema.0.name     user_id             
        flink.schema.1.data-type        BIGINT              
        flink.schema.1.name     item_id             
        flink.schema.2.data-type        BIGINT              
        flink.schema.2.name     cat_id              
        flink.schema.3.data-type        VARCHAR(2147483647) 
        flink.schema.3.name     action              
        flink.schema.4.data-type        INT                 
        flink.schema.4.name     province            
        flink.schema.5.data-type        BIGINT              
        flink.schema.5.name     ts                  
        flink.schema.6.data-type        TIMESTAMP(3) NOT NULL
        flink.schema.6.expr     PROCTIME()          
        flink.schema.6.name     proctime            
        flink.schema.7.data-type        TIMESTAMP(3)        
        flink.schema.7.expr     TO_TIMESTAMP(FROM_UNIXTIME(`ts`, 'yyyy-MM-dd HH:mm:ss'))
        flink.schema.7.name     eventTime           
        flink.schema.watermark.0.rowtime        eventTime           
        flink.schema.watermark.0.strategy.data-type     TIMESTAMP(3)        
        flink.schema.watermark.0.strategy.expr  `eventTime` - INTERVAL '5' SECOND
        flink.topic             user_behavior       
        is_generic              true                
        transient_lastDdlTime   1608451499          
                 
# Storage Information            
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe       
InputFormat:            org.apache.hadoop.mapred.TextInputFormat         
OutputFormat:           org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat   
Compressed:             No                       
Num Buckets:            -1                       
Bucket Columns:         []                       
Sort Columns:           []                       
Storage Desc Params:             
        serialization.format    1                   

NOTE:black_flag::在Flink中创建一张表,会把该表的元数据信息持久化到Hive的metastore中,我们可以在Hive的metastore中查看该表的元数据信息

进入Hive的元数据信息库,本文使用的是MySQL。执行下面的命令:

SELECT 
    a.tbl_id, -- 表id
    from_unixtime(create_time) AS create_time, -- 创建时间
    a.db_id, -- 数据库id
    b.name AS db_name, -- 数据库名称
    a.tbl_name -- 表名称
FROM TBLS AS a
LEFT JOIN DBS AS b ON a.db_id =b.db_id
WHERE a.tbl_name = "user_behavior";
image

使用代码连接到 Hive

maven依赖

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
</dependency>
<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.3.4</version>
</dependency>

代码

public class HiveIntegrationDemo {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name            = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";
        
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        // 使用注册的catalog
        tableEnv.useCatalog("myhive");
        // 向Hive表中写入一条数据 
        String insertSQL = "insert into users select 10,'lihua'";

        TableResult result2 = tableEnv.executeSql(insertSQL);
        System.out.println(result2.getJobClient().get().getJobStatus());

    }
}

提交程序,观察Hive表的变化:

bin/flink run -m kms-1:8081 \
-c com.flink.sql.hiveintegration.HiveIntegrationDemo \
./original-study-flink-sql-1.0-SNAPSHOT.jar

总结

本文以最新的Flink1.12为例,阐述了Flink集成Hive的基本步骤,并对其注意事项进行了说明。文中也给出了如何通过FlinkSQL Cli和代码去操作Hive表的步骤。下一篇,将介绍Hive Catalog与Hive Dialect。

公众号『大数据技术与数仓』,回复『资料』领取大数据资料包

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容