flink1.10连接hive1.1.0的填坑历程

最近有个项目,需要读取hive表中的数据写到Hbase中,正好flink1.10版本刚出来,而且支持连接hive,那就他了。

哪想到这才是噩梦的开始。确定了技术栈,就开始着手写代码,按照习惯,官网打开瞅一眼案例:

这是依赖:

依赖

这是代码:

code

是不是觉得太简单了,太天真了。

开始构建工程,添加依赖,修改地址,开始本地跑

exception

第一个,这个问题很明显是kerberos的问题,但是,我对Kerberos的问题一直很头疼,所以这个问题是同事解决的,给他点个赞。

加上关于kerberos的相关代码后,本地算是勉强调通了。

最终本地调试代码:

最终本地调试代码

本地可以了,上集群测试,先来个简单的脚本吧

shell

启动,报错

error

这个错误也很明确,你需要设置hadoop的配置文件的目录和依赖的目录(原因是flink从某版本(具体哪个版本记不清了)不再提供hadoop的依赖,所以需要手动添加,具体解决方案,在flink官网说的很清楚,传送门:https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html),好办,修改脚本

shell

重新启动,紧接着异常

exception

这个问题困惑了我好久,我知道是jar冲突,也知道是哪个jar包,但是不知道排除哪些依赖,后来晚上回家睡觉流程有捋了一下,才恍然大悟

我打包的时候,把flink-shaded-hadoop2的jar包打进了我的依赖中, 导致他和集群上hadoop的jar包冲突,只需要把这个包排掉就行

解决办法:上传集群时,去掉这个jar包

<dependency>

    <groupId>org.apache.flink</groupId>

    <artifactId>flink-shaded-hadoop-2-uber</artifactId>

    <version>2.6.5-8.0</version>

    <scope>${scope.flink.lib}</scope>

</dependency>

启动,又见异常:

exception

看到这个我心里有点数了,我导进来hadoop jar包和flink1.10的jar包冲突了,我懵逼了,这咋排??于是果断换个方案,下载flink-shaded-hadoop-2-uber-2.6.5-7.0.jar直接扔进flink/lib下,解决。

继续启动,异常又来:

exception

解决办法:

<dependency>

            <groupId>org.apache.hive</groupId>

            <artifactId>hive-exec</artifactId>

            <version>1.2.1</version>

            <exclusions>

                <exclusion>

                    <groupId>org.codehaus.janino</groupId>

                    <artifactId>janino</artifactId>

                </exclusion>

                <exclusion>

                    <groupId>org.codehaus.janino</groupId>

                    <artifactId>commons-compiler</artifactId>

                </exclusion>

            </exclusions>

        </dependency>

紧接着,又报错,

exception

添加依赖:

<dependency>

    <groupId>org.apache.thrift</groupId>

    <artifactId>libfb303</artifactId>

    <version>0.9.2</version>

</dependency>

我以为就到此为止了吧,不,还有个绝的:

exception

看到这个异常,我都懵了。这是啥问题??心态有点崩,找了各种资料,最终终于知道问题所在:

我的hive是cdh版本的,但是我添加的hadoop依赖flink-shaded-hadoop2是apache的。

解决办法:添加依赖

<dependency>

    <groupId>org.apache.hadoop</groupId>

    <artifactId>hadoop-core</artifactId>

    <version>2.6.0-mr1-cdh5.8.0</version>

</dependency>

至此,我的任务终于跑起来了

result

看到结果,我倍感欣慰,这一路走来太不容易了,心态崩过好几次,幸好结果是好的。解决问题的过程虽然很痛苦,但是解决了问题后的感觉真的是超级棒!!

note:

flink连接hive处理流任务,读取hive表的数据时,即使加了limit的字段,也会将全表数据加载到state中,如果表很大,比如全量表,读起来会很慢,而且超级耗资源,慎用!!

但是,我想总会有解决的办法(有时间再更新)。期待flink对接hive越来越完善!!

贴出最终code和pom:

1
2
3

<properties>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

    <flink.version>1.10.0</flink.version>

    <scala.binary.version>2.11</scala.binary.version>

    <scala.version>2.11.12</scala.version>

    <hive.version>1.1.0-cdh5.8.0</hive.version>

    <scope.flink.lib>compile</scope.flink.lib> 

</properties>

<dependencies>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>

        <version>1.10.0</version>

    </dependency>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-shaded-hadoop-2-uber</artifactId>

        <version>2.6.5-8.0</version>

        <scope>${scope.flink.lib}</scope>

    </dependency>

    <dependency>

        <groupId>org.apache.hadoop</groupId>

        <artifactId>hadoop-mapreduce-client-common</artifactId>

        <version>2.6.0-cdh5.8.0</version>

        <scope>${scope.flink.lib}</scope>

    </dependency>

    <dependency>

        <groupId>org.apache.hadoop</groupId>

        <artifactId>hadoop-core</artifactId>

        <version>2.6.0-mr1-cdh5.8.0</version>

    </dependency>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>

        <version>1.10.0</version>

        <scope>${scope.flink.lib}</scope>

    </dependency>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>

        <version>1.10.0</version>

        <scope>${scope.flink.lib}</scope>

    </dependency>

    <dependency>

        <groupId>org.apache.hive</groupId>

        <artifactId>hive-exec</artifactId>

        <version>${hive.version}</version>

        <exclusions>

            <exclusion>

                <groupId>org.codehaus.janino</groupId>

                <artifactId>janino</artifactId>

            </exclusion>

            <exclusion>

                <groupId>org.codehaus.janino</groupId>

                <artifactId>commons-compiler</artifactId>

            </exclusion>

        </exclusions>

    </dependency>

    <dependency>

        <groupId>org.projectlombok</groupId>

        <artifactId>lombok</artifactId>

        <version>1.16.18</version>

        <scope>${scope.flink.lib}</scope>

    </dependency>

    <dependency>

        <groupId>org.apache.thrift</groupId>

        <artifactId>libfb303</artifactId>

        <version>0.9.2</version>

    </dependency>

    <dependency>

        <groupId>org.apache.flink</groupId>

        <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>

        <version>${flink.version}</version>

        <exclusions>

            <exclusion>

                <artifactId>slf4j-api</artifactId>

                <groupId>org.slf4j</groupId>

            </exclusion>

            <exclusion>

                <artifactId>slf4j-log4j12</artifactId>

                <groupId>org.slf4j</groupId>

            </exclusion>

        </exclusions>

    </dependency>

</dependencies>

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容