1. Background
Apache Sentry is an open-source Hadoop component released by Cloudera. It provides fine-grained, role-based authorization and a multi-tenant administration model, targeting the data and metadata stored in a Hadoop cluster. It integrates with Hive/HCatalog, Apache Solr and Cloudera Impala, and in the future can be extended to other Hadoop ecosystem components such as HDFS and HBase.
Sentry aims to be a pluggable authorization engine for Hadoop components. It lets you define authorization rules to validate access requests from users or applications for Hadoop resources. Sentry is highly modular and can support authorization for a variety of data models in Hadoop.
2. Goals of this article
This article is written for developers who are new to Apache Sentry. It helps them locate the entry function, quickly grasp the code architecture, sort out the structure of the system and trace the call paths, laying a foundation for deeper reading later. Before reading, I suggest searching for Sentry material online and learning about its architecture and principles, so that the source code is easier to follow. I was also new to Sentry when I wrote this article, and these are a few notes I would like to share; for a deeper discussion, please look out for the next post.
3. A brief introduction to Sentry
We have seen permission models in many systems: first come resources; then privileges, which are rules for accessing resources; then roles, where a role is a set of privileges; and finally users, to whom roles are granted. Sometimes there is also a notion of groups: users with the same attributes are placed in a group and roles are granted to the group. Sentry's permission model works on the same principle (a purely illustrative sketch of these relationships follows the list below). It consists of the following elements:
a. Resource: the object a privilege applies to, including servers, databases, tables, columns, URIs, etc.
b. Privilege: an access rule on a resource; privileges can be combined into a set of rules. For example, user A may read table T but may not drop it.
c. Role: a named collection of privileges.
d. Group: roughly what other systems call a user. Sentry has no notion of individual users; everything is expressed in terms of groups, so you can think of a Sentry group as playing the part of a user or account. Roles are granted to groups.
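To make these relationships concrete, here is a tiny, purely illustrative Java sketch. None of these classes exist in Sentry; they only mirror the Group -> Role -> Privilege -> Resource chain described above.

import java.util.HashSet;
import java.util.Set;

// Purely illustrative; these classes are NOT part of Sentry, they only mirror its model.
class Resource {                         // e.g. a server, database, table or URI
  final String name;
  Resource(String name) { this.name = name; }
}

class Privilege {                        // an access rule on a resource, e.g. SELECT on table T
  final Resource resource;
  final String action;                   // e.g. "SELECT", "INSERT", "ALL"
  Privilege(Resource resource, String action) { this.resource = resource; this.action = action; }
}

class Role {                             // a named collection of privileges
  final String name;
  final Set<Privilege> privileges = new HashSet<>();
  Role(String name) { this.name = name; }
}

class Group {                            // Sentry grants roles to groups, not to individual users
  final String name;
  final Set<Role> roles = new HashSet<>();
  Group(String name) { this.name = name; }

  // A group may perform an action on a resource if any of its roles holds a matching privilege.
  boolean mayAccess(Resource resource, String action) {
    for (Role role : roles) {
      for (Privilege p : role.privileges) {
        if (p.resource == resource && p.action.equals(action)) {
          return true;
        }
      }
    }
    return false;
  }
}

For instance, granting a role that holds SELECT on table T to group "analysts" lets every member of that group read T but not drop it, which is exactly the example in item b.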
4. A first look at the code
4.1 The entry function
The best way to read source code is to find the entry function and work downward from it step by step; the overall architecture then falls out naturally.
The Sentry entry function is located at: sentry-tools/src/main/java/org/apache/sentry/SentryMain.java
How did I know to look there? Sentry is started from the command line with arguments, and such programs usually have a main function; it is mostly a matter of experience.
  public static void main(String[] args)
      throws Exception {
    CommandLineParser parser = new GnuParser();
    Options options = new Options();
    options.addOption(HELP_SHORT, HELP_LONG, false, "Print this help text");
    options.addOption(VERSION_SHORT, VERSION_LONG, false,
        "Print Sentry version");
    options.addOption(HIVE_CONF, true, "Set hive configuration variables");
    options.addOption(null, COMMAND, true, "Command to run. Options: " + COMMANDS);
    options.addOption(null, LOG4J_CONF, true, "Location of log4j properties file");

    // Ignore unrecognized options: service and config-tool options
    CommandLine commandLine = parser.parse(options, args, true);

    String log4jconf = commandLine.getOptionValue(LOG4J_CONF);
    if (log4jconf != null && log4jconf.length() > 0) {
      Properties log4jProperties = new Properties();

      // Firstly load log properties from properties file
      try (InputStream istream = Files.newInputStream(Paths.get(log4jconf))) {
        log4jProperties.load(istream);
      }

      // Set the log level of DataNucleus.Query to INFO only if it is not set in the
      // properties file
      if (!log4jProperties.containsKey(LOG4J_DATANUCLEUS)) {
        log4jProperties.setProperty(LOG4J_DATANUCLEUS, "INFO");
        // Enable debug log for DataNucleus.Query only when log.threshold is TRACE
        String logThreshold = log4jProperties.getProperty("log.threshold");
        if (logThreshold != null && logThreshold.equalsIgnoreCase("TRACE")) {
          log4jProperties.setProperty(LOG4J_DATANUCLEUS, "DEBUG");
        }
      }

      PropertyConfigurator.configure(log4jProperties);
      Logger sentryLogger = LoggerFactory.getLogger(SentryMain.class);
      sentryLogger.info("Configuring log4j to use [" + log4jconf + "]");
    }

    // Print sentry help only if commandName was not given,
    // otherwise we assume the help is for the sub command
    String commandName = commandLine.getOptionValue(COMMAND);
    if (commandName == null && (commandLine.hasOption(HELP_SHORT) ||
        commandLine.hasOption(HELP_LONG))) {
      printHelp(options, "Command name is missing.");
    } else if (commandLine.hasOption(VERSION_SHORT) ||
        commandLine.hasOption(VERSION_LONG)) {
      printVersion();
    }

    Command command = null;
    switch (commandName) {
      case "service":
        command = new SentryService.CommandImpl();
        break;
      case "config-tool":
        command = new SentryConfigTool.CommandImpl();
        break;
      case "schema-tool":
        command = new SentrySchemaTool.CommandImpl();
        break;
      default:
        printHelp(options, "Unknown command " + commandName + "\n");
        break;
    }
    ((Command) command).run(commandLine.getArgs());
  }
This code mainly parses the command-line arguments, reads the log4j properties file and initializes logging. The rest deserves only a quick glance; the key statement is: command = new SentryService.CommandImpl();
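Before moving on, here is a rough sketch of what SentryService.CommandImpl.run() can be expected to do with the remaining arguments. This is not the actual implementation: the configuration path and the start() call are assumptions for illustration only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Rough sketch only, not the real CommandImpl: load sentry-site.xml into a Hadoop Configuration,
// build a SentryService from it and start serving. The real code additionally parses its own
// command-line options, installs shutdown hooks and reports startup errors.
// (Imports of Command and SentryService are omitted; both appear in the snippets in this article.)
class CommandImplSketch implements Command {
  @Override
  public void run(String[] args) throws Exception {
    Configuration conf = new Configuration(false);
    conf.addResource(new Path("/etc/sentry/conf/sentry-site.xml")); // assumed location of sentry-site.xml
    SentryService server = new SentryService(conf);                 // constructor walked through in 4.2
    server.start();                                                 // assumes SentryService exposes start()
  }
}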
4.2 The startup process
The Sentry startup process is mainly in this class: sentry-service/sentry-service-server/src/main/java/org/apache/sentry/service/thrift/SentryService.java.
  public SentryService(Configuration conf) throws Exception {
    this.conf = conf;
    int port = conf
        .getInt(ServerConfig.RPC_PORT, ServerConfig.RPC_PORT_DEFAULT);
    if (port == 0) {
      port = findFreePort();
      conf.setInt(ServerConfig.RPC_PORT, port);
    }
    this.address = NetUtils.createSocketAddr(
        conf.get(ServerConfig.RPC_ADDRESS, ServerConfig.RPC_ADDRESS_DEFAULT),
        port);
    LOGGER.info("Configured on address {}", address);
    kerberos = ServerConfig.SECURITY_MODE_KERBEROS.equalsIgnoreCase(
        conf.get(ServerConfig.SECURITY_MODE, ServerConfig.SECURITY_MODE_KERBEROS).trim());
    maxThreads = conf.getInt(ServerConfig.RPC_MAX_THREADS,
        ServerConfig.RPC_MAX_THREADS_DEFAULT);
    minThreads = conf.getInt(ServerConfig.RPC_MIN_THREADS,
        ServerConfig.RPC_MIN_THREADS_DEFAULT);
    maxMessageSize = conf.getLong(ServerConfig.SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE,
        ServerConfig.SENTRY_POLICY_SERVER_THRIFT_MAX_MESSAGE_SIZE_DEFAULT);
    if (kerberos) {
      // Use Hadoop libraries to translate the _HOST placeholder with actual hostname
      try {
        String rawPrincipal = Preconditions.checkNotNull(conf.get(ServerConfig.PRINCIPAL),
            ServerConfig.PRINCIPAL + " is required");
        principal = SecurityUtil.getServerPrincipal(rawPrincipal, address.getAddress());
      } catch (IOException io) {
        throw new RuntimeException("Can't translate kerberos principal'", io);
      }
      LOGGER.info("Using kerberos principal: {}", principal);
      principalParts = SaslRpcServer.splitKerberosName(principal);
      Preconditions.checkArgument(principalParts.length == 3,
          "Kerberos principal should have 3 parts: " + principal);
      keytab = Preconditions.checkNotNull(conf.get(ServerConfig.KEY_TAB),
          ServerConfig.KEY_TAB + " is required");
      File keytabFile = new File(keytab);
      Preconditions.checkState(keytabFile.isFile() && keytabFile.canRead(),
          "Keytab %s does not exist or is not readable.", keytab);
    } else {
      principal = null;
      principalParts = null;
      keytab = null;
    }
    ThreadFactory sentryServiceThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat(SENTRY_SERVICE_THREAD_NAME)
        .build();
    serviceExecutor = Executors.newSingleThreadExecutor(sentryServiceThreadFactory);
    this.sentryStore = getSentryStore(conf);
    sentryStore.setPersistUpdateDeltas(SentryServiceUtil.isHDFSSyncEnabled(conf));
    this.leaderMonitor = LeaderStatusMonitor.getLeaderStatusMonitor(conf);
    status = Status.NOT_STARTED;

    // Enable signal handler for HA leader/follower status if configured
    String sigName = conf.get(ServerConfig.SERVER_HA_STANDBY_SIG);
    if ((sigName != null) && !sigName.isEmpty()) {
      LOGGER.info("Registering signal handler {} for HA", sigName);
      try {
        registerSigListener(sigName, this);
      } catch (Exception e) {
        LOGGER.error("Failed to register signal", e);
      }
    }
  }
Start with the constructor. Based on the configuration passed in, it determines the Thrift port, the maximum and minimum numbers of worker threads and the maximum message size, and it also sets up monitoring, Kerberos authentication and the high-availability (HA) options. The most important call is this.sentryStore = getSentryStore(conf);. Because Sentry uses the Thrift RPC framework, processors must be registered and the actual handler functions are generated from the Thrift IDL, so if you want to understand Sentry it is worth understanding Thrift first.
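As a rough sketch, getSentryStore(conf) is most likely picking the store implementation class from the configuration and instantiating it reflectively, in the same style that runServer() below uses for the processor factories. The configuration key and the default class name here are assumptions for illustration, not copied from the Sentry source.

import org.apache.hadoop.conf.Configuration;

// Sketch only; the real getSentryStore(conf) lives in SentryService. The key
// "sentry.service.sentrystore" and the default class name are assumed for illustration.
// (Import of SentryStoreInterface omitted; see the register() signature further below.)
final class SentryStoreLoaderSketch {
  static SentryStoreInterface loadStore(Configuration conf) throws Exception {
    String storeClass = conf.get("sentry.service.sentrystore",
        "org.apache.sentry.provider.db.service.persistent.SentryStore");
    Class<?> clazz = conf.getClassByName(storeClass);
    // Store implementations are expected to take the Configuration in their constructor.
    return (SentryStoreInterface) clazz.getConstructor(Configuration.class).newInstance(conf);
  }
}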
  private void runServer() throws Exception {
    startSentryStoreCleaner(conf);
    startHMSFollower(conf);

    Iterable<String> processorFactories = ConfUtilties.CLASS_SPLITTER
        .split(conf.get(ServerConfig.PROCESSOR_FACTORIES,
            ServerConfig.PROCESSOR_FACTORIES_DEFAULT).trim());
    TMultiplexedProcessor processor = new TMultiplexedProcessor();
    boolean registeredProcessor = false;
    for (String processorFactory : processorFactories) {
      Class<?> clazz = conf.getClassByName(processorFactory);
      if (!ProcessorFactory.class.isAssignableFrom(clazz)) {
        throw new IllegalArgumentException("Processor Factory "
            + processorFactory + " is not a "
            + ProcessorFactory.class.getName());
      }
      try {
        Constructor<?> constructor = clazz
            .getConstructor(Configuration.class);
        LOGGER.info("ProcessorFactory being used: " + clazz.getCanonicalName());
        ProcessorFactory factory = (ProcessorFactory) constructor
            .newInstance(conf);
        boolean registerStatus = factory.register(processor, sentryStore);
        if (!registerStatus) {
          LOGGER.error("Failed to register " + clazz.getCanonicalName());
        }
        registeredProcessor = registerStatus || registeredProcessor;
      } catch (Exception e) {
        throw new IllegalStateException("Could not create "
            + processorFactory, e);
      }
    }
    if (!registeredProcessor) {
      throw new IllegalStateException(
          "Failed to register any processors from " + processorFactories);
    }

    addSentryServiceGauge();

    TServerTransport serverTransport = new TServerSocket(address);
    TTransportFactory transportFactory = null;
    if (kerberos) {
      TSaslServerTransport.Factory saslTransportFactory = new TSaslServerTransport.Factory();
      saslTransportFactory.addServerDefinition(AuthMethod.KERBEROS
          .getMechanismName(), principalParts[0], principalParts[1],
          ServerConfig.SASL_PROPERTIES, new GSSCallback(conf));
      transportFactory = saslTransportFactory;
    } else {
      transportFactory = new TTransportFactory();
    }

    TThreadPoolServer.Args args = new TThreadPoolServer.Args(
        serverTransport).processor(processor)
        .transportFactory(transportFactory)
        .protocolFactory(new TBinaryProtocol.Factory(true, true, maxMessageSize, maxMessageSize))
        .minWorkerThreads(minThreads).maxWorkerThreads(maxThreads);
    thriftServer = new TThreadPoolServer(args);
    LOGGER.info("Serving on {}", address);
    startSentryWebServer();

    // thriftServer.serve() does not return until thriftServer is stopped. Need to log before
    // calling thriftServer.serve()
    LOGGER.info("Sentry service is ready to serve client requests");

    // Allow clients/users watching the console to know when sentry is ready
    System.out.println("Sentry service is ready to serve client requests");
    SentryStateBank.enableState(SentryServiceState.COMPONENT, SentryServiceState.SERVICE_RUNNING);
    thriftServer.serve();
  }
Most of this code is secondary; the interesting part is what happens when an RPC request arrives. Since the Thrift framework is used, there must be handler code generated from the IDL, and processor is what dispatches requests to those handlers. Looking further at processorFactories, the configuration file shows that this processor factory is used: org.apache.sentry.api.service.thrift.SentryPolicyStoreProcessorFactory. Let's see what it does.
  public SentryPolicyStoreProcessorFactory(Configuration conf) {
    super(conf);
  }

  public boolean register(TMultiplexedProcessor multiplexedProcessor,
      SentryStoreInterface sentryStore) throws Exception {
    SentryPolicyStoreProcessor sentryServiceHandler =
        new SentryPolicyStoreProcessor(SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME,
            conf, sentryStore);
    TProcessor processor =
        new SentryProcessorWrapper<SentryPolicyService.Iface>(sentryServiceHandler);
    multiplexedProcessor.registerProcessor(
        SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME, processor);
    return true;
  }
When this class registers itself, it specifies the concrete handler class: SentryPolicyStoreProcessor. That class contains the handler for every interface generated by Thrift, for example drop_sentry_role, the RPC interface for deleting a role. From here you can read the individual handler functions on your own; I will not go further into them here.
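Since every service is registered on a TMultiplexedProcessor under SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME, a client has to wrap its protocol in a TMultiplexedProtocol carrying the same service name before it can call RPCs such as drop_sentry_role. Below is a minimal, hedged client sketch using plain Thrift classes without Kerberos/SASL; the host, port and service-name literal are placeholders.

import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TMultiplexedProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

// Minimal client-side counterpart of the registration above (no Kerberos/SASL). Host, port and
// the service-name literal are placeholders; the real client takes the name from
// SentryPolicyServiceConstants.SENTRY_POLICY_SERVICE_NAME. The import of the generated
// SentryPolicyService class is omitted here.
public class SentryClientSketch {
  public static void main(String[] args) throws Exception {
    TTransport transport = new TSocket("sentry-host", 8038);    // placeholder host/port
    transport.open();
    TMultiplexedProtocol protocol = new TMultiplexedProtocol(
        new TBinaryProtocol(transport), "SentryPolicyService"); // must match the registered name
    // The generated stub exposes one method per RPC defined in the IDL, e.g. drop_sentry_role.
    SentryPolicyService.Client client = new SentryPolicyService.Client(protocol);
    // ... build the request object generated from the IDL and invoke the RPC here ...
    transport.close();
  }
}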
5. Summary
Today I briefly introduced Sentry's background and principles and offered a simple guide to getting started with its source code. My abilities are limited and these are only humble observations; if anything is missing or wrong, please point it out so we can share and learn from each other!