
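In the earlier post "HBase source code analysis: authorization" I covered HBase's built-in simple authentication. Apache also has a project dedicated to authorization: Ranger. Installing Ranger is fairly involved; for details see: https://cwiki.apache.org/confluence/display/RANGER/Apache+Ranger+0.5.0+Installation
Personally, I find Ranger still somewhat rough around the edges, short of what I would expect from a top-level Apache project.
Ranger enforces permissions through RangerAuthorizationCoprocessor. It implements MasterObserver, RegionServerObserver, RegionObserver and BulkLoadObserver and hooks into their many callbacks.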
Syncing with HBase grant and revoke
A configuration flag, UpdateRangerPoliciesOnGrantRevoke, decides whether Ranger's policies should also be updated when an HBase grant or revoke is issued:
UpdateRangerPoliciesOnGrantRevoke = RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_PROP, RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE);
RangerAuthorizationCoprocessor also implements the CoprocessorService interface and registers itself as the AccessControlService, so that it receives grant and revoke calls:
@Override
public Service getService() {
    return AccessControlProtos.AccessControlService.newReflectiveService(this);
}
It implements the two RPC methods below. In each one, if UpdateRangerPoliciesOnGrantRevoke is true, the grant or revoke is applied to the Ranger policies as well.
/**
 * <code>rpc Grant(.GrantRequest) returns (.GrantResponse);</code>
 */
public abstract void grant(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantRequest request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.GrantResponse> done);
/**
 * <code>rpc Revoke(.RevokeRequest) returns (.RevokeResponse);</code>
 */
public abstract void revoke(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeRequest request,
    com.google.protobuf.RpcCallback<org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.RevokeResponse> done);
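The following minimal Java sketch shows a grant handler gated by such a flag. It is not Ranger's code. `PolicyStore`, `GrantHandler` and the `Consumer<Boolean>` callback are hypothetical stand-ins for the Ranger Admin client and the protobuf `RpcCallback`. The behavior when the flag is off (reporting that nothing was updated) is an assumption.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for the client that pushes changes to the Ranger policy store.
interface PolicyStore {
    void grant(String user, String table, String actions);
}

// Hypothetical sketch: forward an HBase grant to the policy store only when the flag is on.
final class GrantHandler {
    private final boolean updatePoliciesOnGrantRevoke;
    private final PolicyStore store;

    GrantHandler(boolean updatePoliciesOnGrantRevoke, PolicyStore store) {
        this.updatePoliciesOnGrantRevoke = updatePoliciesOnGrantRevoke;
        this.store = store;
    }

    // Same shape as the RPC method: do the work, then report through the callback.
    void grant(String user, String table, String actions, Consumer<Boolean> done) {
        boolean updated = false;
        if (updatePoliciesOnGrantRevoke) {
            store.grant(user, table, actions);
            updated = true;
        }
        done.accept(updated);
    }
}
```

A revoke handler would have the same shape, calling a revoke method on the store instead.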
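Policy evaluation rules
evaluateAccess is called before every operation. The code is long-winded, but in short it looks at the namespace, table, column family and qualifier involved and collects them into an AuthorizationSession. It then calls AuthorizationSession#authorize to decide whether access is permitted.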
ColumnFamilyAccessResult evaluateAccess(String operation, Action action, 
                                    final RegionCoprocessorEnvironment env,
                                    final Map<byte[], ? extends Collection<?>> familyMap) 
  throws AccessDeniedException {
    String access = _authUtils.getAccess(action);
    User user = getActiveUser();
    String userName = _userUtils.getUserAsString(user);
    byte[] tableBytes = getTableName(env);
    if (tableBytes == null || tableBytes.length == 0) {
        throw new AccessDeniedException("Insufficient permissions for operation '" + operation + "',action: " + action);
    }
    String table = Bytes.toString(tableBytes);
    String clusterName = hbasePlugin.getClusterName();
    final String messageTemplate = "evaluateAccess: exiting: user[%s], Operation[%s], access[%s], families[%s], verdict[%s]";
    ColumnFamilyAccessResult result;
    if (canSkipAccessCheck(operation, access, table) || canSkipAccessCheck(operation, access, env)) {
        result = new ColumnFamilyAccessResult(true, true, null, null, null, null, null, null);
        return result;
    }
    // let's create a session that would be reused.  Set things on it that won't change.
    HbaseAuditHandler auditHandler = _factory.getAuditHandler();
    AuthorizationSession session = new AuthorizationSession(hbasePlugin)
            .operation(operation)
            .remoteAddress(getRemoteAddress())
            .auditHandler(auditHandler)
            .user(user)
            .access(access)
            .table(table)
            .clusterName(clusterName);
    Map<String, Set<String>> families = getColumnFamilies(familyMap);
    if (families == null || families.isEmpty()) {
        session.buildRequest()
            .authorize();
        boolean authorized = session.isAuthorized();
        String reason = "";
        if (!authorized) {
            reason = String.format("Insufficient permissions for user '%s', action: %s, tableName:%s, no column families found.", user.getName(), operation, table);
        }
        AuthzAuditEvent event = auditHandler.getAndDiscardMostRecentEvent(); // this could be null, of course, depending on audit settings of table.
        // if authorized then pass captured events as access allowed set else as access denied set.
        result = new ColumnFamilyAccessResult(authorized, authorized,
                    authorized ? Collections.singletonList(event) : null,
                    null, authorized ? null : event, reason, null, clusterName);
        return result;
    }
    boolean everythingIsAccessible = true;
    boolean somethingIsAccessible = false;
    /*
     * we would have to accumulate audits of all successful accesses and any one denial (which in our case ends up being the last denial)
     * We need to keep audit events for family level access check separate because we don't want them logged in some cases.
     */
    List<AuthzAuditEvent> authorizedEvents = new ArrayList<AuthzAuditEvent>();
    List<AuthzAuditEvent> familyLevelAccessEvents = new ArrayList<AuthzAuditEvent>();
    AuthzAuditEvent deniedEvent = null;
    String denialReason = null;
    // we need to cache the auths results so that we can create a filter, if needed
    Map<String, Set<String>> columnsAccessAllowed = new HashMap<String, Set<String>>();
    Set<String> familesAccessAllowed = new HashSet<String>();
    Set<String> familesAccessDenied = new HashSet<String>();
    Set<String> familesAccessIndeterminate = new HashSet<String>();
    for (Map.Entry<String, Set<String>> anEntry : families.entrySet()) {
        String family = anEntry.getKey();
        session.columnFamily(family);
        Set<String> columns = anEntry.getValue();
        if (columns == null || columns.isEmpty()) {
            session.column(null) // zap stale column from prior iteration of this loop, if any
                .buildRequest()
                .authorize();
            AuthzAuditEvent auditEvent = auditHandler.getAndDiscardMostRecentEvent(); // capture it only for success
            if (session.isAuthorized()) {
                // we need to do 3 things: housekeeping, decide about audit events, building the results cache for filter
                somethingIsAccessible = true;
                familesAccessAllowed.add(family);
                if (auditEvent != null) {
                    familyLevelAccessEvents.add(auditEvent);
                }
            } else {
                everythingIsAccessible = false;
                if (auditEvent != null && deniedEvent == null) { // we need to capture just one denial event
                    deniedEvent = auditEvent;
                }
                session.resourceMatchingScope(RangerAccessRequest.ResourceMatchingScope.SELF_OR_DESCENDANTS)
                        .buildRequest()
                        .authorize();
                auditEvent = auditHandler.getAndDiscardMostRecentEvent(); // capture it only for failure
                if (session.isAuthorized()) {
                    // we need to do 3 things: housekeeping, decide about audit events, building the results cache for filter
                    somethingIsAccessible = true;
                    familesAccessIndeterminate.add(family);
                } else {
                    familesAccessDenied.add(family);
                    denialReason = String.format("Insufficient permissions for user '%s', action: %s, tableName:%s, family:%s.", user.getName(), operation, table, family);
                    if (auditEvent != null && deniedEvent == null) { // we need to capture just one denial event
                        deniedEvent = auditEvent;
                    }
                }
                // Restore the headMatch setting
                session.resourceMatchingScope(RangerAccessRequest.ResourceMatchingScope.SELF);
            }
        } else {
            Set<String> accessibleColumns = new HashSet<String>(); // will be used to populate our results cache for the filter
            for (String column : columns) {
                session.column(column)
                    .buildRequest()
                    .authorize();
                AuthzAuditEvent auditEvent = auditHandler.getAndDiscardMostRecentEvent();
                if (session.isAuthorized()) {
                    // we need to do 3 things: housekeeping, capturing audit events, building the results cache for filter
                    somethingIsAccessible = true;
                    accessibleColumns.add(column);
                    if (auditEvent != null) {
                        authorizedEvents.add(auditEvent);
                    }
                } else {
                    everythingIsAccessible = false;
                    denialReason = String.format("Insufficient permissions for user '%s', action: %s, tableName:%s, family:%s, column: %s", user.getName(), operation, table, family, column);
                    if (auditEvent != null && deniedEvent == null) { // we need to capture just one denial event
                        deniedEvent = auditEvent;
                    }
                }
                if (!accessibleColumns.isEmpty()) {
                    columnsAccessAllowed.put(family, accessibleColumns);
                }
            }
        }
    }
    // The cached auth results are encapsulated in the filter. Not every caller of the function uses it - only preGet and preOpt will.
    RangerAuthorizationFilter filter = new RangerAuthorizationFilter(session, familesAccessAllowed, familesAccessDenied, familesAccessIndeterminate, columnsAccessAllowed);
    result = new ColumnFamilyAccessResult(everythingIsAccessible, somethingIsAccessible, authorizedEvents, familyLevelAccessEvents, deniedEvent, denialReason, filter, clusterName);
    return result;
}
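Without the auditing and session plumbing, the aggregation in evaluateAccess follows a simple pattern, modeled below. This is a simplified sketch, not Ranger's code. `allowed` is a hypothetical predicate standing in for an exact authorize() call on (family, column), where a null column means the whole family. `anyDescendantAllowed` stands in for the retry with ResourceMatchingScope.SELF_OR_DESCENDANTS. Audit events and denial reasons are left out.

```java
import java.util.*;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

// Result holder mirroring the bookkeeping in evaluateAccess (hypothetical names).
final class AccessSummary {
    boolean everythingIsAccessible = true;
    boolean somethingIsAccessible = false;
    final Set<String> familiesAllowed = new HashSet<>();
    final Set<String> familiesDenied = new HashSet<>();
    final Set<String> familiesIndeterminate = new HashSet<>();
    final Map<String, Set<String>> columnsAllowed = new HashMap<>();
}

final class FamilyAccessEvaluator {
    // allowed.test(family, null) checks the whole family; allowed.test(family, column) one column.
    // anyDescendantAllowed stands in for the SELF_OR_DESCENDANTS retry.
    static AccessSummary evaluate(Map<String, Set<String>> families,
                                  BiPredicate<String, String> allowed,
                                  Predicate<String> anyDescendantAllowed) {
        AccessSummary s = new AccessSummary();
        for (Map.Entry<String, Set<String>> entry : families.entrySet()) {
            String family = entry.getKey();
            Set<String> columns = entry.getValue();
            if (columns == null || columns.isEmpty()) {
                if (allowed.test(family, null)) {
                    s.somethingIsAccessible = true;
                    s.familiesAllowed.add(family);
                } else {
                    s.everythingIsAccessible = false;
                    // Family-level check failed; some columns under it may still be readable.
                    if (anyDescendantAllowed.test(family)) {
                        s.somethingIsAccessible = true;
                        s.familiesIndeterminate.add(family);
                    } else {
                        s.familiesDenied.add(family);
                    }
                }
            } else {
                Set<String> accessibleColumns = new HashSet<>();
                for (String column : columns) {
                    if (allowed.test(family, column)) {
                        s.somethingIsAccessible = true;
                        accessibleColumns.add(column);
                    } else {
                        s.everythingIsAccessible = false;
                    }
                }
                if (!accessibleColumns.isEmpty()) {
                    s.columnsAllowed.put(family, accessibleColumns);
                }
            }
        }
        return s;
    }
}
```

"Indeterminate" families are the ones the result filter must resolve cell by cell at read time, which is why they count as partially accessible.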
authorize eventually calls RangerPolicyEngineImpl#isAccessAllowed(RangerAccessRequest request, RangerAccessResultProcessor resultProcessor):
@Override
public RangerAccessResult isAccessAllowed(RangerAccessRequest request, RangerAccessResultProcessor resultProcessor) {
   RangerAccessResult ret = isAccessAllowedNoAudit(request);
   updatePolicyUsageCounts(request, ret);
   if (resultProcessor != null) {
      resultProcessor.processResult(ret);
   }
   return ret;
}
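RangerPolicyEngineImpl#isAccessAllowed looks up all policies for the resource in RangerPolicyRepository. For each one, it runs RangerDefaultPolicyEvaluator#evaluatePolicyItems to decide whether access is permitted. As soon as a matching item settles the decision as deny or allow, the iteration breaks. Within each policy, denyEvaluators are checked first for a matching deny item. Only if none matches are allowEvaluators checked for a matching allow item.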
protected void evaluatePolicyItems(RangerAccessRequest request, RangerAccessResult result, boolean isResourceMatch) {
    // First, look for a matching deny item
    RangerPolicyItemEvaluator matchedPolicyItem = getMatchingPolicyItem(request, denyEvaluators, denyExceptionEvaluators);
    // Then, look for a matching allow item
    if (matchedPolicyItem == null && !result.getIsAllowed()) {
        matchedPolicyItem = getMatchingPolicyItem(request, allowEvaluators, allowExceptionEvaluators);
    }
    if (matchedPolicyItem != null) {
        RangerPolicy policy = getPolicy();
        if (matchedPolicyItem.getPolicyItemType() == RangerPolicyItemEvaluator.POLICY_ITEM_TYPE_DENY) {
            if (isResourceMatch) {
                result.setIsAllowed(false);
                result.setPolicyId(policy.getId());
                result.setReason(matchedPolicyItem.getComments());
            }
        } else {
            if (!result.getIsAllowed()) {
                result.setIsAllowed(true);
                result.setPolicyId(policy.getId());
                result.setReason(matchedPolicyItem.getComments());
            }
        }
    }
}
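The deny-before-allow order, plus the early exit across policies described above, can be modeled in a few lines. This is a simplified sketch, not RangerPolicyEngineImpl. Each policy item is a hypothetical (user, access) predicate. Exception items are assumed to cancel a match, in the way denyExceptionEvaluators and allowExceptionEvaluators are passed to getMatchingPolicyItem. Policies are scanned in list order (the real engine's ordering is not modeled), and the result defaults to deny when nothing matches.

```java
import java.util.*;
import java.util.function.BiPredicate;

// Hypothetical, simplified model of one policy's deny/allow items and their exceptions.
final class SketchPolicy {
    enum Decision { ALLOW, DENY, NONE }

    private final List<BiPredicate<String, String>> deny;
    private final List<BiPredicate<String, String>> denyExceptions;
    private final List<BiPredicate<String, String>> allow;
    private final List<BiPredicate<String, String>> allowExceptions;

    SketchPolicy(List<BiPredicate<String, String>> deny,
                 List<BiPredicate<String, String>> denyExceptions,
                 List<BiPredicate<String, String>> allow,
                 List<BiPredicate<String, String>> allowExceptions) {
        this.deny = deny;
        this.denyExceptions = denyExceptions;
        this.allow = allow;
        this.allowExceptions = allowExceptions;
    }

    // An item matches unless one of the exception items also matches (assumed semantics).
    private static boolean matches(List<BiPredicate<String, String>> items,
                                   List<BiPredicate<String, String>> exceptions,
                                   String user, String access) {
        boolean hit = false;
        for (BiPredicate<String, String> item : items) {
            if (item.test(user, access)) { hit = true; break; }
        }
        if (!hit) return false;
        for (BiPredicate<String, String> ex : exceptions) {
            if (ex.test(user, access)) return false;
        }
        return true;
    }

    // Deny items are consulted first; allow items only if no deny item matched.
    Decision evaluate(String user, String access) {
        if (matches(deny, denyExceptions, user, access)) return Decision.DENY;
        if (matches(allow, allowExceptions, user, access)) return Decision.ALLOW;
        return Decision.NONE;
    }
}

final class SketchEngine {
    // Scan policies in order and stop at the first one that reaches a decision.
    static boolean isAccessAllowed(List<SketchPolicy> policies, String user, String access) {
        for (SketchPolicy policy : policies) {
            SketchPolicy.Decision d = policy.evaluate(user, access);
            if (d != SketchPolicy.Decision.NONE) {
                return d == SketchPolicy.Decision.ALLOW;
            }
        }
        return false; // nothing matched: default deny
    }
}
```

For example, a policy that denies "write" to everyone except bob, and allows everything else, lets alice read, blocks alice from writing, and lets bob write.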
Policy refresh
RangerHBasePlugin is created in RangerAuthorizationCoprocessor#start:
@Override
public void start(CoprocessorEnvironment env) throws IOException {
   ...
   // create and initialize the plugin class
   RangerHBasePlugin plugin = hbasePlugin;
   if(plugin == null) {
      synchronized(RangerAuthorizationCoprocessor.class) {
         plugin = hbasePlugin;
         if(plugin == null) {
            plugin = new RangerHBasePlugin(appType);
            plugin.init();
            UpdateRangerPoliciesOnGrantRevoke = RangerConfiguration.getInstance().getBoolean(RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_PROP, RangerHadoopConstants.HBASE_UPDATE_RANGER_POLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE);
            hbasePlugin = plugin;
         }
      }
   }
  ...
}
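start() uses double-checked locking so that only one plugin is created, even though start() can run many times in the same JVM. For this pattern to be safe under the Java Memory Model, the shared field must be volatile. The sketch below assumes a static volatile field, as the synchronized(RangerAuthorizationCoprocessor.class) block suggests. DemoPlugin and DemoCoprocessor are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical plugin; init() stands in for RangerHBasePlugin#init and counts its calls.
final class DemoPlugin {
    static final AtomicInteger INIT_CALLS = new AtomicInteger();

    void init() {
        INIT_CALLS.incrementAndGet();
    }
}

// Hypothetical coprocessor; start() may run many times in one JVM.
final class DemoCoprocessor {
    // Shared across instances; volatile makes double-checked locking safe.
    private static volatile DemoPlugin plugin;

    void start() {
        DemoPlugin p = plugin;                 // fast path: no lock once initialized
        if (p == null) {
            synchronized (DemoCoprocessor.class) {
                p = plugin;                    // re-check: another thread may have won
                if (p == null) {
                    p = new DemoPlugin();
                    p.init();                  // finish initialization first...
                    plugin = p;                // ...then publish via the volatile write
                }
            }
        }
    }

    static DemoPlugin current() {
        return plugin;
    }
}
```

Initializing into a local variable and assigning the field last ensures other threads never observe a plugin whose init() has not completed.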
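RangerHBasePlugin#init creates a PolicyRefresher to keep the policies in sync. The polling interval defaults to 30*1000 ms, so the plugin pulls policies from Ranger Admin every 30 seconds.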
public void init() {
    ...
    long   pollingIntervalMs = RangerConfiguration.getInstance().getLong(propertyPrefix + ".policy.pollIntervalMs", 30 * 1000);
    ...
    refresher = new PolicyRefresher(this, serviceType, appId, serviceName, admin, pollingIntervalMs, cacheDir);
    refresher.setDaemon(true);
    refresher.startRefresher();
    ...
}
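The interval lookup in init() boils down to using the property if it is present, and 30,000 ms otherwise. Below is a minimal sketch using java.util.Properties instead of RangerConfiguration. The prefix "ranger.plugin.hbase" used in the usage check is an assumption about propertyPrefix for the HBase plugin. Falling back to the default on a malformed value is a choice made by this sketch.

```java
import java.util.Properties;

// Hypothetical sketch: read "<propertyPrefix>.policy.pollIntervalMs" with a 30 s default.
final class PollIntervalConfig {
    static final long DEFAULT_POLL_INTERVAL_MS = 30 * 1000L;

    static long pollIntervalMs(Properties conf, String propertyPrefix) {
        String raw = conf.getProperty(propertyPrefix + ".policy.pollIntervalMs");
        if (raw == null) {
            return DEFAULT_POLL_INTERVAL_MS;
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            return DEFAULT_POLL_INTERVAL_MS; // sketch's choice: fall back rather than fail
        }
    }
}
```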
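PolicyRefresher is essentially a Thread. Once started, its run() method enters a loop: after each policy pull, the thread sleeps for pollingIntervalMs (30 s by default), and an interrupt ends the loop.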
public void run() {
   while(true) {
      loadPolicy();
      try {
         Thread.sleep(pollingIntervalMs);
      } catch(InterruptedException excp) {
         break;
      }
   }
}
private void loadPolicy() {
    try {
        // Pull the policies once
        ServicePolicies svcPolicies = loadPolicyfromPolicyAdmin();
        if (svcPolicies == null) {
            // If the pull fails at startup, fall back to reading the local cache
            if (!policiesSetInPlugin) {
                svcPolicies = loadFromCache();
            }
        } else {
            // Write the fresh policies to the cache
            saveToCache(svcPolicies);
        }
        // Apply the policies
        if (svcPolicies != null) {
            plugIn.setPolicies(svcPolicies);
            policiesSetInPlugin = true;
            setLastActivationTimeInMillis(System.currentTimeMillis());
            lastKnownVersion = svcPolicies.getPolicyVersion();
        } else {
            if (!policiesSetInPlugin && !serviceDefSetInPlugin) {
                plugIn.setPolicies(null);
                serviceDefSetInPlugin = true;
            }
        }
    } catch (Exception excp) {
        // exception handling omitted in this excerpt
    }
}
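The fallback rules in loadPolicy are easy to get wrong, so here they are as a small, testable model. It is a sketch under stated assumptions. Policies are modeled as a String. The four functional parameters are hypothetical stand-ins for loadPolicyfromPolicyAdmin() (returning null on failure), loadFromCache(), saveToCache() and plugIn.setPolicies(). The one-time setPolicies(null) branch and version tracking are omitted.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of PolicyRefresher's refresh step; policies are just Strings here.
final class RefreshStep {
    private final Supplier<String> remote;      // like loadPolicyfromPolicyAdmin(): null on failure
    private final Supplier<String> cache;       // like loadFromCache()
    private final Consumer<String> saveToCache; // like saveToCache()
    private final Consumer<String> apply;       // like plugIn.setPolicies()
    private boolean policiesSetInPlugin = false;

    RefreshStep(Supplier<String> remote, Supplier<String> cache,
                Consumer<String> saveToCache, Consumer<String> apply) {
        this.remote = remote;
        this.cache = cache;
        this.saveToCache = saveToCache;
        this.apply = apply;
    }

    void loadOnce() {
        String policies = remote.get();
        if (policies == null) {
            // Remote pull failed: use the cache only if nothing has been applied yet.
            if (!policiesSetInPlugin) {
                policies = cache.get();
            }
        } else {
            saveToCache.accept(policies);     // persist the fresh copy for the next cold start
        }
        if (policies != null) {
            apply.accept(policies);
            policiesSetInPlugin = true;
        }
    }

    // Same shape as PolicyRefresher#run: load, sleep, repeat until interrupted.
    void runLoop(long pollIntervalMs) {
        while (true) {
            loadOnce();
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status for callers
                break;
            }
        }
    }
}
```

Note the asymmetry: once policies have been applied, a failed pull simply keeps the current policies instead of re-reading the cache.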
Once the policies are loaded, they are handed to RangerBasePlugin, which creates a new RangerPolicyRepository instance and passes the policies to its constructor.
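Building a fresh repository and swapping it in means readers never see a half-updated policy set, provided each check reads the reference only once. Below is a generic sketch of that copy-on-write swap through a volatile field. PolicySnapshot and SnapshotHolder are hypothetical names, and the sketch makes no claim about how RangerBasePlugin stores its engine internally.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable snapshot standing in for a policy repository.
final class PolicySnapshot {
    final long version;
    final List<String> policies;

    PolicySnapshot(long version, List<String> policies) {
        this.version = version;
        // Defensive copy + unmodifiable view: the snapshot never changes after construction.
        this.policies = Collections.unmodifiableList(new ArrayList<>(policies));
    }
}

// Hypothetical holder: the refresher replaces the snapshot, readers just read the reference.
final class SnapshotHolder {
    private volatile PolicySnapshot current =
            new PolicySnapshot(-1L, Collections.<String>emptyList());

    // Build the new snapshot first, then publish it with a single volatile write.
    void setPolicies(long version, List<String> policies) {
        current = new PolicySnapshot(version, policies);
    }

    // Callers should read once per access check and use that snapshot throughout.
    PolicySnapshot snapshot() {
        return current;
    }
}
```

An in-flight check that already holds the old snapshot finishes against it, and the next check picks up the new one.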
-END-