Spark on Hive 和 Hive on Spark 区别

一、背景

1.1 为什么引入Hive?

最初提出Hive的主要目的在于:降低使用MapReduce完成查询任务的技术门槛
在RDBMS中,开发人员或者用户通过执行SQL语句进行查询,SQL语言是开发人员大都熟悉的语言。在大数据发展的初期,大数据查询的计算任务需要通过MapReduce来完成,然而编写MapReduce的程序是件复杂繁琐的事情。Hive 可以实现将大家熟悉的SQL语句翻译成复杂的MapReduce程序,利用Hive非MapReduce开发人员也能够快速上手使用MapReduce完成查询任务。因此,大家经常会说Hive使用的是一种类SQL的HQL语言。

Hive查询原理示意图

1.2 为什么引入Spark?

Hive底层计算使用的是Hadoop的MapReduce,由于需要繁的磁盘IO,其计算性能只适合于大文件的非实时的批处理操作。Spark基于内存计算,凭借着DAG和RDD特性(保证中间数据如果丢失可以重新计算恢复),可以将计算的中间结果以RDD的形式保存在内存中,而不需要频繁的磁盘IO,非常适合于交互式迭代计算。Spark的计算性能远高于Hadoop的MapReduce。

1.3 Hive的内部表、外部表以及元数据

  • 未被external修饰的是内部表(managed table);
  • 被external修饰的为外部表(external table);

Hive表的元数据:

  • Hive表的元数据metadata包括:表名、表的类型(内部表还是外部表)、表的owner、字段类型、数据存储位置等信息。
  • Hive的元数据都存储在metastore中,metastore的数据使用JPOX(Java Persistent Objects)对象关系映射解决方案进行持久化,所以任何被JPOX支持的存储都可以被Hive使用,包括大多数商业RDBMS和许多开源的数据存储。Hive支持三种不同的元存储服务器,分别为:内嵌式元存储(即Derby)、本地元存储(最常见的MYSQL)、远程元存储,每种存储方式使用不同的配置参数。关于Hive的metastore可以参考这篇博客:https://blog.csdn.net/skywalker_only/article/details/26219619
  • 在安装Hive时需要对Hive的metastore进行配置,关于使用MYSQL作为Hive的metastore的方法可以参考博客:https://www.jianshu.com/p/ce4c5826a078

内部表 V.S. 外部表:

名称 内部表 外部表
表数据由谁管理 Hive自身管理 HDFS管理
表数据存储位置 配置项hive.metastore.warehouse.dir(默认:/user/hive/warehouse) 自己制定
删除表带来的影响 直接删除元数据(metadata)和存储的数据 仅删除元数据(metadata)
修改表结构带来的影响 将修改同步给元数据(metadata) 需要修复外部表 (MSCK REPAIR TABLE table_name;)

了解了这些背景知识后,接下来比较下Spark on Hive 和 Hive on Spark 区别。


二、Spark on Hive 和 Hive on Spark 区别

2.1 Spark on Hive

顾名思义,即将Spark构建在Hive之上,Spark需要用到Hive,具体表现为:

  • 就是通过Spark SQL,加载Hive的配置文件,获取到Hive的metastore信息,进而获得metadata,但底层运行的还是 Spark RDD;
  • Spark SQL获取到metadata之后就可以去访问Hive的所有表的数据;
  • 接下来就可以通过Spark SQL来操作Hive表中存储的数据。

总之,Spark使用Hive来提供表的metadata信息

2.2 Hive on Spark

顾名思义,即将Hive构建在Spark之上(Hive的底层默认计算引擎为Hadoop的MapReduce),Hive需要用到Spark,具体表现为:

  • Hive 的底层默认计算引擎从MapReduce改为Spark;
  • 通过修改hive-site.xml配置项hive.execution.engine的值来修改执行引擎(默认为mapreduce,即mr):
<property>
    <name>hive.execution.engine</name>
    <value>spark</value>
    <description>
      Expects one of [mr, tez, spark].
      Chooses execution engine. Options are: mr (Map reduce, default), tez, spark. While MR
      remains the default engine for historical reasons, it is itself a historical engine
      and is deprecated in Hive 2 line. It may be removed without further warning.
    </description>
  </property>

hive> insert into tbl1 values(2,'a', 'f', 2);

Query ID = ......
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
java.lang.NoClassDefFoundError: io/netty/channel/EventLoopGroup
    at org.apache.hive.spark.client.SparkClientFactory.initialize(SparkClientFactory.java:56)
    at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl.setup(SparkSessionManagerImpl.java:83)
    at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl.getSession(SparkSessionManagerImpl.java:102)
    at org.apache.hadoop.hive.ql.exec.spark.SparkUtilities.getSparkSession(SparkUtilities.java:125)
    .....
Caused by: java.lang.ClassNotFoundException: io.netty.channel.EventLoopGroup
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 24 more
FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. io/netty/channel/EventLoopGroup

