1 Problem
The user runs a large number of concurrent beeline Hive SQL jobs and occasionally hits the error Unable to read HiveServer2 uri from ZooKeeper.
Hive version: HDP 1.2.1
2 Solution
Modify the beeline connect URL to add a retries parameter.
beeline URL before:
jdbc:hive2://**:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;principal=hive/_HOST@KDC;auth=kerberos
beeline URL after:
jdbc:hive2://**:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;principal=hive/_HOST@KDC;auth=kerberos;retries=5;
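Because the URL contains semicolons, it has to be quoted when passed on the command line, otherwise the shell will split it. A minimal invocation sketch (the ** host and principal are the same placeholders as in the URLs above):
beeline -u "jdbc:hive2://**:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2;principal=hive/_HOST@KDC;auth=kerberos;retries=5"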
3 Root Cause and Analysis
From reading the source code, the HiveServer2 connection procedure is roughly:
1) Parse the connection URL; with serviceDiscoveryMode=zooKeeper, the real connection info and configuration are resolved dynamically.
2) List all HiveServer2 nodes from ZooKeeper under /hiveserver2/. The path is built as "/" + zooKeeperNamespace, where zooKeeperNamespace is the hiveserver2 value configured in the connection string.
3) Pick one znode at random from that list and read the znode's data from ZooKeeper.
4) Parse that data to get the real HiveServer2 host, port, and other settings.
5) Open the connection, with retry support.
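For illustration only (host names and values below are hypothetical), the ZooKeeper layout that steps 2) to 4) walk over typically looks like this: each HiveServer2 instance registers a child znode under /hiveserver2, for example one named serverUri=hs2-host-1:10000;version=1.2.1;sequence=0000000000, and the znode's data is either a plain host:port string (older releases) or a semicolon-separated key=value config string such as hive.server2.thrift.bind.host=hs2-host-1;hive.server2.thrift.port=10000 (newer releases), which is what step 4) parses.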
The problem:
- Judging from the code logic, in step 3 the znode data comes back empty (probably triggered by the highly concurrent requests; the ZooKeeper logs contain nothing relevant), which raises the unable to read HiveServer2 uri from ZooKeeper: exception; see the sketch after this list.
- retries is not set in the connection string and defaults to 1, so a single failure makes the client return and exit.
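A minimal, self-contained sketch (not Hive code; the regex only mirrors the shape of the driver's kvPattern) of why an empty znode value surfaces as exactly this error: an empty string contains no '=', so the driver treats it as an old-style host:port entry, and the split-on-':' length check then fails. Compare with configureConnParams in the source below.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EmptyZnodeDataDemo {
  // Same shape as the driver's kvPattern: key=value pairs separated by ';'
  private static final Pattern KV = Pattern.compile("([^=;]*)=([^;]*)[;]?");

  public static void main(String[] args) {
    String dataStr = "";                                  // znode data read back empty
    Matcher matcher = KV.matcher(dataStr);
    System.out.println("looks like a config string? " + matcher.find());  // false
    String[] split = dataStr.split(":");                  // [""] -> length 1, not 2
    if (split.length != 2) {
      // This is the branch that surfaces as the beeline error
      System.out.println("Unable to read HiveServer2 uri from ZooKeeper: " + dataStr);
    }
  }
}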
Source code
static void configureConnParams(JdbcConnectionParams connParams)
    throws ZooKeeperHiveClientException {
  String zooKeeperEnsemble = connParams.getZooKeeperEnsemble();
  String zooKeeperNamespace =
      connParams.getSessionVars().get(JdbcConnectionParams.ZOOKEEPER_NAMESPACE);
  if ((zooKeeperNamespace == null) || (zooKeeperNamespace.isEmpty())) {
    zooKeeperNamespace = JdbcConnectionParams.ZOOKEEPER_DEFAULT_NAMESPACE;
  }
  List<String> serverHosts;
  Random randomizer = new Random();
  String serverNode;
  CuratorFramework zooKeeperClient =
      CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
          .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
  try {
    zooKeeperClient.start();
    serverHosts = zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace);
    // Remove the znodes we've already tried from this list
    serverHosts.removeAll(connParams.getRejectedHostZnodePaths());
    if (serverHosts.isEmpty()) {
      throw new ZooKeeperHiveClientException(
          "Tried all existing HiveServer2 uris from ZooKeeper.");
    }
    // Now pick a server node randomly
    serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size()));
    connParams.setCurrentHostZnodePath(serverNode);
    // Read data from the znode for this server node
    // This data could be either config string (new releases) or server end
    // point (old releases)
    String dataStr =
        new String(
            zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode),
            Charset.forName("UTF-8"));
    Matcher matcher = kvPattern.matcher(dataStr);
    // If dataStr is not null and dataStr is not a KV pattern,
    // it must be the server uri added by an older version HS2
    if ((dataStr != null) && (!matcher.find())) {
      String[] split = dataStr.split(":");
      if (split.length != 2) {
        throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper: "
            + dataStr);
      }
      connParams.setHost(split[0]);
      connParams.setPort(Integer.parseInt(split[1]));
    } else {
      applyConfs(dataStr, connParams);
    }
  } catch (Exception e) {
    throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e);
  } finally {
    // Close the client connection with ZooKeeper
    if (zooKeeperClient != null) {
      zooKeeperClient.close();
    }
  }
}
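The method above is the JDBC driver's ZooKeeper helper (configureConnParams in ZooKeeperHiveClientHelper). The loop below is the connect-with-retry logic in HiveConnection (most likely openTransport); note that maxRetries there is only raised above its default of 1 when retries=N is present in the URL.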
for (int numRetries = 0;;) {
  try {
    assumeSubject =
        JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE_FROM_SUBJECT.equals(sessConfMap
            .get(JdbcConnectionParams.AUTH_KERBEROS_AUTH_TYPE));
    transport = isHttpTransportMode() ? createHttpTransport() : createBinaryTransport();
    if (!transport.isOpen()) {
      transport.open();
      logZkDiscoveryMessage("Connected to " + connParams.getHost() + ":" + connParams.getPort());
    }
    break;
  } catch (TTransportException e) {
    LOG.warn("Failed to connect to " + connParams.getHost() + ":" + connParams.getPort());
    String errMsg = null;
    String warnMsg = "Could not open client transport with JDBC Uri: " + jdbcUriString + ": ";
    if (isZkDynamicDiscoveryMode()) {
      errMsg = "Could not open client transport for any of the Server URI's in ZooKeeper: ";
      // Try next available server in zookeeper, or retry all the servers again if retry is enabled
      while (!Utils.updateConnParamsFromZooKeeper(connParams) && ++numRetries < maxRetries) {
        connParams.getRejectedHostZnodePaths().clear();
      }
      // Update with new values
      jdbcUriString = connParams.getJdbcUriString();
      host = connParams.getHost();
      port = connParams.getPort();
    } else {
      errMsg = warnMsg;
      ++numRetries;
    }
    if (numRetries >= maxRetries) {
      throw new SQLException(errMsg + e.getMessage(), " 08S01", e);
    } else {
      LOG.warn(warnMsg + e.getMessage() + " Retrying " + numRetries + " of " + maxRetries);
    }
  }
}
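For completeness: maxRetries in the loop above is read from the retries session variable of the JDBC URL (JdbcConnectionParams.RETRIES) and defaults to 1 when the variable is absent. A hypothetical, self-contained illustration of that accounting (not Hive source; the sessConfMap name and the simulated failure are assumptions for the demo):

import java.util.HashMap;
import java.util.Map;

public class RetryAccountingDemo {
  public static void main(String[] args) {
    Map<String, String> sessConfMap = new HashMap<>();
    sessConfMap.put("retries", "5");        // comment this line out to see the default behaviour
    int maxRetries = sessConfMap.containsKey("retries")
        ? Integer.parseInt(sessConfMap.get("retries"))
        : 1;                                // default: give up after the first failure
    int numRetries = 0;
    while (true) {
      boolean connected = false;            // simulate every attempt hitting the empty-znode error
      if (connected) {
        break;
      }
      ++numRetries;
      if (numRetries >= maxRetries) {
        System.out.println("giving up after " + numRetries + " attempt(s)");
        break;
      }
      System.out.println("Retrying " + numRetries + " of " + maxRetries);
    }
  }
}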
A small suggestion: for problems like this, reading the source code directly is the best approach; don't blindly search the web.
For Hive 3.x, note this issue: https://issues.apache.org/jira/browse/HIVE-19825, which can also cause random failures.