Apache Sentry初读

1. 背景

    Apache Sentry 是Cloudera公司发布的一个Hadoop开源组件,提供了细粒度级、基于角色的授权以及多租户的管理模式,主要针对存储在Hadoop集群上的数据和元数据。它可以和Hive/Hcatalog、Apache Solr 和Cloudera Impala等集成,未来可以扩展到其他Hadoop生态系统组件,如HDFS和HBase。

    Sentry旨在成为可插拔授权引擎的Hadoop组件。允许定义授权规则以验证用户或应用程序对Hadoop资源的访问请求。Sentry是高度模块化的,可以支持Hadoop中各种数据模型的授权。

2. 本文的目标

    本文目标人群为初次接触apache sentry的开发人员,帮助其找到代码的入口函数,快速摸清代码的架构,梳理系统的结构,追踪代码的调用路径,为之后的深入阅读打下基础。建议阅读之前,在网上搜索sentry相关的资料,了解其架构,原理,以便更好地了解源代码。写这篇文章时,本人也是刚接触sentry,有些心得,与大家分享。更多深入的理解,请关注下一次的分享

3. sentry简单介绍

    权限模型,在很多系统我们都见过,先是资源,然后是权限,权限是对资源的访问规则,然后有角色,角色是一组权限,最后是用户,角色赋予给用户,有时候也会有组的概念,相同属性的用户划成一组,角色赋予给组。sentry的权限模型也是同样的原理。它包括一下元素:

    a. Resource:权限的对象,包括server, 库,表,行,uri等

    b. Privilege:  权限,可以连接成一组访问规则。比如用户A可以对表T进行读访问,但不能删除。

    c. Role:角色是一组权限的集合

    d. Group:相当于用户这个概念,sentry不存在用户这个概念。都是以Group来表达。你可以理解成sentry的组就是用户或者账号的概念。角色赋予给Group。

4. 代码初读

4.1 入口函数

    阅读源代码,最好的方式是找到入口函数,从入口函数一步步往下阅读,自然能够梳理出代码的整体架构。

    Sentry入口函数位于:sentry-tools/src/main/java/org/apache/sentry/SentryMain.java

为什么我知道在这儿,根据启动时采用命令行的方式,带参数。一般都会有main函数,还是经验问题哈。



public static void main(String[] args)

            throws Exception {

        CommandLineParser parser = new GnuParser();

        Options options = new Options();

        options.addOption(HELP_SHORT, HELP_LONG, false, "Print this help text");

        options.addOption(VERSION_SHORT, VERSION_LONG, false,

                "Print Sentry version");

        options.addOption(HIVE_CONF, true, "Set hive configuration variables");

        options.addOption(null, COMMAND, true, "Command to run. Options: " + COMMANDS);

        options.addOption(null, LOG4J_CONF, true, "Location of log4j properties file");

        //Ignore unrecognized options: service and config-tool options

        CommandLine commandLine = parser.parse(options, args, true);

        String log4jconf = commandLine.getOptionValue(LOG4J_CONF);

        if (log4jconf != null && log4jconf.length() > 0) {

            Properties log4jProperties = new Properties();

            // Firstly load log properties from properties file

            try (InputStream istream = Files.newInputStream(Paths.get(log4jconf))) {

                log4jProperties.load(istream);

            }

            // Set the log level of DataNucleus.Query to INFO only if it is not set in the

            // properties file

            if (!log4jProperties.containsKey(LOG4J_DATANUCLEUS)) {

                log4jProperties.setProperty(LOG4J_DATANUCLEUS, "INFO");

                // Enable debug log for DataNucleus.Query only when log.threshold is TRACE

                String logThreshold = log4jProperties.getProperty("log.threshold");

                if (logThreshold != null && logThreshold.equalsIgnoreCase("TRACE")) {

                    log4jProperties.setProperty(LOG4J_DATANUCLEUS, "DEBUG");

                }

            }

            PropertyConfigurator.configure(log4jProperties);

            Logger sentryLogger = LoggerFactory.getLogger(SentryMain.class);

            sentryLogger.info("Configuring log4j to use [" + log4jconf + "]");

        }

        //Print sentry help only if commandName was not given,

        // otherwise we assume the help is for the sub command

        String commandName = commandLine.getOptionValue(COMMAND);

        if (commandName == null && (commandLine.hasOption(HELP_SHORT) ||

                commandLine.hasOption(HELP_LONG))) {

            printHelp(options, "Command name is missing.");

        } else if (commandLine.hasOption(VERSION_SHORT) ||

                commandLine.hasOption(VERSION_LONG)) {

            printVersion();

        }

        Command command = null;

        switch (commandName){

            case "service":

                command = new SentryService.CommandImpl();

                break;

            case "config-tool":

                command = new SentryConfigTool.CommandImpl();

                break;

            case "schema-tool":

                command = new SentrySchemaTool.CommandImpl();

                break;

            default:

                printHelp(options, "Unknown command " + commandName + "\n");

                break;

        }

        ((Command)command).run(commandLine.getArgs());

    }



