A First Reading of Apache Sentry

1. Background

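    Apache Sentry is an open-source Hadoop component originally released by Cloudera. It provides fine-grained, role-based authorization and multi-tenant administration, mainly for data and metadata stored on a Hadoop cluster. It integrates with Hive/HCatalog, Apache Solr and Cloudera Impala, and can be extended in the future to other Hadoop ecosystem components such as HDFS and HBase.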

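    Sentry is designed to be a pluggable authorization engine for Hadoop components. It lets you define authorization rules that validate a user's or an application's requests to access Hadoop resources. Sentry is highly modular and can support authorization for the various data models used in Hadoop.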

2. Goal of This Article

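    This article is for developers who are new to Apache Sentry. It aims to help them:

    - find the entry point of the code;
    - quickly get a picture of the code architecture and the system's structure;
    - trace the call paths.

    This lays the groundwork for deeper reading later. Before reading, it helps to search online for material on Sentry's architecture and principles, which makes the source code much easier to follow. I was also new to Sentry when I wrote this and want to share what I learned. For a deeper understanding, please look out for my next post.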

3. A Brief Introduction to Sentry

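    We have seen permission models in many systems. They typically contain four layers:

    - Resources.
    - Privileges, which are access rules on resources.
    - Roles, each of which is a set of privileges.
    - Users, to whom roles are granted.

    Sometimes there is also the notion of a group: users with the same attributes are put into one group, and roles are granted to the group. Sentry's permission model follows the same principle. It contains the following elements: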

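    a. Resource: the object that a privilege applies to, such as a server, database, table, column or URI.

    b. Privilege: an access rule on a resource. Several privileges together form a set of access rules. For example, user A may read table T but may not drop it.

    c. Role: a collection of privileges.

    d. Group: takes the place of the user concept. Sentry itself has no notion of a user and expresses everything in terms of groups, so you can think of a Sentry group as a user or an account. Roles are granted to groups. When a request arrives, the requesting user's groups are resolved, and the roles granted to those groups decide what the user may do.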
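The following minimal, self-contained Java sketch makes the chain from privilege to role to group concrete. The classes `AuthzModel` and `Privilege`, and the action names `SELECT`/`DROP`, are hypothetical illustrations of the model described above, not Sentry's actual API. Real Sentry additionally handles resource hierarchies (server, database, table, column) and keeps these mappings on the Sentry server.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AuthzModel {
    // A privilege: an action allowed on a named resource (hypothetical, simplified)
    record Privilege(String resource, String action) {}

    // role name -> privileges granted to that role
    private final Map<String, Set<Privilege>> rolePrivileges = new HashMap<>();
    // group name -> roles granted to that group
    private final Map<String, Set<String>> groupRoles = new HashMap<>();

    void grantPrivilegeToRole(String role, Privilege p) {
        rolePrivileges.computeIfAbsent(role, r -> new HashSet<>()).add(p);
    }

    void grantRoleToGroup(String role, String group) {
        groupRoles.computeIfAbsent(group, g -> new HashSet<>()).add(role);
    }

    // Allowed if any role granted to any of the caller's groups holds the privilege
    boolean isAllowed(List<String> groups, Privilege requested) {
        for (String group : groups) {
            for (String role : groupRoles.getOrDefault(group, Set.of())) {
                if (rolePrivileges.getOrDefault(role, Set.of()).contains(requested)) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        AuthzModel model = new AuthzModel();
        model.grantPrivilegeToRole("analyst", new Privilege("db1.T", "SELECT"));
        model.grantRoleToGroup("analyst", "groupA");
        System.out.println(model.isAllowed(List.of("groupA"), new Privilege("db1.T", "SELECT"))); // true
        System.out.println(model.isAllowed(List.of("groupA"), new Privilege("db1.T", "DROP")));   // false
    }
}
```

This mirrors the "user A may read table T but not drop it" example: the group holds a role that carries `SELECT` on the table but not `DROP`.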

4. A First Look at the Code

4.1 Entry Point

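    The best way to read source code is to find the entry point and read downward from there, step by step. The overall architecture of the code then emerges naturally.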

    Sentry's entry point is in: sentry-tools/src/main/java/org/apache/sentry/SentryMain.java

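How did I know it was here? Sentry is started from the command line with arguments, and such programs almost always have a main function. Mostly it comes down to experience.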



public static void main(String[] args) throws Exception {
    CommandLineParser parser = new GnuParser();
    Options options = new Options();
    options.addOption(HELP_SHORT, HELP_LONG, false, "Print this help text");
    options.addOption(VERSION_SHORT, VERSION_LONG, false, "Print Sentry version");
    options.addOption(HIVE_CONF, true, "Set hive configuration variables");
    options.addOption(null, COMMAND, true, "Command to run. Options: " + COMMANDS);
    options.addOption(null, LOG4J_CONF, true, "Location of log4j properties file");
    // Ignore unrecognized options: service and config-tool options
    CommandLine commandLine = parser.parse(options, args, true);

    String log4jconf = commandLine.getOptionValue(LOG4J_CONF);
    if (log4jconf != null && log4jconf.length() > 0) {
        Properties log4jProperties = new Properties();
        // Firstly load log properties from properties file
        try (InputStream istream = Files.newInputStream(Paths.get(log4jconf))) {
            log4jProperties.load(istream);
        }
        // Set the log level of DataNucleus.Query to INFO only if it is not set in the
        // properties file
        if (!log4jProperties.containsKey(LOG4J_DATANUCLEUS)) {
            log4jProperties.setProperty(LOG4J_DATANUCLEUS, "INFO");
            // Enable debug log for DataNucleus.Query only when log.threshold is TRACE
            String logThreshold = log4jProperties.getProperty("log.threshold");
            if (logThreshold != null && logThreshold.equalsIgnoreCase("TRACE")) {
                log4jProperties.setProperty(LOG4J_DATANUCLEUS, "DEBUG");
            }
        }
        PropertyConfigurator.configure(log4jProperties);
        Logger sentryLogger = LoggerFactory.getLogger(SentryMain.class);
        sentryLogger.info("Configuring log4j to use [" + log4jconf + "]");
    }

    // Print sentry help only if commandName was not given,
    // otherwise we assume the help is for the sub command
    String commandName = commandLine.getOptionValue(COMMAND);
    if (commandName == null && (commandLine.hasOption(HELP_SHORT) ||
            commandLine.hasOption(HELP_LONG))) {
        printHelp(options, "Command name is missing.");
        return;
    } else if (commandLine.hasOption(VERSION_SHORT) ||
            commandLine.hasOption(VERSION_LONG)) {
        printVersion();
        return;
    }
    // A switch on a null String throws NullPointerException, so reject a missing --command first
    if (commandName == null) {
        printHelp(options, "Command name is missing.");
        return;
    }

    Command command;
    switch (commandName) {
        case "service":
            command = new SentryService.CommandImpl();
            break;
        case "config-tool":
            command = new SentryConfigTool.CommandImpl();
            break;
        case "schema-tool":
            command = new SentrySchemaTool.CommandImpl();
            break;
        default:
            printHelp(options, "Unknown command " + commandName + "\n");
            return;
    }
    command.run(commandLine.getArgs());
}



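This code parses the command-line arguments, reads the log4j configuration file and initializes logging, then dispatches to a subcommand. You can skim most of it. For starting the server, the line that matters is command = new SentryService.CommandImpl(); which is selected when the command is "service".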
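The switch in main() is a simple name-to-command dispatch. The same idea can be written as a lookup table, so adding a subcommand becomes a one-line change. This is a hypothetical sketch: `Command` is a stand-in interface, and each lambda only returns a string naming the subcommand that would run. In Sentry, the table entries would be `SentryService.CommandImpl` and the other tool commands.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class CommandDispatch {
    // Hypothetical stand-in for Sentry's Command interface
    interface Command {
        String run(String[] args);
    }

    // Subcommand name -> factory; insertion order is kept for the help text
    static final Map<String, Supplier<Command>> COMMANDS = new LinkedHashMap<>();
    static {
        COMMANDS.put("service", () -> args -> "service " + Arrays.toString(args));
        COMMANDS.put("config-tool", () -> args -> "config-tool " + Arrays.toString(args));
        COMMANDS.put("schema-tool", () -> args -> "schema-tool " + Arrays.toString(args));
    }

    // Runs the named command, or returns a help message for a missing or unknown name
    static String dispatch(String commandName, String[] rest) {
        if (commandName == null) {
            return "Command name is missing. Options: " + COMMANDS.keySet();
        }
        Supplier<Command> factory = COMMANDS.get(commandName);
        if (factory == null) {
            return "Unknown command " + commandName + ". Options: " + COMMANDS.keySet();
        }
        return factory.get().run(rest);
    }

    public static void main(String[] args) {
        System.out.println(dispatch("service", new String[] {"a", "b"})); // service [a, b]
        System.out.println(dispatch(null, new String[0]));
    }
}
```

Checking for a missing name before the lookup also avoids the NullPointerException that a switch on a null String would throw.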

4.2 Startup Process

    To understand how Sentry starts up, focus on this class: sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryService.java.



    
public SentryService(Configuration conf) throws Exception {
  this.conf = conf;
  int port = conf
      .getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);
  if (port == 0) {
    port = findFreePort();
    conf.setInt(ServerConfig.RPC_PORT, port);
  }
  this.address = NetUtils.createSocketAddr(
      conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT),
      port);
  LOGGER.info("Configured on address {}", address);
  kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
      conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim());
  maxThreads = conf.getInt(ServerConfig.RPC_MAX_THREADS,
      ServerConfig.RPC_MAX_THREADS_DEFAULT);
  minThreads = conf.getInt(ServerConfig.RPC_MIN_THREADS,
      ServerConfig.RPC_MIN_THREADS_DEFAULT);
  maxMessageSize = conf.getLong(ServerConfig.SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE,
      ServerConfig.SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
  if (kerberos) {
    // Use Hadoop libraries to translate the _HOST placeholder with actual hostname
    try {
      String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL),
          ServerConfig.PRINCIPAL + " is required");
      principal = SecurityUtil.getServerPrincipal(rawPrincipal, address.getAddress());
    } catch (IOException io) {
      throw new RuntimeException("Can't translate kerberos principal", io);
    }
    LOGGER.info("Using kerberos principal: {}", principal);
    principalParts = SaslRpcServer.splitKerberosName(principal);
    Preconditions.checkArgument(principalParts.length == 3,
        "Kerberos principal should have 3 parts: " + principal);
    keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB),
        ServerConfig.KEY_TAB + " is required");
    File keytabFile = new File(keytab);
    Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),
        "Keytab %s does not exist or is not readable.", keytab);
  } else {
    principal = null;
    principalParts = null;
    keytab = null;
  }
  ThreadFactory sentryServiceThreadFactory = new ThreadFactoryBuilder()
      .setNameFormat(SENTRY_SERVICE_THREAD_NAME)
      .build();
  serviceExecutor = Executors.newSingleThreadExecutor(sentryServiceThreadFactory);
  this.sentryStore = getSentryStore(conf);
  sentryStore.setPersistUpdateDeltas(SentryServiceUtil.isHDFSSyncEnabled(conf));
  this.leaderMonitor = LeaderStatusMonitor.getLeaderStatusMonitor(conf);
  status = Status.NOT_STARTED;
  // Enable signal handler for HA leader/follower status if configured
  String sigName = conf.get(ServerConfig.SERVER_HA_STANDBY_SIG);
  if ((sigName != null) && !sigName.isEmpty()) {
    LOGGER.info("Registering signal handler {} for HA", sigName);
    try {
      registerSigListener(sigName, this);
    } catch (Exception e) {
      LOGGER.error("Failed to register signal", e);
    }
  }
}



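    Start with the constructor of this class. From the configuration passed in, it determines:

    - the Thrift RPC port (a free port is picked if it is configured as 0);
    - the maximum and minimum number of worker threads;
    - the maximum message size.

    It also sets up Kerberos authentication, the HA leader status monitor (leaderMonitor) and the optional HA standby signal handler. The most important line is this.sentryStore = getSentryStore(conf);, which creates the store that the RPC handlers later operate on.

    Because Sentry is built on the Thrift RPC framework, processors must be registered, and the actual request handlers are generated from the Thrift IDL. That registration happens in runServer(), shown next. To understand Sentry, it is worth getting familiar with Thrift first.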
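In the Kerberos branch, the configured principal typically has the form `service/_HOST@REALM`. SecurityUtil.getServerPrincipal replaces `_HOST` with the server's hostname, and SaslRpcServer.splitKerberosName splits the result into three parts (service, host, realm). That is why the constructor checks `principalParts.length == 3`. runServer() later passes principalParts[0] and principalParts[1] to addServerDefinition.

The following simplified, JDK-only sketch of those two steps is written for illustration. It is not Hadoop's implementation, and the principal `sentry/_HOST@EXAMPLE.COM` and the hostname are made-up examples.

```java
import java.util.Locale;

public class PrincipalSketch {
    // Replace a "_HOST" instance component with the lower-cased hostname (simplified)
    static String resolveHost(String rawPrincipal, String hostname) {
        String[] parts = splitKerberosName(rawPrincipal);
        if (parts.length == 3 && parts[1].equals("_HOST")) {
            return parts[0] + "/" + hostname.toLowerCase(Locale.ROOT) + "@" + parts[2];
        }
        return rawPrincipal;
    }

    // Split "service/host@REALM" on '/' and '@' into its components
    static String[] splitKerberosName(String principal) {
        return principal.split("[/@]");
    }

    public static void main(String[] args) {
        String principal = resolveHost("sentry/_HOST@EXAMPLE.COM", "Node1.Example.com");
        String[] parts = splitKerberosName(principal);
        // Same sanity check as the constructor's Preconditions.checkArgument
        if (parts.length != 3) {
            throw new IllegalArgumentException("Kerberos principal should have 3 parts: " + principal);
        }
        System.out.println(principal); // sentry/node1.example.com@EXAMPLE.COM
        System.out.println(parts[0] + " | " + parts[1] + " | " + parts[2]); // sentry | node1.example.com | EXAMPLE.COM
    }
}
```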



private void runServer() throws Exception {
  startSentryStoreCleaner(conf);
  startHMSFollower(conf);
  Iterable<String> processorFactories = ConfUtilties.CLASS_SPLITTER
      .split(conf.get(ServerConfig.PROCESSOR_FACTORIES,
          ServerConfig.PROCESSOR_FACTORIES_DEFAULT).trim());
  TMultiplexedProcessor processor = new TMultiplexedProcessor();
  boolean registeredProcessor = false;
  for (String processorFactory : processorFactories) {
    Class<?> clazz = conf.getClassByName(processorFactory);
    if (!ProcessorFactory.class.isAssignableFrom(clazz)) {
      throw new IllegalArgumentException("Processor Factory "
          + processorFactory + " is not a "
          + ProcessorFactory.class.getName());
    }
    try {
      Constructor<?> constructor = clazz
          .getConstructor(Configuration.class);
      LOGGER.info("ProcessorFactory being used: " + clazz.getCanonicalName());
      ProcessorFactory factory = (ProcessorFactory) constructor
          .newInstance(conf);
      boolean registerStatus = factory.register(processor, sentryStore);
      if (!registerStatus) {
        LOGGER.error("Failed to register " + clazz.getCanonicalName());
      }
      registeredProcessor = registerStatus || registeredProcessor;
    } catch (Exception e) {
      throw new IllegalStateException("Could not create "
          + processorFactory, e);
    }
  }
  if (!registeredProcessor) {
    throw new IllegalStateException(
        "Failed to register any processors from " + processorFactories);
  }
  addSentryServiceGauge();
  TServerTransport serverTransport = new TServerSocket(address);
  TTransportFactory transportFactory = null;
  if (kerberos) {
    TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
    saslTransportFactory.addServerDefinition(AuthMethod.KERBEROS
        .getMechanismName(), principalParts[0], principalParts[1],
        ServerConfig.SASL_PROPERTIES, new GSSCallback(conf));
    transportFactory = saslTransportFactory;
  } else {
    transportFactory = new TTransportFactory();
  }
  TThreadPoolServer.Args args = new TThreadPoolServer.Args(
      serverTransport).processor(processor)
      .transportFactory(transportFactory)
      .protocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
      .minWorkerThreads(minThreads).maxWorkerThreads(maxThreads);
  thriftServer = new TThreadPoolServer(args);
  LOGGER.info("Serving on {}", address);
  startSentryWebServer();
  // thriftServer.serve() does not return until thriftServer is stopped. Need to log before
  // calling thriftServer.serve()
  LOGGER.info("Sentry service is ready to serve client requests");
  // Allow clients/users watching the console to know when sentry is ready
  System.out.println("Sentry service is ready to serve client requests");
  SentryStateBank.enableState(SentryServiceState.COMPONENT, SentryServiceState.SERVICE_RUNNING);
  thriftServer.serve();
}


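    Most of this code is not essential. What matters is how an incoming RPC request is handled. Since Sentry uses Thrift, there must be handler functions generated from the IDL, and the processor is what routes each request to them. So let's follow processorFactories further.

    The configuration (ServerConfig.PROCESSOR_FACTORIES) shows that the processor factory in use is org.apache.sentry.api.service.thrift.SentryPolicyStoreProcessorFactory. Let's see what it does.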
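runServer() loads its processor factories by class name, in four steps:

1. Split the configured class list.
2. Check each class with isAssignableFrom.
3. Call the class's (Configuration) constructor reflectively and let the factory register itself.
4. Fail if no factory registered successfully.

The JDK-only sketch below reproduces that plugin-loading pattern. `Factory`, `PolicyFactory`, the `"PolicyService"` key and the `Map<String, String>` used in place of Hadoop's `Configuration` are hypothetical stand-ins, not Sentry classes.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

public class FactoryLoader {
    // Hypothetical stand-in for Sentry's ProcessorFactory
    public interface Factory {
        boolean register(Map<String, String> registry);
    }

    // Example plugin that registers one named service (hypothetical)
    public static class PolicyFactory implements Factory {
        private final Map<String, String> conf;

        public PolicyFactory(Map<String, String> conf) {
            this.conf = conf;
        }

        @Override
        public boolean register(Map<String, String> registry) {
            registry.put("PolicyService", "handler for " + conf.getOrDefault("name", "default"));
            return true;
        }
    }

    // Load comma-separated factory classes and let each one register itself
    static Map<String, String> loadAll(String classList, Map<String, String> conf) throws Exception {
        Map<String, String> registry = new HashMap<>();
        boolean registered = false;
        for (String name : classList.split(",")) {
            Class<?> clazz = Class.forName(name.trim());
            if (!Factory.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(name + " is not a " + Factory.class.getName());
            }
            // Same idea as clazz.getConstructor(Configuration.class).newInstance(conf)
            Constructor<?> ctor = clazz.getConstructor(Map.class);
            Factory factory = (Factory) ctor.newInstance(conf);
            registered = factory.register(registry) || registered;
        }
        if (!registered) {
            throw new IllegalStateException("Failed to register any processors from " + classList);
        }
        return registry;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> conf = Map.of("name", "demo");
        System.out.println(loadAll(PolicyFactory.class.getName(), conf)); // {PolicyService=handler for demo}
    }
}
```

Loading by class name keeps the server core independent of concrete services: a new Thrift service only needs a factory class and a configuration entry.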



public SentryPolicyStoreProcessorFactory(Configuration conf) {
  super(conf);
}

public boolean register(TMultiplexedProcessor multiplexedProcessor,
                        SentryStoreInterface sentryStore) throws Exception {
  SentryPolicyStoreProcessor sentryServiceHandler =
      new SentryPolicyStoreProcessor(SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME,
          conf, sentryStore);
  TProcessor processor =
      new SentryProcessorWrapper<SentryPolicyService.Iface>(sentryServiceHandler);
  multiplexedProcessor.registerProcessor(
      SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME, processor);
  return true;
}



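    When this factory registers, it specifies the concrete handler class, SentryPolicyStoreProcessor. That class implements the Thrift-generated SentryPolicyService.Iface interface, with one handler method per RPC. For example, drop_sentry_role is the RPC that drops a role. From here you can read the individual handler methods yourself; I will not go further in this post.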
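TMultiplexedProcessor is what lets several Thrift services share one server port. A client using TMultiplexedProtocol prefixes each method name with the service name and a colon. The server uses that prefix to select the processor registered under that name, here SENTRY_POLICY_SERVICE_NAME, before the handler method (for example drop_sentry_role) runs.

The sketch below imitates only that routing step, with plain JDK types. The `"PolicyService"` name, the `Handler` interface and the in-memory role set are hypothetical. Real Thrift also deserializes the request arguments and writes a response.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class MultiplexSketch {
    // Hypothetical stand-in for a Thrift TProcessor: handles one method call
    interface Handler {
        String process(String method, String arg);
    }

    private final Map<String, Handler> services = new HashMap<>();

    // Mirrors multiplexedProcessor.registerProcessor(serviceName, processor)
    void registerProcessor(String serviceName, Handler handler) {
        services.put(serviceName, handler);
    }

    // Route "ServiceName:method" to the processor registered for ServiceName
    String dispatch(String qualifiedMethod, String arg) {
        int idx = qualifiedMethod.indexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("Missing service name in " + qualifiedMethod);
        }
        String service = qualifiedMethod.substring(0, idx);
        Handler handler = services.get(service);
        if (handler == null) {
            throw new IllegalArgumentException("Service not registered: " + service);
        }
        return handler.process(qualifiedMethod.substring(idx + 1), arg);
    }

    public static void main(String[] args) {
        Set<String> roles = new TreeSet<>(Set.of("admin", "analyst"));
        MultiplexSketch server = new MultiplexSketch();
        // Hypothetical policy handler with a single drop_sentry_role-like method
        server.registerProcessor("PolicyService", (method, arg) -> {
            if (method.equals("drop_sentry_role")) {
                return roles.remove(arg) ? "dropped " + arg : "no such role " + arg;
            }
            return "unknown method " + method;
        });
        System.out.println(server.dispatch("PolicyService:drop_sentry_role", "analyst")); // dropped analyst
        System.out.println(roles); // [admin]
    }
}
```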


5. Summary

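    This post gave a brief introduction to Sentry's background and principles, plus a simple guide to reading its source code. My understanding is limited and these views are rough, so please point out any shortcomings. Let's share and discuss!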

© Copyright belongs to the author. For reprints or content cooperation, please contact the author.