The default approach
Out of the box, HBase supports doAs impersonation only through its Thrift and REST gateways, which are configured as follows:
- Thrift configuration
<property>
<name>hbase.regionserver.thrift.http</name>
<value>true</value>
</property>
<property>
<name>hbase.thrift.support.proxyuser</name>
<value>true</value>
</property>
- REST configuration
<property>
<name>hbase.rest.support.proxyuser</name>
<value>true</value>
</property>
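With REST proxy-user support enabled, the caller names the user to impersonate on each request. The sketch below only builds such a request URI and does not send it. It assumes the REST gateway reads the impersonated user from a `doAs` query parameter and that rows are addressed as `/<table>/<row>`; the host, port, table, row key and user shown are hypothetical placeholders, and `DoAsUriExample` is an illustrative helper, not part of HBase.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class DoAsUriExample {

    /** Builds a row-read URI of the form <base>/<table>/<row>?doAs=<user>. */
    static URI buildDoAsUri(String baseUrl, String table, String rowKey, String doAsUser) {
        // Drop a trailing slash so the path is not doubled up.
        String base = baseUrl.endsWith("/")
                ? baseUrl.substring(0, baseUrl.length() - 1)
                : baseUrl;
        return URI.create(base
                + "/" + encodePathSegment(table)
                + "/" + encodePathSegment(rowKey)
                // Assumption: the gateway takes the impersonated user from "doAs".
                + "?doAs=" + URLEncoder.encode(doAsUser, StandardCharsets.UTF_8));
    }

    /** URLEncoder targets form data, so turn its "+" for spaces into "%20" for paths. */
    private static String encodePathSegment(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }

    public static void main(String[] args) {
        // Hypothetical gateway address, table, row key and user.
        System.out.println(buildDoAsUri("http://rest-host:8080", "t1", "row1", "alice"));
    }
}
```

The request itself must still be authenticated as the real (proxy) user; the `doAs` value only selects whose permissions apply once the gateway has accepted the caller.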
But what if we access HBase through its Java API instead?
Implementing doAs impersonation on the client side
Looking at how org.apache.hadoop.hbase.rest.RESTServlet implements impersonation turns up a very handy class, org.apache.hadoop.hbase.util.ConnectionCache, which is simple to use:
private HbaseService(Configuration config) throws IOException {
    this.config = config;
    // How often idle connections are swept, and how long one may stay idle (ms).
    int cleanInterval = config.getInt(CLEANUP_INTERVAL, 10 * 1000);
    int maxIdleTime = config.getInt(MAX_IDLETIME, 10 * 60 * 1000);
    UserProvider userProvider = UserProvider.instantiate(config);
    // The real user is the identity this process itself runs as.
    realUser = userProvider.getCurrentUserName();
    connectionCache = new ConnectionCache(
        config, userProvider, cleanInterval, maxIdleTime);
    if (supportsProxyuser()) {
        // Load the hadoop.proxyuser.* rules from the configuration.
        ProxyUsers.refreshSuperUserGroupsConfiguration(config);
    }
}

public synchronized Table getTable(String tableName) throws IOException {
    return connectionCache.getTable(tableName);
}

public synchronized Admin getAdmin() throws IOException {
    return connectionCache.getAdmin();
}

public synchronized void setEffectiveUser(String user) {
    // Only switch users when proxy-user support is enabled.
    if (supportsProxyuser()) {
        connectionCache.setEffectiveUser(user);
    }
}
- Call setEffectiveUser() to set the user to impersonate.
- Call getAdmin() to obtain the HBase Admin object.
- Call getTable() to obtain a Table object.
The implementation of ConnectionCache is quite clever:
private final ThreadLocal<String> effectiveUserNames =
    new ThreadLocal<String>() {
        @Override
        protected String initialValue() {
            return realUserName;
        }
    };

private final Map<String, ConnectionInfo> connections = new ConcurrentHashMap<>();

/**
 * Caller closes the table afterwards.
 */
public Table getTable(String tableName) throws IOException {
    ConnectionInfo connInfo = getCurrentConnection();
    return connInfo.connection.getTable(TableName.valueOf(tableName));
}

/**
 * Get the cached connection for the current user.
 * If none or timed out, create a new one.
 */
ConnectionInfo getCurrentConnection() throws IOException {
    String userName = getEffectiveUser();
    ConnectionInfo connInfo = connections.get(userName);
    if (connInfo == null || !connInfo.updateAccessTime()) {
        Lock lock = locker.acquireLock(userName);
        try {
            connInfo = connections.get(userName);
            if (connInfo == null) {
                UserGroupInformation ugi = realUser;
                if (!userName.equals(realUserName)) {
                    ugi = UserGroupInformation.createProxyUser(userName, realUser);
                }
                User user = userProvider.create(ugi);
                Connection conn = ConnectionFactory.createConnection(conf, user);
                connInfo = new ConnectionInfo(conn, userName);
                connections.put(userName, connInfo);
            }
        } finally {
            lock.unlock();
        }
    }
    return connInfo;
}
- effectiveUserNames holds each thread's own effective user.
- connections caches the connection created for each user, so later calls for the same user reuse it.
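The two ideas above, a per-thread effective user and a per-user connection map, can be sketched without any HBase dependency. This is a simplified illustration and not the ConnectionCache source: `PerUserConnectionSketch` and `FakeConnection` are hypothetical names, `computeIfAbsent` stands in for the per-user lock and double check used in the real class, and idle-timeout eviction is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class PerUserConnectionSketch {

    /** Stand-in for an expensive per-user resource such as an HBase Connection. */
    static final class FakeConnection {
        final String owner;
        FakeConnection(String owner) { this.owner = owner; }
    }

    // Each thread sees its own value; it starts out as the real user.
    private final ThreadLocal<String> effectiveUser;
    // One cached connection per user name, shared by all threads.
    private final Map<String, FakeConnection> connections = new ConcurrentHashMap<>();
    // Counts how many connections were actually created.
    final AtomicInteger created = new AtomicInteger();

    PerUserConnectionSketch(String realUserName) {
        this.effectiveUser = ThreadLocal.withInitial(() -> realUserName);
    }

    void setEffectiveUser(String user) { effectiveUser.set(user); }

    String getEffectiveUser() { return effectiveUser.get(); }

    /** Clears this thread's override so it falls back to the real user. */
    void resetEffectiveUser() { effectiveUser.remove(); }

    /** Returns the cached connection for this thread's effective user, creating it once. */
    FakeConnection getCurrentConnection() {
        return connections.computeIfAbsent(getEffectiveUser(), user -> {
            created.incrementAndGet();
            return new FakeConnection(user);
        });
    }

    public static void main(String[] args) throws InterruptedException {
        PerUserConnectionSketch cache = new PerUserConnectionSketch("service");
        Thread worker = new Thread(() -> {
            cache.setEffectiveUser("alice");
            System.out.println("worker thread: " + cache.getCurrentConnection().owner);
        });
        worker.start();
        worker.join();
        // The main thread never set an effective user, so it still acts as "service".
        System.out.println("main thread: " + cache.getCurrentConnection().owner);
    }
}
```

Because the effective user lives in a ThreadLocal, a value set on a pooled thread stays there for the next task that runs on it. When threads are reused, it is safer to set the user at the start of each request and clear it in a finally block.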
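Granting proxy privileges to the user
To use a custom user as the proxy user, add the following entries to core-site.xml or hbase-site.xml, replacing username with that user's name (skip this step if they are already configured):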
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
<property>
<name>hadoop.proxyuser.username.groups</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.username.hosts</name>
<value>*</value>
</property>