这段代码主要是根据传入的参数,解析参数,读取配置文件,初始化日志。其他的大致看一眼,最主要的是这个函数:command = new SentryService.CommandImpl();

4.2 启动过程:

    sentry启动过程主要看这个类:sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryService.java。



    
public SentryService(Configuration conf) throws Exception {

    this.conf = conf;

    int port = conf

        .getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);

    if (port == 0) {

      port = findFreePort();

      conf.setInt(ServerConfig.RPC_PORT, port);

    }

    this.address = NetUtils.createSocketAddr(

        conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT),

        port);

    LOGGER.info("Configured on address {}", address);

    kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(

        conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim());

    maxThreads = conf.getInt(ServerConfig.RPC_MAX_THREADS,

        ServerConfig.RPC_MAX_THREADS_DEFAULT);

    minThreads = conf.getInt(ServerConfig.RPC_MIN_THREADS,

        ServerConfig.RPC_MIN_THREADS_DEFAULT);

    maxMessageSize = conf.getLong(ServerConfig.SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE,

        ServerConfig.SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);

    if (kerberos) {

      // Use Hadoop libraries to translate the _HOST placeholder with actual hostname

      try {

        String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL), ServerConfig.PRINCIPAL + " is required");

        principal = SecurityUtil.getServerPrincipal(rawPrincipal, address.getAddress());

      } catch(IOException io) {

        throw new RuntimeException("Can't translate kerberos principal'", io);

      }

      LOGGER.info("Using kerberos principal: {}", principal);

      principalParts = SaslRpcServer.splitKerberosName(principal);

      Preconditions.checkArgument(principalParts.length == 3,

          "Kerberos principal should have 3 parts: " + principal);

      keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB),

          ServerConfig.KEY_TAB + " is required");

      File keytabFile = new File(keytab);

      Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),

          "Keytab %s does not exist or is not readable.", keytab);

    } else {

      principal = null;

      principalParts = null;

      keytab = null;

    }

    ThreadFactory sentryServiceThreadFactory = new ThreadFactoryBuilder()

        .setNameFormat(SENTRY_SERVICE_THREAD_NAME)

        .build();

    serviceExecutor = Executors.newSingleThreadExecutor(sentryServiceThreadFactory);

    this.sentryStore = getSentryStore(conf);

    sentryStore.setPersistUpdateDeltas(SentryServiceUtil.isHDFSSyncEnabled(conf));

    this.leaderMonitor = LeaderStatusMonitor.getLeaderStatusMonitor(conf);

    status = Status.NOT_STARTED;

    // Enable signal handler for HA leader/follower status if configured

    String sigName = conf.get(ServerConfig.SERVER_HA_STANDBY_SIG);

    if ((sigName != null) && !sigName.isEmpty()) {

      LOGGER.info("Registering signal handler {} for HA", sigName);

      try {

        registerSigListener(sigName, this);

      } catch (Exception e) {

        LOGGER.error("Failed to register signal", e);

      }

    }

  }



    首先看下这个类的构造函数,主要是根据传入的配置文件,确定thrift的端口,最大线程数,最小线程数,最大消息size。还有监控,kerberos验证,可用行HA的配置。最主要的是看: this.sentryStore = getSentryStore(conf);这个函数。由于是采用thrift这种rpc框架,所以要注册processor,以及根据thrift IDL生成实际的处理函数。要想弄懂Sentry,建议先去弄懂Thrift这个RPC框架。



