High Availability for Spark's HiveThriftServer2
Spark's HiveThriftServer2 extends Hive's HiveServer2, but it does not inherit HiveServer2's HA mechanism. Below we patch the source to add high availability to HiveThriftServer2. The idea is the same as HiveServer2's own HA: every running instance registers its connection address on ZooKeeper, and clients discover a live server there, so it is used exactly like an HA HiveServer2.
Classes involved and source changes
- The entry point of the Spark HiveThriftServer is the object `HiveThriftServer2`; let's look at what its `main` method does:
```scala
def main(args: Array[String]) {
  Utils.initDaemon(log)
  // Parse command-line arguments
  val optionsProcessor = new HiveServer2.ServerOptionsProcessor("HiveThriftServer2")
  optionsProcessor.parse(args)

  logInfo("Starting SparkContext")
  // Initialize the environment
  SparkSQLEnv.init()

  ShutdownHookManager.addShutdownHook { () =>
    SparkSQLEnv.stop()
    uiTab.foreach(_.detach())
  }

  val executionHive = HiveUtils.newClientForExecution(
    SparkSQLEnv.sqlContext.sparkContext.conf,
    SparkSQLEnv.sqlContext.sessionState.newHadoopConf())

  try {
    // Instantiate HiveThriftServer2
    val server = new HiveThriftServer2(SparkSQLEnv.sqlContext)
    server.init(executionHive.conf)
    server.start()
    logInfo("HiveThriftServer2 started")
    listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
    SparkSQLEnv.sparkContext.addSparkListener(listener)
    uiTab = if (SparkSQLEnv.sparkContext.getConf.getBoolean("spark.ui.enabled", true)) {
      Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
    } else {
      None
    }
    // If the application was killed before HiveThriftServer2 started successfully, the
    // SparkSubmit process cannot exit, so check whether the SparkContext was stopped.
    if (SparkSQLEnv.sparkContext.stopped.get()) {
      logError("SparkContext has stopped even if HiveServer2 has started, so exit")
      System.exit(-1)
    }
  } catch {
    case e: Exception =>
      logError("Error starting HiveThriftServer2", e)
      System.exit(-1)
  }
}
```
- `main` creates a `HiveThriftServer2` instance; let's step into that class:
```scala
private[hive] class HiveThriftServer2(sqlContext: SQLContext)
  extends HiveServer2
  with ReflectedCompositeService {
  // state is tracked internally so that the server only attempts to shut down if it successfully
  // started, and then once only.
  private val started = new AtomicBoolean(false)

  // TODO: newly added - keep the HiveConf passed to init() for later use
  var hiveConf: HiveConf = _

  /**
   * Initialize the thrift server
   */
  override def init(hiveConf: HiveConf) {
    this.hiveConf = hiveConf
    val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
    setSuperField(this, "cliService", sparkSqlCliService)
    addService(sparkSqlCliService)
    val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
      new ThriftHttpCLIService(sparkSqlCliService)
    } else {
      new ThriftBinaryCLIService(sparkSqlCliService)
    }
    setSuperField(this, "thriftCLIService", thriftCliService)
    addService(thriftCliService)
    initCompositeService(hiveConf)
  }

  private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
    val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
    transportMode.toLowerCase(Locale.ROOT).equals("http")
  }

  override def start(): Unit = {
    super.start()
    started.set(true)
    // TODO: newly added - reuse HiveServer2's dynamic service discovery switch
    if (this.hiveConf.getBoolVar(
        ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
      // addServerInstanceToZooKeeper registers this server on ZooKeeper
      invoke(classOf[HiveServer2], this, "addServerInstanceToZooKeeper",
        classOf[HiveConf] -> this.hiveConf)
    }
  }

  override def stop(): Unit = {
    if (started.getAndSet(false)) {
      // TODO: newly added - on shutdown, remove this instance's registration from ZooKeeper
      if (this.hiveConf.getBoolVar(
          ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
        invoke(classOf[HiveServer2], this, "removeServerInstanceFromZooKeeper")
      }
      super.stop()
    }
  }
}
```
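The `start()` and `stop()` overrides reach the private `HiveServer2` methods through the `invoke` helper of `ReflectedCompositeService`. As a minimal, self-contained sketch of that reflective-call pattern (here `Server` and `register` are illustrative stand-ins, not the real Hive classes):

```java
import java.lang.reflect.Method;

public class ReflectDemo {
    // Stand-in for HiveServer2 with a private method, like addServerInstanceToZooKeeper
    static class Server {
        private String register(String tag) { return "registered:" + tag; }
    }

    // Look up a private method by name and parameter type, make it callable, invoke it
    static Object invokePrivate(Object target, String name, Class<?> argType, Object arg)
            throws Exception {
        Method m = target.getClass().getDeclaredMethod(name, argType);
        m.setAccessible(true); // bypass the private modifier, as the patch's invoke() helper does
        return m.invoke(target, arg);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(invokePrivate(new Server(), "register", String.class, "zk"));
        // prints registered:zk
    }
}
```

This is why no visibility changes are needed in `HiveServer2` itself: the new methods can stay private and still be reachable from the Scala subclass.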
- The class is simple: an init() method, a start() method, and a stop() method. The TODO comments flag my new code, and the changes are small:
- a new hiveConf member records the configuration passed to init() for later use;
- start(): registers this server's connection info on ZooKeeper;
- stop(): removes the registration from ZooKeeper.
- The registration logic itself is added to HiveServer2 and reached via reflection. Let's look at how HiveServer2 is modified:
```java
/*################################ New methods ################################*/

// Get the thrift server's HOST:PORT
private String getServerInstanceURI() throws Exception {
  if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
    throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
  }
  return getHiveHost() + ":" + thriftCLIService.getPortNumber();
}

private String getHiveHost() {
  HiveConf hiveConf = thriftCLIService.getHiveConf();
  String hiveHost = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
  if (hiveHost != null && !hiveHost.isEmpty()) {
    return hiveHost;
  } else {
    return thriftCLIService.getServerIPAddress().getHostName();
  }
}

/**
 * Tracks whether this HiveServer2 instance has been de-registered from ZooKeeper
 * (and therefore would need to re-register).
 */
private boolean deregisteredWithZooKeeper = false;

private void setDeregisteredWithZooKeeper(boolean deregisteredWithZooKeeper) {
  this.deregisteredWithZooKeeper = deregisteredWithZooKeeper;
}

/**
 * ZooKeeper watcher: when this instance's znode is deleted, close the registration
 * and shut the server down once the last client session ends.
 */
private PersistentEphemeralNode znode;

private class DeRegisterWatcher implements Watcher {
  @Override
  public void process(WatchedEvent event) {
    if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
      if (znode != null) {
        try {
          znode.close();
          LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
              + "The server will be shut down after the last client session completes.");
        } catch (IOException e) {
          LOG.error("Failed to close the persistent ephemeral znode", e);
        } finally {
          HiveServer2.this.setDeregisteredWithZooKeeper(true);
          // If no sessions remain open, shut this HiveServer2 down
          if (cliService.getSessionManager().getOpenSessionCount() == 0) {
            LOG.warn("This instance of HiveServer2 has been removed from the list of server "
                + "instances available for dynamic service discovery. "
                + "The last client session has ended - will shutdown now.");
            HiveServer2.this.stop();
          }
        }
      }
    }
  }
}

private CuratorFramework zooKeeperClient;
private String znodePath;

/**
 * Register this server instance on ZooKeeper.
 * @param hiveConf
 * @throws Exception
 */
private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
  // ZooKeeper quorum address, from hiveConf
  String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
  // hive.server2.zookeeper.namespace, from hive-site.xml
  String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
  // The advertised thrift server address
  String instanceURI = getServerInstanceURI();
  // ZooKeeper session timeout
  int sessionTimeout =
      (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
          TimeUnit.MILLISECONDS);
  // Base sleep time between connection retries
  int baseSleepTime =
      (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
          TimeUnit.MILLISECONDS);
  // Maximum number of connection retries
  int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
  // Build and start the ZooKeeper client
  zooKeeperClient =
      CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
          .sessionTimeoutMs(sessionTimeout)
          .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
  zooKeeperClient.start();
  // Create the persistent parent node for rootNamespace on ZooKeeper
  try {
    zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
        .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
    LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
  } catch (KeeperException e) {
    if (e.code() != KeeperException.Code.NODEEXISTS) {
      LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
      throw e;
    }
  }
  // Register this instance under rootNamespace, e.g.
  // serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005
  try {
    String pathPrefix =
        ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
            + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
            + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
    byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
    znode =
        new PersistentEphemeralNode(zooKeeperClient,
            PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
    znode.start();
    // We'll wait 120s for node creation
    long znodeCreationTimeout = 120;
    if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
      throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
    }
    setDeregisteredWithZooKeeper(false);
    znodePath = znode.getActualPath();
    // Set a watch on the znode so we notice immediately if the registration disappears
    if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
      // No node exists, throw exception
      throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
    }
    LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
  } catch (Exception e) {
    LOG.fatal("Unable to create a znode for this server instance", e);
    if (znode != null) {
      znode.close();
    }
    throw (e);
  }
}

// Remove the znode; this instance is shutting down
private void removeServerInstanceFromZooKeeper() throws Exception {
  setDeregisteredWithZooKeeper(true);
  if (znode != null) {
    znode.close();
  }
  zooKeeperClient.close();
  LOG.info("Server instance removed from ZooKeeper.");
}
```
- Two private methods are added to HiveServer2 — the two that HiveThriftServer2 invokes via reflection: addServerInstanceToZooKeeper registers the connection info under the configured ZooKeeper path at startup, and removeServerInstanceFromZooKeeper deletes it at shutdown.
- A ZooKeeper watcher (DeRegisterWatcher) is also added.
- Note that the znodes registered on ZooKeeper must follow exactly the form
  serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005
  because we are reusing HiveServer2's HA machinery: its JDBC client parses this format during service discovery.
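To make the format concrete, here is a small sketch that rebuilds the path prefix the patch creates (the host, port, version, and namespace are the example values used above; ZooKeeper itself appends the 10-digit counter to EPHEMERAL_SEQUENTIAL nodes):

```java
public class ZnodeNameDemo {
    // Builds the znode name prefix; ZooKeeper appends the sequence number at creation time.
    static String pathPrefix(String rootNamespace, String instanceURI, String version) {
        return "/" + rootNamespace + "/serverUri=" + instanceURI
                + ";version=" + version + ";sequence=";
    }

    public static void main(String[] args) {
        // Example values: cdh1:10001 from getServerInstanceURI(), version from HiveVersionInfo
        System.out.println(pathPrefix("hiveserver2_zk", "cdh1:10001", "1.2.1.spark2"));
        // prints /hiveserver2_zk/serverUri=cdh1:10001;version=1.2.1.spark2;sequence=
    }
}
```

With a sequence number appended, the full node becomes e.g. `/hiveserver2_zk/serverUri=cdh1:10001;version=1.2.1.spark2;sequence=0000000005`.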
Build
```shell
mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Dscala-2.11 -DskipTests clean package
```
- Replace the hive-thriftserver jar under Spark's lib directory with the one produced by the build.
Configuration
- The HA configuration is identical to HiveServer2's; add the following to hive-site.xml:
```xml
<property>
  <name>hive.server2.support.dynamic.service.discovery</name>
  <value>true</value>
</property>
<property>
  <name>hive.server2.zookeeper.namespace</name>
  <value>hiveserver2_zk</value>
</property>
<property>
  <name>hive.zookeeper.quorum</name>
  <value>cdh1:2181,cdh2:2181,cdh3:2181</value>
</property>
<property>
  <name>hive.zookeeper.client.port</name>
  <value>2181</value>
</property>
<property>
  <name>hive.server2.thrift.bind.host</name>
  <value>cdh1</value>
</property>
```
- One more thing: we start the process with Spark's sbin/start-thriftserver.sh, which in turn calls sbin/spark-daemon.sh to actually launch the task. spark-daemon.sh contains a duplicate-application check, so starting two identical applications on the same machine fails. To run two HiveThriftServer2 instances on one host, comment out that check in spark-daemon.sh:
```shell
# Comment out the following check ######################
# if [ -f "$pid" ]; then
#   TARGET_ID="$(cat "$pid")"
#   if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
#     echo "$command running as process $TARGET_ID. Stop it first."
#     exit 1
#   fi
# fi
########################################################
```
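Commenting out the check works, but stock spark-daemon.sh also derives the pid-file name from the SPARK_IDENT_STRING environment variable. If your version does, an alternative sketch that avoids patching the script is to give each instance a distinct identity (the identity value and port below are just examples):

```shell
# Alternative sketch: give each thrift server its own pid file by overriding
# SPARK_IDENT_STRING, which spark-daemon.sh includes in the pid filename.
export SPARK_IDENT_STRING="thriftserver-10002"
sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10002
```

This keeps the duplicate-application check intact for other daemons on the same host.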
Startup
- Start command:
```shell
sbin/start-thriftserver.sh \
  --master yarn \
  --conf spark.driver.memory=1G \
  --executor-memory 1G \
  --num-executors 1 \
  --hiveconf hive.server2.thrift.port=10002
```
- Connect with beeline:
```shell
!connect jdbc:hive2://cdh1:2181,cdh2:2181,cdh3:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2_zk
```