开发过程中常采取Spark on Hive 方案,接下来给出 Spark on Hive 创建外部查询表的方法。


三、 Spark on Hive 创建外部查询表


import org.apache.hadoop.conf.Configuration;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;


public class HiveUtil {

  private static final Logger logger = LoggerFactory.getLogger(HiveUtil.class);

  public void createSparkDatabase(String instanceName, String schemaName, String tableName)
          throws Exception {
    String databaseName = getDatabaseName(instanceName, schemaName);
    String ddl = String.format("CREATE DATABASE IF NOT EXISTS %s", databaseName+"___s");
    launchSparkTask(instanceName, schemaName, tableName, ddl);
  }

  public void createSparkTable(String instanceName, String schemaName,
                              String tableName, String tablePrefix) throws Exception {
    String sourcePK = metaConfigDao.getSoucePk(instanceName, schemaName, tableName);
    dropSparkTable(instanceName, schemaName, tableName);
    List<Pair<String, String>> columnList = dbHelper.getColumnList(instanceName,
            schemaName, tableName);
    StringBuffer dbColumns = new StringBuffer();
    for (int i=0; i<columnList.size(); i++) {
      Pair<String, String> p = columnList.get(i);
      if (i == columnList.size() -1 ) {
        dbColumns.append(p.getLeft()+" "+p.getRight());
      } else {
        dbColumns.append(p.getLeft()+" "+p.getRight()+",");
      }
    }

    String databaseName = ;
    String tenantName = ;

    String ddl = String.format("CREATE TABLE if not exists %s.%s (%s) "
                    + " using org.apache.spark.sql.execution.datasources.parquet"
                    + " options(\"tenant\" \'%s\',"
                    + "\"instance\" \"%s\", "
                    + "\"schema\" \"%s\", "
                    + "\"table\" \"%s\", "
                    + "\"prefix\" \"%s\", "
                    + "\"sourcepk\" \"%s\") ",
            databaseName+"___s", tableName, dbColumns, tenantName, instanceName,
            schemaName, tableName,tablePrefix, sourcePK);

    logger.info("Execute spark ddl, ddl is: [{}]", ddl);
    launchSparkTask(instanceName, schemaName, tableName, ddl);
    logger.info("Spark table created [{}.{}]", schemaName, tableName);
  }

  public void dropSparkTable(String instanceName, String schemaName, String tableName)
          throws Exception {
    String databaseName = getDatabaseName(instanceName, schemaName);
    String ddl = String.format("DROP TABLE IF EXISTS %s.%s", databaseName+"___s", tableName);
    launchSparkTask(instanceName, schemaName, tableName, ddl);
    logger.info("Dropped spark table [{}.{}]", databaseName+"___s", tableName);
  }


//launch spark task to execute spark jobs
  public void launchSparkTask(String instanceName, String schemaName,
                              String tableName, String sqlddl) throws Exception{
    String sparkTaskJarName = handler.getParquetJarName().trim();
    String createSparkTableClassPath = handler.getCreateSparkTableClassPath().trim();
    String sparkMaster = handler.getSparkMaster().trim();
    System.out.println("sparkTaskJarName: " + sparkTaskJarName);
    String keytabPath = (kerberosUtil.getKeytabPath() == null) ?
            "" : kerberosUtil.getKeytabPath();
    String principal = (kerberosUtil.getPrincipal() == null) ?
            "" : kerberosUtil.getPrincipal();
    SparkLauncher launcher = new SparkLauncher();
    // To be launched spark-application jar
    launcher.setAppResource(sparkTaskJarName);
    launcher.setMainClass(createSparkTableClassPath);
    launcher.addAppArgs(sqlddl, keytabPath, principal );
    // master could be yarn or local[*]
    //launcher.setMaster("local[*]");
    launcher.setMaster(sparkMaster);
    SparkAppHandle handle = launcher.startApplication();
    int retrytimes = 0;
    while (handle.getState() != SparkAppHandle.State.FINISHED) {
      retrytimes ++;
      Thread.sleep(5000L);
      System.out.println("applicationId is: " + handle.getAppId());
      System.out.println("current state: " + handle.getState());
      boolean mark = (handle.getAppId() == null
              && handle.getState() == SparkAppHandle.State.FAILED )
              && retrytimes > 8;
      if (mark) {
        logger.info("can not start spark job for creating spark table. Creating spark table failed. ");
        metaConfigDao.updateMetaFlag(instanceName, schemaName, tableName, Config.META_FLAG_TASK_FAILED);
        failedflag = true;
        break;
      }
    }
    System.out.println("Launcher over");
  }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,186评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,858评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,620评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,888评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,009评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,149评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,204评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,956评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,385评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,698评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,863评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,544评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,185评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,899评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,141评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,684评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,750评论 2 351

推荐阅读更多精彩内容