事件背景
我在Spring boot服务里以local模式启动了一个Spark任务,在初始化SparkSession的时候报错:
Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/tmp/hive":hive:supergroup:drwxr-xr-x at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:279)
这个事情很令人费解的有两点:
- 本地启动SparkSession,只是会连接Metastore。明明上一步Metastore已经连接成功了,为什么要往这个目录下写操作?
- 它到底在写什么?
- 我是在java代码里写的启动,不是使用spark-submit,怎么切换用户?
探寻真相
借此机会好好研究了一下SparkSession的启动过程。尤其是与hive有关的这部分内容。
以下是我从倒序的方式研究出来的正序的结果,毕竟还是要为了好阅读嘛!
创建SparkSession的时候,如果写了.enableHiveSupport()
意味着使用Hive Metastore作为catalog的提供方。这一步与使用.config("spark.sql.catalogImplementation","hive")
是一个意思。
如果确定了使用hiveMetaStore作为catalog,SparkSession初始化的时候就会使用"org.apache.spark.sql.hive.HiveSessionStateBuilder"
做实例,当然是通过反射创建的。这一步可以看SparkSession源码里的下面这段代码
// 以下代码来自Spark github源码,branch-3.1分支
/**
* State isolated across sessions, including SQL configurations, temporary tables, registered
* functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
* If `parentSessionState` is not null, the `SessionState` will be a copy of the parent.
*
* This is internal to Spark and there is no guarantee on interface stability.
*
* @since 2.2.0
*/
@Unstable
@transient
lazy val sessionState: SessionState = { // 初始化SessionState
parentSessionState
.map(_.clone(this))
.getOrElse {
val state = SparkSession.instantiateSessionState(
SparkSession.sessionStateClassName(sparkContext.conf), // 根据不同的className创建实例
self,
initialSessionOptions)
state
}
}
...
private val HIVE_SESSION_STATE_BUILDER_CLASS_NAME =
"org.apache.spark.sql.hive.HiveSessionStateBuilder"
private def sessionStateClassName(conf: SparkConf): String = { // 如果是hive的话,就使用上面这个类
conf.get(CATALOG_IMPLEMENTATION) match { // 这个常量就是"spark.sql.catalogImplementation"
case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
}
}
...
/**
* Helper method to create an instance of `SessionState` based on `className` from conf.
* The result is either `SessionState` or a Hive based `SessionState`.
*/
private def instantiateSessionState(
className: String,
sparkSession: SparkSession,
options: Map[String, String]): SessionState = {
try {
// invoke new [Hive]SessionStateBuilder(
// SparkSession,
// Option[SessionState],
// Map[String, String])
val clazz = Utils.classForName(className) // 反射创建实例
val ctor = clazz.getConstructors.head
ctor.newInstance(sparkSession, None, options).asInstanceOf[BaseSessionStateBuilder].build()
} catch {
case NonFatal(e) =>
throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
}
}
我们经常在代码中使用sparksession.sessionstate.catalog()
用的起始就是"org.apache.spark.sql.hive.HiveSessionStateBuilder"
的catalog
变量。这个变量的初始化会触发HiveExternalCatalog
初始化。
这个初始化就开始有趣了。我原先一直以为,Spark持有的仅是HiveMetastore的client端,也就是仅能通过thrift 连接联通HiveMetaStore,获取对应的元数据。但是仔细看HiveExternalCatalog
的初始化过程,是直接持有了HiveClient,说白了,拥有这个Client,是可以直接连HiveServer2执行SQL和其他Hive操作的。
喏,就是下面这段代码
// 以下代码来自Spark github源码,branch-3.1分支
/**
* A Hive client used to interact with the metastore.
*/
lazy val client: HiveClient = {
HiveUtils.newClientForMetadata(conf, hadoopConf) // 这个client是主要连接MetaStore的地方,每次切换Metastore就是重刷这个就可以了
}
// 下面是HiveUtils的源码
/**
* Create a [[HiveClient]] used to retrieve metadata from the Hive MetaStore.
*
* The version of the Hive client that is used here must match the metastore that is configured
* in the hive-site.xml file.
*/
protected[hive] def newClientForMetadata(
conf: SparkConf,
hadoopConf: Configuration): HiveClient = {
val configurations = formatTimeVarsForHiveClient(hadoopConf)
newClientForMetadata(conf, hadoopConf, configurations) //很明显,核心在这里了
}
下面这个方法太长,我就不粘上来了,有兴趣的自己去看源码哈。简而言之呢,就是这个Client会根据Spark提交任务的时候的参数,嗯,就下面这仨
spark.sql.hive.metastore.version metastore的版本
spark.sql.hive.metastore.jars metastore相关依赖jar包的来源
spark.sql.hive.metastore.jars.path metastore jar包路径
根据这仨参数来确定metastore的依赖包在哪里。然后在初始化HiveClient的时候使用classloader加载这些包,找到对应的类,然后反射初始化实例,最终生成HiveClientImpl
所以,原来想要实现自动适配不同版本的hive,是需要这样实现的。。。
你可能会问了,然后呢?这跟你遇到的问题有什么关系?嗯,关系大了。。。在HiveClientImpl
初始化的时候,会需要初始化一个SessionState。要注意,这个是Hive的SessionState,不是上面那个Spark的,这俩除了名字一样啥都不一样。这个SessionState在使用.start()
方法时,会做以下的操作:
* 获取当前账户
* 使用账户创建scratch dir
* ...
这就是重点了。scratch dir的路径是四个:
* local:/tmp + /`最终user`+/当前sessionid
* local:/tmp + /当前sessionid
* hdfs:/tmp/hive/+/`最终user`+/当前sessionid
* hdfs:/tmp/hive/+/`最终user`+/当前sessionid/+_tmp_space.db
所以最初说的那个报错就在这里,root用户试图建立HDFS目录/tmp/hive/+/`最终user`+/当前sessionid的时候,因为没有/tmp/hive/的权限而失败。
为什么要建立四个?因为有一些文件需要上传到hdfs中,执行任务的时候随用随取。有的文件不需要上传,是client本地写的临时文件,session关闭了就自动删除了。
解决方案
问题到这里解决了嘛?并没有。
问题的症结其实在获取当前账户
那里。
Hive的SessionState使用Hadoop的方法获取当前账户:
获取System.getEnv("HADOOP_USER_NAME"),如果设置了值,就使用这个用户名作为代理用户,参与构建刚刚的hdfs目录;如果没有设置值,就使用启动用户,对我而言就是启动SpringBoot项目的用户root,参与构建hdfs目录,所以就报错了。**
所以解决方案就是,在启动之前,设置HADOOP_USER_NAME为其他有权限的用户。因为我启动的是local模式,所以直接在java代码里设置是有效的。如果是cluster模式,需要在driver节点设置。
还有点关联内容
为什么HiveClient要这样处理用户
往下看。
doAs的账户体系与大账户体系逻辑
HiveServer2有一个doAs配置。这个配置的逻辑其实体现在很多Hadoop生态服务中。
一个用户发起的连接,带来了其账户和密码。HiveServer2校验了账户和密码与其SQL的权限关系后(这个事情可以查看上个系列文章),如果权限通过,会转而使用一个统一的最高权限的账户去实际的执行。
说白了就是你以为你使用的是user账户,但是实际执行的是admin账户。这样的设计可以很好的减少切换账户带来的开销,让权限体系更简单些。hive都校验过了,hdfs就没必要再校验一次了。
当然如果你想卡的更死,有参数可以关掉,请自行百度doAs。
HiveServer2的启动
在上面两个问题的基础上,理解下面的原理。
HiveServer2的部署一般要求在其所在机器,hadoop环境是搭建好的。启动HiveServer2的账户一般就是拥有最高权限的admin账户,如果不是也要求HADOOP_USER_NAME是配置的这个值的。因为这样可以保证,在每一次获取用户的时候,不论环境变量是否完备,都能使用有权限的账户建立临时目录。