Flink 读写 Hive 表

背景

目前flink读写hive表一直是一个比较麻烦的事情。虽然flink1.10版本更新了hive table api,生产环境中可以使用。但测试过程中还是会遇到很多问题。这里介绍一些实战过程中hive source和 hive sink案例供大家来参考

方案

hive source:

flink 1.9和1.10中官网提供连接hive source的方式。1.10版本已经测试成功,1.9版本官网则不建议在生产环境中使用。顾下面代码实现是基于flink1.10环境。

 EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
    String hiveConf = "/test";
    String hiveName = "test_source";
    String HIVE_VERSION = "1.1.0";

    // 注册catalog,使用hiveName的catalog
    HiveCatalog catalog = new HiveCatalog(hiveName, null, hiveConf, HIVE_VERSION);
    tableEnv.registerCatalog(hiveName,catalog);
    tableEnv.useCatalog(hiveName);
    Table table = tableEnv.sqlQuery(sql);

    // table to DataStream
    DataStream<Row> dataStream = tableEnv.toAppendStream(table, Row.class);

hive sink:

思路1:flink写数据到hdfs,jdbc访问hive metastore调用alter table语句将数据load到表中。

ALTER TABLE test.test_table ADD IF NOT EXISTS PARTITION (dt_=20200305) LOCATION '/test/20200305'";

思路2:flink写数据到hdfs,HiveMetastoreClient 调用add_partition,将数据添加到hive。HiveMetastoreClientWrapper 对象为flink-connect-hive.jar中的对象,大家也可以直接使用hive-exec.jar的HiveMetastoreClient。

    String HIVE_VERSION = "1.1.0";
    HiveConf hiveConf = createHiveConf("/test");
    hiveConf.set("hive-version", hiveConf);
    HiveMetastoreClientWrapper hMs = HiveMetastoreClientFactory.create(hiveConf, HIVE_VERSION);
    org.apache.hadoop.hive.metastore.api.Table table = hMs.getTable(database, tableName);

    // alter table add partition
    Partition pa = new Partition();
    pa.setDbName(table.getDbName());
    pa.setTableName(table.getTableName());
    pa.setValues(values); // 分区数值,eg dt = 20200305
    pa.setParameters(new HashMap<String, String>());
    pa.setSd(table.getSd());
    pa.getSd().setSerdeInfo(table.getSd().getSerdeInfo());
    pa.getSd().setLocation(location); // 分区的location路径
    Partition retp = hiveMetastore.add_partition(partition);

思路3:采用flink table api,将数据写入hive。此种思路只在flink 批环境中测试成功,在流环境中未测试成功。在这里吐槽一下,总感觉flink与blink合并之后存在很多不通用的代码实现。非常难以理解。


    HiveCatalog catalog = new HiveCatalog(hiveName, null, hiveConf, HIVE_VERSION);
    tableEnv.registerCatalog(hiveName,catalog);
    tableEnv.useCatalog(hiveName);
    // database.tableName 为hive中的表

    // 创建输入表(use catalog 放在前面)
    Table inputTable = tableEnv.fromDataSet(...);
    String table2= inputTable.toString();
    String sql = "insert overwrite database.tableName partition ( dt = 20200305 ) select * from " + table2;
    tableEnv.sqlUpdate(sql);

关于大数据方面技术问题可以咨询,替你解决你的苦恼。微信 hainanzhongjian

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 小米业务线众多,从信息流,电商,广告到金融等覆盖了众多领域,小米流式平台为小米集团各业务提供一体化的流式数据解决方...
    程序员66阅读 548评论 0 0
  • 1. Overview: Structured Streaming是基于Spark SQL引擎的可扩展、具有容错性...
    奉先阅读 2,912评论 0 1
  • 今天是我的课程,但是没什么思路,于是就邀约豆豆妈妈一起分享日记,豆豆妈妈真的非常用心,大中午的时间把ppt做好发给...
    林玉珍阅读 486评论 0 4
  • 1、其实五分钟问自己看了什么就是问自己有没有吸收,也是一种复盘。2、还是要往下背东西。一点点的背,一周一本书那也背很多。
    智囊团阅读 101评论 0 0
  • 杂货铺的老板娘又在给老板上政治课,絮絮叨叨的没完没了,老板低着头抽着闷烟。 闷了半天憋出一句话道:“实在过不下就离...
    秀火儒林阅读 661评论 13 17