Reference: http://liguo86.com/2017/09/25/spark-thriftserver-ha%E6%94%AF%E6%8C%81/
In practice I found that the implementation in that post is not rigorous enough and has a few errors:
1. It only registers the service in ZooKeeper when the Spark Thrift Server starts, and never deregisters it when the service dies. As a result, for several minutes after the service goes down, ZooKeeper still holds that instance's registration; if a client picks that instance from ZooKeeper during this window and tries to run a SQL query, the query fails.
2. The ZooKeeper registration code is added in both places where the HiveServer2 gets started (startWithContext and main), which duplicates code. It is cleaner to hook into HiveServer2's own start and stop methods instead.
3. The hive.server2.thrift.bind.host property in the configuration file must not be set to 0.0.0.0, otherwise every spark-thrift-server instance registers the meaningless address 0.0.0.0 in ZooKeeper. This does not show up when you test the connection on the node where the service is running, because 0.0.0.0 does happen to point at the local machine, but as soon as you try to connect remotely the connection fails (see the client connection sketch after this list).
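For context, this is roughly how a JDBC client resolves an instance through ZooKeeper service discovery. A minimal sketch: the quorum hosts and the sparkserver2 namespace are placeholders, and it assumes the Hive JDBC driver is on the classpath.

import java.sql.DriverManager

object ZkDiscoveryClientSketch {
  def main(args: Array[String]): Unit = {
    // Older hive-jdbc versions may need the driver registered explicitly.
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // The driver asks ZooKeeper under the given namespace for a registered
    // serverUri=host:port entry and then connects to that host:port directly.
    val url = "jdbc:hive2://zk1:2181,zk2:2181,zk3:2181/;" +
      "serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=sparkserver2"
    val conn = DriverManager.getConnection(url, "hive", "")
    val rs = conn.createStatement().executeQuery("SELECT 1")
    while (rs.next()) {
      println(rs.getInt(1))
    }
    conn.close()
  }
}

Because the client connects to whatever host:port it reads from the znode, both a stale registration (problem 1) and a 0.0.0.0 address (problem 3) only surface on the client side as failed connections.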
Here is how I ended up changing it:
Note that all of the modifications below are made to spark-hive-thriftserver_XXX.jar.
1. Modify the inner HiveThriftServer2 class in org.apache.spark.sql.hive.thriftserver
private[hive] class HiveThriftServer2(sqlContext: SQLContext)
  extends HiveServer2
  with ReflectedCompositeService {
  // state is tracked internally so that the server only attempts to shut down if it successfully
  // started, and then once only.
  private val started = new AtomicBoolean(false)

  var hiveConf: HiveConf = _

  override def init(hiveConf: HiveConf) {
    this.hiveConf = hiveConf
    val sparkSqlCliService = new SparkSQLCLIService(this, sqlContext)
    setSuperField(this, "cliService", sparkSqlCliService)
    addService(sparkSqlCliService)
    val thriftCliService = if (isHTTPTransportMode(hiveConf)) {
      new ThriftHttpCLIService(sparkSqlCliService)
    } else {
      new ThriftBinaryCLIService(sparkSqlCliService)
    }
    setSuperField(this, "thriftCLIService", thriftCliService)
    addService(thriftCliService)
    initCompositeService(hiveConf)
  }

  private def isHTTPTransportMode(hiveConf: HiveConf): Boolean = {
    val transportMode = hiveConf.getVar(ConfVars.HIVE_SERVER2_TRANSPORT_MODE)
    transportMode.toLowerCase(Locale.ROOT).equals("http")
  }

  override def start(): Unit = {
    super.start()
    started.set(true)
    // HA support: register this instance in ZooKeeper
    if (this.hiveConf.getBoolVar(
        ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
      invoke(classOf[HiveServer2], this, "addServerInstanceToZooKeeper",
        classOf[HiveConf] -> this.hiveConf)
    }
    // end of HA support
  }

  override def stop(): Unit = {
    if (started.getAndSet(false)) {
      // Close the ZooKeeper connection and deregister this instance
      if (this.hiveConf.getBoolVar(
          ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
        invoke(classOf[HiveServer2], this, "removeServerInstanceFromZooKeeper")
      }
      super.stop()
    }
  }
}
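The invoke calls in start() and stop() go through the ReflectionUtils helper already used in this package; reflection is needed because addServerInstanceToZooKeeper and removeServerInstanceFromZooKeeper (added in step 2 below) stay private on HiveServer2. As a rough sketch only, assuming the (Class, argument) pair signature used above rather than the exact Spark implementation, such a helper boils down to:

import java.lang.reflect.Method

// Sketch only: look up a possibly private method on the given class and call it on obj.
def invoke(clazz: Class[_], obj: AnyRef, methodName: String,
    args: (Class[_], AnyRef)*): AnyRef = {
  val (types, values) = args.unzip
  val method: Method = clazz.getDeclaredMethod(methodName, types: _*)
  method.setAccessible(true) // needed because the target methods are private
  method.invoke(obj, values: _*)
}

Going through reflection keeps the change to HiveServer2 itself limited to adding the two methods, without widening their visibility.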
2. Modify org.apache.hive.service.server.HiveServer2 to add the two methods addServerInstanceToZooKeeper and removeServerInstanceFromZooKeeper (plus the helpers they rely on), so that the change from step 1 can reach them via reflection.
/**
 * StartOptionExecutor: starts HiveServer2.
 * This is the default executor, when no option is specified.
 */
static class StartOptionExecutor implements ServerOptionsExecutor {
  @Override
  public void execute() {
    try {
      startHiveServer2();
    } catch (Throwable t) {
      LOG.fatal("Error starting HiveServer2", t);
      System.exit(-1);
    }
  }
}

private String getServerInstanceURI() throws Exception {
  if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
    throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
  }
  return getHiveHost() + ":" + thriftCLIService.getPortNumber();
}

private String getHiveHost() {
  HiveConf hiveConf = thriftCLIService.getHiveConf();
  String hiveHost = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
  if (hiveHost != null && !hiveHost.isEmpty()) {
    return hiveHost;
  } else {
    return thriftCLIService.getServerIPAddress().getHostName();
  }
}

/**
 * For a kerberized cluster, we dynamically set up the client's JAAS conf.
 *
 * @param hiveConf
 * @throws Exception
 */
private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception {
  if (UserGroupInformation.isSecurityEnabled()) {
    String principal = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
    if (principal.isEmpty()) {
      throw new IOException("HiveServer2 Kerberos principal is empty");
    }
    String keyTabFile = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
    if (keyTabFile.isEmpty()) {
      throw new IOException("HiveServer2 Kerberos keytab is empty");
    }
    // Install the JAAS Configuration for the runtime
    Utils.setZookeeperClientKerberosJaasConfig(principal, keyTabFile);
  }
}

/**
 * ACLProvider for providing appropriate ACLs to CuratorFrameworkFactory
 */
private final ACLProvider zooKeeperAclProvider = new ACLProvider() {
  @Override
  public List<ACL> getDefaultAcl() {
    List<ACL> nodeAcls = new ArrayList<ACL>();
    if (UserGroupInformation.isSecurityEnabled()) {
      // Read all to the world
      nodeAcls.addAll(ZooDefs.Ids.READ_ACL_UNSAFE);
      // Create/Delete/Write/Admin to the authenticated user
      nodeAcls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.AUTH_IDS));
    } else {
      // ACLs for znodes on a non-kerberized cluster
      // Create/Read/Delete/Write/Admin to the world
      nodeAcls.addAll(ZooDefs.Ids.OPEN_ACL_UNSAFE);
    }
    return nodeAcls;
  }

  @Override
  public List<ACL> getAclForPath(String path) {
    return getDefaultAcl();
  }
};

private void setDeregisteredWithZooKeeper(boolean deregisteredWithZooKeeper) {
  this.deregisteredWithZooKeeper = deregisteredWithZooKeeper;
}
/**
 * The watcher class which sets the de-register flag when the znode corresponding to this server
 * instance is deleted. Additionally, it shuts down the server if there are no more active client
 * sessions at the time of receiving a 'NodeDeleted' notification from ZooKeeper.
 */
private class DeRegisterWatcher implements Watcher {
  @Override
  public void process(WatchedEvent event) {
    if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
      if (znode != null) {
        try {
          znode.close();
          LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. "
              + "The server will be shut down after the last client session completes.");
        } catch (IOException e) {
          LOG.error("Failed to close the persistent ephemeral znode", e);
        } finally {
          HiveServer2.this.setDeregisteredWithZooKeeper(true);
          // If there are no more active client sessions, stop the server
          if (cliService.getSessionManager().getOpenSessionCount() == 0) {
            LOG.warn("This instance of HiveServer2 has been removed from the list of server "
                + "instances available for dynamic service discovery. "
                + "The last client session has ended - will shutdown now.");
            HiveServer2.this.stop();
          }
        }
      }
    }
  }
}
/**
 * Adds a server instance to ZooKeeper as a znode.
 *
 * @param hiveConf
 * @throws Exception
 */
private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
  String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
  String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
  String instanceURI = getServerInstanceURI();
  setUpZooKeeperAuth(hiveConf);
  int sessionTimeout =
      (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
          TimeUnit.MILLISECONDS);
  int baseSleepTime =
      (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
          TimeUnit.MILLISECONDS);
  int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
  // Create a CuratorFramework instance to be used as the ZooKeeper client
  // Use the zooKeeperAclProvider to create appropriate ACLs
  zooKeeperClient =
      CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
          .sessionTimeoutMs(sessionTimeout).aclProvider(zooKeeperAclProvider)
          .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
  zooKeeperClient.start();
  // Create the parent znodes recursively; ignore if the parent already exists.
  try {
    zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
        .forPath(ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
    LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
  } catch (KeeperException e) {
    if (e.code() != KeeperException.Code.NODEEXISTS) {
      LOG.fatal("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
      throw e;
    }
  }
  // Create a znode under the rootNamespace parent for this instance of the server
  // Znode name: serverUri=host:port;version=versionInfo;sequence=sequenceNumber
  try {
    String pathPrefix =
        ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
            + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";"
            + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
    // String znodeData = "";
    // if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_PUBLISH_CONFIGS)) {
    //   // HiveServer2 configs that this instance will publish to ZooKeeper,
    //   // so that the clients can read these and configure themselves properly.
    //   Map<String, String> confsToPublish = new HashMap<String, String>();
    //   addConfsToPublish(hiveConf, confsToPublish);
    //   // Publish configs for this instance as the data on the node
    //   znodeData = Joiner.on(';').withKeyValueSeparator("=").join(confsToPublish);
    // } else {
    //   znodeData = instanceURI;
    // }
    byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
    znode =
        new PersistentEphemeralNode(zooKeeperClient,
            PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8);
    znode.start();
    // We'll wait for 120s for node creation
    long znodeCreationTimeout = 120;
    if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
      throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
    }
    setDeregisteredWithZooKeeper(false);
    znodePath = znode.getActualPath();
    // Set a watch on the znode
    if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) {
      // No node exists, throw exception
      throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper.");
    }
    LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI);
  } catch (Exception e) {
    LOG.fatal("Unable to create a znode for this server instance", e);
    if (znode != null) {
      znode.close();
    }
    throw (e);
  }
}
private void removeServerInstanceFromZooKeeper() throws Exception {
  setDeregisteredWithZooKeeper(true);
  if (znode != null) {
    znode.close();
  }
  zooKeeperClient.close();
  LOG.info("Server instance removed from ZooKeeper.");
}
In addition, if the spark-thrift-server service may be called from outside the cluster and those clients cannot reach the service by hostname, the address registered in ZooKeeper needs to be an IP instead.
The main change is to the getServerInstanceURI / getHiveHost methods of the HiveServer2 class, for example by resolving the host to its IP address:
private String getServerInstanceURI() throws Exception {
  if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
    throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
  }
  return getHiveHost() + ":" + thriftCLIService.getPortNumber();
}

private String getHiveHost() throws Exception {
  HiveConf hiveConf = thriftCLIService.getHiveConf();
  String hiveHost = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST);
  if (hiveHost != null && !hiveHost.isEmpty()) {
    // Resolve the configured bind host to an IP address (requires java.net.InetAddress),
    // so that the URI registered in ZooKeeper is reachable from outside the cluster.
    return InetAddress.getByName(hiveHost).getHostAddress();
  } else {
    return thriftCLIService.getServerIPAddress().getHostAddress();
  }
}