一、背景
1.1 为什么引入Hive?
最初提出Hive的主要目的在于:降低使用MapReduce完成查询任务的技术门槛。
在RDBMS中,开发人员或者用户通过执行SQL语句进行查询,SQL语言是开发人员大都熟悉的语言。在大数据发展的初期,大数据查询的计算任务需要通过MapReduce来完成,然而编写MapReduce的程序是件复杂繁琐的事情。Hive 可以实现将大家熟悉的SQL语句翻译成复杂的MapReduce程序,利用Hive非MapReduce开发人员也能够快速上手使用MapReduce完成查询任务。因此,大家经常会说Hive使用的是一种类SQL的HQL语言。
1.2 为什么引入Spark?
Hive底层计算使用的是Hadoop的MapReduce,由于需要繁的磁盘IO,其计算性能只适合于大文件的非实时的批处理操作。Spark基于内存计算,凭借着DAG和RDD特性(保证中间数据如果丢失可以重新计算恢复),可以将计算的中间结果以RDD的形式保存在内存中,而不需要频繁的磁盘IO,非常适合于交互式迭代计算。Spark的计算性能远高于Hadoop的MapReduce。
1.3 Hive的内部表、外部表以及元数据
- 未被external修饰的是内部表(managed table);
- 被external修饰的为外部表(external table);
Hive表的元数据:
- Hive表的元数据metadata包括:表名、表的类型(内部表还是外部表)、表的owner、字段类型、数据存储位置等信息。
- Hive的元数据都存储在metastore中,metastore的数据使用JPOX(Java Persistent Objects)对象关系映射解决方案进行持久化,所以任何被JPOX支持的存储都可以被Hive使用,包括大多数商业RDBMS和许多开源的数据存储。Hive支持三种不同的元存储服务器,分别为:内嵌式元存储(即Derby)、本地元存储(最常见的MYSQL)、远程元存储,每种存储方式使用不同的配置参数。关于Hive的metastore可以参考这篇博客:https://blog.csdn.net/skywalker_only/article/details/26219619。
- 在安装Hive时需要对Hive的metastore进行配置,关于使用MYSQL作为Hive的metastore的方法可以参考博客:https://www.jianshu.com/p/ce4c5826a078。
内部表 V.S. 外部表:
名称 | 内部表 | 外部表 |
---|---|---|
表数据由谁管理 | Hive自身管理 | HDFS管理 |
表数据存储位置 | 配置项hive.metastore.warehouse.dir(默认:/user/hive/warehouse) | 自己制定 |
删除表带来的影响 | 直接删除元数据(metadata)和存储的数据 | 仅删除元数据(metadata) |
修改表结构带来的影响 | 将修改同步给元数据(metadata) | 需要修复外部表 (MSCK REPAIR TABLE table_name;) |
了解了这些背景知识后,接下来比较下Spark on Hive 和 Hive on Spark 区别。
二、Spark on Hive 和 Hive on Spark 区别
2.1 Spark on Hive
顾名思义,即将Spark构建在Hive之上,Spark需要用到Hive,具体表现为:
- 就是通过Spark SQL,加载Hive的配置文件,获取到Hive的metastore信息,进而获得metadata,但底层运行的还是 Spark RDD;
- Spark SQL获取到metadata之后就可以去访问Hive的所有表的数据;
- 接下来就可以通过Spark SQL来操作Hive表中存储的数据。
总之,Spark使用Hive来提供表的metadata信息。
2.2 Hive on Spark
顾名思义,即将Hive构建在Spark之上(Hive的底层默认计算引擎为Hadoop的MapReduce),Hive需要用到Spark,具体表现为:
- Hive 的底层默认计算引擎从MapReduce改为Spark;
- 通过修改hive-site.xml配置项
hive.execution.engine
的值来修改执行引擎(默认为mapreduce,即mr):
<property>
<name>hive.execution.engine</name>
<value>spark</value>
<description>
Expects one of [mr, tez, spark].
Chooses execution engine. Options are: mr (Map reduce, default), tez, spark. While MR
remains the default engine for historical reasons, it is itself a historical engine
and is deprecated in Hive 2 line. It may be removed without further warning.
</description>
</property>
- 将spark 所需要的依赖(${spark_home}/jars/)拷贝到${hive_home}/lib/下,否则会报错(配置方法可以参考博客:https://blog.csdn.net/mycafe_/article/details/79132727),比如:
hive> insert into tbl1 values(2,'a', 'f', 2);
Query ID = ......
Total jobs = 1
Launching Job 1 out of 1
In order to change the average load for a reducer (in bytes):
set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
set mapreduce.job.reduces=<number>
java.lang.NoClassDefFoundError: io/netty/channel/EventLoopGroup
at org.apache.hive.spark.client.SparkClientFactory.initialize(SparkClientFactory.java:56)
at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl.setup(SparkSessionManagerImpl.java:83)
at org.apache.hadoop.hive.ql.exec.spark.session.SparkSessionManagerImpl.getSession(SparkSessionManagerImpl.java:102)
at org.apache.hadoop.hive.ql.exec.spark.SparkUtilities.getSparkSession(SparkUtilities.java:125)
.....
Caused by: java.lang.ClassNotFoundException: io.netty.channel.EventLoopGroup
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 24 more
FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. io/netty/channel/EventLoopGroup
开发过程中常采取Spark on Hive 方案,接下来给出 Spark on Hive 创建外部查询表的方法。
三、 Spark on Hive 创建外部查询表
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
public class HiveUtil {
private static final Logger logger = LoggerFactory.getLogger(HiveUtil.class);
public void createSparkDatabase(String instanceName, String schemaName, String tableName)
throws Exception {
String databaseName = getDatabaseName(instanceName, schemaName);
String ddl = String.format("CREATE DATABASE IF NOT EXISTS %s", databaseName+"___s");
launchSparkTask(instanceName, schemaName, tableName, ddl);
}
public void createSparkTable(String instanceName, String schemaName,
String tableName, String tablePrefix) throws Exception {
String sourcePK = metaConfigDao.getSoucePk(instanceName, schemaName, tableName);
dropSparkTable(instanceName, schemaName, tableName);
List<Pair<String, String>> columnList = dbHelper.getColumnList(instanceName,
schemaName, tableName);
StringBuffer dbColumns = new StringBuffer();
for (int i=0; i<columnList.size(); i++) {
Pair<String, String> p = columnList.get(i);
if (i == columnList.size() -1 ) {
dbColumns.append(p.getLeft()+" "+p.getRight());
} else {
dbColumns.append(p.getLeft()+" "+p.getRight()+",");
}
}
String databaseName = ;
String tenantName = ;
String ddl = String.format("CREATE TABLE if not exists %s.%s (%s) "
+ " using org.apache.spark.sql.execution.datasources.parquet"
+ " options(\"tenant\" \'%s\',"
+ "\"instance\" \"%s\", "
+ "\"schema\" \"%s\", "
+ "\"table\" \"%s\", "
+ "\"prefix\" \"%s\", "
+ "\"sourcepk\" \"%s\") ",
databaseName+"___s", tableName, dbColumns, tenantName, instanceName,
schemaName, tableName,tablePrefix, sourcePK);
logger.info("Execute spark ddl, ddl is: [{}]", ddl);
launchSparkTask(instanceName, schemaName, tableName, ddl);
logger.info("Spark table created [{}.{}]", schemaName, tableName);
}
public void dropSparkTable(String instanceName, String schemaName, String tableName)
throws Exception {
String databaseName = getDatabaseName(instanceName, schemaName);
String ddl = String.format("DROP TABLE IF EXISTS %s.%s", databaseName+"___s", tableName);
launchSparkTask(instanceName, schemaName, tableName, ddl);
logger.info("Dropped spark table [{}.{}]", databaseName+"___s", tableName);
}
//launch spark task to execute spark jobs
public void launchSparkTask(String instanceName, String schemaName,
String tableName, String sqlddl) throws Exception{
String sparkTaskJarName = handler.getParquetJarName().trim();
String createSparkTableClassPath = handler.getCreateSparkTableClassPath().trim();
String sparkMaster = handler.getSparkMaster().trim();
System.out.println("sparkTaskJarName: " + sparkTaskJarName);
String keytabPath = (kerberosUtil.getKeytabPath() == null) ?
"" : kerberosUtil.getKeytabPath();
String principal = (kerberosUtil.getPrincipal() == null) ?
"" : kerberosUtil.getPrincipal();
SparkLauncher launcher = new SparkLauncher();
// To be launched spark-application jar
launcher.setAppResource(sparkTaskJarName);
launcher.setMainClass(createSparkTableClassPath);
launcher.addAppArgs(sqlddl, keytabPath, principal );
// master could be yarn or local[*]
//launcher.setMaster("local[*]");
launcher.setMaster(sparkMaster);
SparkAppHandle handle = launcher.startApplication();
int retrytimes = 0;
while (handle.getState() != SparkAppHandle.State.FINISHED) {
retrytimes ++;
Thread.sleep(5000L);
System.out.println("applicationId is: " + handle.getAppId());
System.out.println("current state: " + handle.getState());
boolean mark = (handle.getAppId() == null
&& handle.getState() == SparkAppHandle.State.FAILED )
&& retrytimes > 8;
if (mark) {
logger.info("can not start spark job for creating spark table. Creating spark table failed. ");
metaConfigDao.updateMetaFlag(instanceName, schemaName, tableName, Config.META_FLAG_TASK_FAILED);
failedflag = true;
break;
}
}
System.out.println("Launcher over");
}
}