定义一个基类:
package org.jeff.r.tools;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.security.UserGroupInformation;
/**
* @author Jeff.R 2020/1/1
*/
public class MSBase {
public static String jdbcUri;
public static String userName = System.getenv("HADOOP_USER_NAME");
public static String password = System.getenv("HADOOP_USER_PASSWORD");
public static String hints;
public static UserGroupInformation ugi;
public static UserGroupInformation getUgi() throws Exception {
if(hints == null || hints.isEmpty()){
hints = String.format("%s%s%s", userName, "##", userPassword);
}
if(ugi == null){
ugi = UserGroupInformation.createRemoteUser(userName,
password, hints);
}
return ugi;
}
public static IMetaStoreClient getMSClient() throws MetaException {
if(jdbcUri == null || jdbcUri.isEmpty()){
throw new MetaException("jdbcUri must not be empty...");
}
HiveConf hiveConf = new HiveConf();
if (StringUtils.isNotBlank(jdbcUri)) {
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, jdbcUri);
}
// 下面两种常规构建MS Client的方式, 不支持并发
//return new HiveMetaStoreClient(hiveConf);
//return RetryingMetaStoreClient.getProxy(hiveConf);
try {
// 通过下面构建支持并发,该并发访问patch自HIVE-10956引入
return Hive.get(hiveConf).getMSC();
} catch (HiveException e) {
e.printStackTrace();
return null;
}
}
}
定义一个并发测试:
package org.jeff.r.tools;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.thrift.TException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
/**
* @author Jeff.R 2020/1/1
*/
public class HIVE_10956_Test extends MSBase{
private IMetaStoreClient metaStoreClient;
public static void main(String[] args) throws Exception {
if(args.length < 1){
System.out.println("Usage: <jdbc_url> <thread_num>");
System.exit(1);
}
jdbcUri = args[0];
int thread_num = Integer.parseInt(args[1]);
try {
getUgi().doAs(new PrivilegedExceptionAction<Object>() {
@Override public Object run() throws Exception {
internal(thread_num);
return null;
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
public static void internal(int thread_num) throws Exception {
HIVE_10956_Test test = new HIVE_10956_Test();
test.metaStoreClient = getMSClient();
List<Thread> threads = new ArrayList();
for(int i = 0; i < thread_num; i++){
threads.add(new MyThread("Thread-" + i, test.metaStoreClient));
}
threads.forEach(o -> o.start());
threads.forEach(o-> {
try {
o.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
class MyThread extends Thread {
private String threadName;
private IMetaStoreClient metaStoreClient;
public MyThread(String threadName, IMetaStoreClient metaStoreClient){
this.threadName = threadName;
this.metaStoreClient = metaStoreClient;
}
@Override public void run() {
try {
System.out.println(threadName + " running...");
metaStoreClient.getAllDatabases();
} catch (TException e) {
System.out.println(threadName + " throw Exception...");
e.printStackTrace();
}
}
}
说明: 如果你的hive没有UGI鉴权,可以忽略或去除UGI鉴权部分.