private void runServer() throws Exception {

    startSentryStoreCleaner(conf);

    startHMSFollower(conf);

    Iterable<String> processorFactories = ConfUtilties.CLASS_SPLITTER

        .split(conf.get(ServerConfig.PROCESSOR_FACTORIES,

            ServerConfig.PROCESSOR_FACTORIES_DEFAULT).trim());

    TMultiplexedProcessor processor = new TMultiplexedProcessor();

    boolean registeredProcessor = false;

    for (String processorFactory : processorFactories) {

      Class<?> clazz = conf.getClassByName(processorFactory);

      if (!ProcessorFactory.class.isAssignableFrom(clazz)) {

        throw new IllegalArgumentException("Processor Factory "

            + processorFactory + " is not a "

            + ProcessorFactory.class.getName());

      }

      try {

        Constructor<?> constructor = clazz

            .getConstructor(Configuration.class);

        LOGGER.info("ProcessorFactory being used: " + clazz.getCanonicalName());

        ProcessorFactory factory = (ProcessorFactory) constructor

            .newInstance(conf);

        boolean registerStatus = factory.register(processor, sentryStore);

        if (!registerStatus) {

          LOGGER.error("Failed to register " + clazz.getCanonicalName());

        }

        registeredProcessor = registerStatus || registeredProcessor;

      } catch (Exception e) {

        throw new IllegalStateException("Could not create "

            + processorFactory, e);

      }

    }

    if (!registeredProcessor) {

      throw new IllegalStateException(

          "Failed to register any processors from " + processorFactories);

    }

    addSentryServiceGauge();

    TServerTransport serverTransport = new TServerSocket(address);

    TTransportFactory transportFactory = null;

    if (kerberos) {

      TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();

      saslTransportFactory.addServerDefinition(AuthMethod.KERBEROS

          .getMechanismName(), principalParts[0], principalParts[1],

              ServerConfig.SASL_PROPERTIES, new GSSCallback(conf));

      transportFactory = saslTransportFactory;

    } else {

      transportFactory = new TTransportFactory();

    }

    TThreadPoolServer.Args args = new TThreadPoolServer.Args(

        serverTransport).processor(processor)

        .transportFactory(transportFactory)

        .protocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))

        .minWorkerThreads(minThreads).maxWorkerThreads(maxThreads);

    thriftServer = new TThreadPoolServer(args);

    LOGGER.info("Serving on {}", address);

    startSentryWebServer();

    // thriftServer.serve() does not return until thriftServer is stopped. Need to log before

    // calling thriftServer.serve()

    LOGGER.info("Sentry service is ready to serve client requests");

    // Allow clients/users watching the console to know when sentry is ready

    System.out.println("Sentry service is ready to serve client requests");

    SentryStateBank.enableState(SentryServiceState.COMPONENT, SentryServiceState.SERVICE_RUNNING);

    thriftServer.serve();

  }


    这段代码其他的不太重要,主要的看看,接受到RPC请求后怎么处理。既然采用了Thrift框架,必然对应的有根据IDL生成处理函数。processor就是处理函数,进一步去看看processFactories,从配置文件中可以看到采用的这个processorFactory:org.apache.sentry.api.service.thrift.SentryPolicyStoreProcessorFactory,进去看看里面干了啥?



public SentryPolicyStoreProcessorFactory(Configuration conf) {

    super(conf);

  }

  public boolean register(TMultiplexedProcessor multiplexedProcessor,

                          SentryStoreInterface sentryStore) throws Exception {

    SentryPolicyStoreProcessor sentryServiceHandler =

        new SentryPolicyStoreProcessor(SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME,

            conf, sentryStore);

    TProcessor processor =

      new SentryProcessorWrapper<SentryPolicyService.Iface>(sentryServiceHandler);

    multiplexedProcessor.registerProcessor(

      SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME, processor);

    return true;

  }



    这个类在注册时,指定具体的处理类:SentryPolicyStoreProcessor,这个类里面都是thrift生成的每个接口的处理函数,如:drop_sentry_role,这个接口时删除角色的RPC接口。之后的逻辑,大家可以看具体的函数了,再次我不进一步分享了


5. 总结

    今天主要时跟大家简单分享了下sentry的背景和原理,以及对读源码进行一件简单的引路。本人能力有限,提出了粗鄙的见解,有不足之处,请大家指出,分享交流!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351