Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
Kerberos认证时间限制问题
Kerberos认证限制了TGT的最大有效时间(max_life和ticket_lifitime),单次认证后超过该期限,TGT会失效,需要重新认证。
Kerberos提供了续期机制,在TGT过期的时候可以通过类似kinit -R方式续期,续期之后会再次获得长度为ticket_lifitime的有效时间。但是续期并不是无限的,最大可续期时间受到max_renewable_life和renew_lifetime的限制。
Kerberos的上述机制确保了认证之后不会长期有效,一定程度上提高了安全性。但是针对Flink流计算这种需要长期运行的任务,Kerbero的认证时间限制阻碍了其稳定运行。
Flink为了解决这个问题,在1.17及其之后的版本增加了定期自动认证功能。本篇为大家带来该功能相关代码的分析。
自动续期/认证相关代码
Flink安全认证的代码参见Flink 源码之安全认证。
本篇作为其补充,分析1.17版本之后Kerberos认证逻辑以及自动续期模块。
我们从分析flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java的install方法开始分析。该方法使用Hadoop提供的UserGroupInformation进行Kerberos认证。
@Override
public void install() throws SecurityInstallException {
// UserGroupInformation以下简称UGI
UserGroupInformation.setConfiguration(hadoopConfiguration);
UserGroupInformation loginUser;
try {
// KerberosLoginProvider包装了UGI的login操作,以及Flink的Security相关配置信息
KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(securityConfig);
// 如果启用了安全配置并且principal等配置合法
if (kerberosLoginProvider.isLoginPossible(true)) {
// 调用UGI的login...方法,执行认证
kerberosLoginProvider.doLogin(true);
// 获取loginUser
loginUser = UserGroupInformation.getLoginUser();
// 对于代理用户,Flink只支持delegation token在Flink外部维护
if (HadoopUserUtils.isProxyUser((loginUser))
&& securityConfig
.getFlinkConfig()
.get(SecurityOptions.DELEGATION_TOKENS_ENABLED)) {
throw new UnsupportedOperationException(
"Hadoop Proxy user is supported only when"
+ " delegation tokens fetch is managed outside of Flink!"
+ " Please try again with "
+ SecurityOptions.DELEGATION_TOKENS_ENABLED.key()
+ " config set to false!");
}
// 如果是通过keytab认证的话
if (loginUser.isFromKeytab()) {
// user凭据中加入token file对应的凭据
String fileLocation =
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
if (fileLocation != null) {
Credentials credentials =
Credentials.readTokenStorageFile(
new File(fileLocation), hadoopConfiguration);
loginUser.addCredentials(credentials);
}
// 这一步是关键,增加了定期TGT续约方法调用
tgtRenewalExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory("TGTRenewalExecutorService"));
// 启动定期续约服务
startTGTRenewal(tgtRenewalExecutorService, loginUser);
}
} else {
loginUser = UserGroupInformation.getLoginUser();
}
LOG.info("Hadoop user set to {}", loginUser);
boolean isKerberosSecurityEnabled =
HadoopUserUtils.hasUserKerberosAuthMethod(loginUser);
LOG.info(
"Kerberos security is {}.", isKerberosSecurityEnabled ? "enabled" : "disabled");
if (isKerberosSecurityEnabled) {
LOG.info(
"Kerberos credentials are {}.",
loginUser.hasKerberosCredentials() ? "valid" : "invalid");
}
} catch (Throwable ex) {
throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
}
}
在上面代码中,自动续期的关键是startTGTRenewal方法,它启动一个周期执行的任务。该任务周期执行checkTGTAndReloginFromKeytab方法。周期触发时间由配置项security.kerberos.relogin.period指定,默认为1分钟。
@VisibleForTesting
void startTGTRenewal(
ScheduledExecutorService tgtRenewalExecutorService, UserGroupInformation loginUser) {
LOG.info("Starting TGT renewal task");
long tgtRenewalPeriod = securityConfig.getTgtRenewalPeriod().toMillis();
tgtRenewalExecutorService.scheduleAtFixedRate(
() -> {
// In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but
// in Hadoop
// 3.x, it is configurable (see
// hadoop.kerberos.keytab.login.autorenewal.enabled, added
// in HADOOP-9567). This task will make sure that the user stays logged in
// regardless of
// that configuration's value. Note that checkTGTAndReloginFromKeytab() is a
// no-op if
// the TGT does not need to be renewed yet.
try {
LOG.debug("Renewing TGT");
loginUser.checkTGTAndReloginFromKeytab();
LOG.debug("TGT renewed successfully");
} catch (Exception e) {
LOG.warn("Error while renewing TGT", e);
}
},
tgtRenewalPeriod,
tgtRenewalPeriod,
TimeUnit.MILLISECONDS);
LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
}
代码中有一大段注释,提及了Hadoop2和3支持keytab的自动认证续约。Flink 1.17及其以后的版本将周期认证逻辑搬到Flink内完成。这样无论Hadoop如何配置,Flink均可以自动续约,避免了长期运行的作业因lifetime到期认证失效而停止的问题。
另外,Hadoop的UserGroupInformation的自动续约实际执行的是kinit -R,该方法可最大续约的时间仍然受Kerberos的max_renewable_life和renew_lifetime的限制。checkTGTAndReloginFromKeytab方法事实上是重新认证而不是续约,不存在这个问题。
到此为止Flink自动认证相关逻辑已分析完毕。接下来读者可能会有如下问题:
- Flink自带的周期性自动认证功能,认证频率远比TGT可能过期的频率高,是否会影响性能?
- 为什么不依赖Hadoop的自动认证功能,而需要Flink自己去周期执行认证?
这两个问题的答案在附录代码分析中。
附录
checkTGTAndReloginFromKeytab方法
该方法首先检查是否有必要去重新认证。所以说前面Flink周期调用该方法,即便是调用的周期远远超过认证过期的频率,也不会影响性能。具体条件在代码分析的注释中。
public synchronized void checkTGTAndReloginFromKeytab() throws IOException {
// 检查是否启用安全认证,是否通过kerberos的keytab认证
if (isSecurityEnabled() && this.user.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS && this.isKeytab) {
KerberosTicket tgt = this.getTGT();
// 检查是否需要刷新
// 即当前时刻是否大于TGT的(end - start) * 0.8 + start。换句话说,在可用时间段的后20%部分才需要refresh
if (tgt == null || shouldRenewImmediatelyForTests || Time.now() >= this.getRefreshTime(tgt)) {
this.reloginFromKeytab();
}
}
}
其中getRefreshTime逻辑已在前面代码注释中给出,感兴趣的读者可以看它的源代码,如下所示:
private long getRefreshTime(KerberosTicket tgt) {
long start = tgt.getStartTime().getTime();
long end = tgt.getEndTime().getTime();
return start + (long) ((end - start) * TICKET_RENEW_WINDOW);
}
这个方法是确保再次认证不会过于频繁的关键。
reloginFromKeytab方法,再次检查满足条件时,登出当前用户并再次登录。
@Public
@Evolving
public void reloginFromKeytab() throws IOException {
this.reloginFromKeytab(false);
}
private synchronized void reloginFromKeytab(boolean ignoreTimeElapsed) throws IOException {
// 检查使用使用kerberos的keytab来认证
if (isSecurityEnabled() && this.user.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS && this.isKeytab) {
long now = Time.now();
// 检查和上次login的时间差是否过小,由配置项hadoop.kerberos.min.seconds.before.relogin控制
if (shouldRenewImmediatelyForTests || ignoreTimeElapsed || this.hasSufficientTimeElapsed(now)) {
KerberosTicket tgt = this.getTGT();
// 检查是否需要刷新
// 即当前时刻是否大于TGT的(end - start) * 0.8 + start。换句话说,在可用时间段的后20%部分才需要refresh
if (tgt == null || shouldRenewImmediatelyForTests || now >= this.getRefreshTime(tgt)) {
LoginContext login = this.getLogin();
if (login != null && keytabFile != null) {
long start = 0L;
this.user.setLastLogin(now);
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Initiating logout for " + this.getUserName());
}
// 登出并重新登录
synchronized(UserGroupInformation.class) {
login.logout();
login = newLoginContext("hadoop-keytab-kerberos", this.getSubject(), new HadoopConfiguration());
if (LOG.isDebugEnabled()) {
LOG.debug("Initiating re-login for " + keytabPrincipal);
}
start = Time.now();
login.login();
this.fixKerberosTicketOrder();
metrics.loginSuccess.add(Time.now() - start);
this.setLogin(login);
}
} catch (LoginException le) {
if (start > 0L) {
metrics.loginFailure.add(Time.now() - start);
}
KerberosAuthException kae = new KerberosAuthException("Login failure", le);
kae.setPrincipal(keytabPrincipal);
kae.setKeytabFile(keytabFile);
throw kae;
}
} else {
throw new KerberosAuthException("loginUserFromKeyTab must be done first");
}
}
}
}
}
spawnAutoRenewalThreadForUserCreds
Hadoop自身提供的自动续约在spawnAutoRenewalThreadForUserCreds方法中。如下所示:
@InterfaceAudience.Private
@InterfaceStability.Unstable
@VisibleForTesting
void spawnAutoRenewalThreadForUserCreds(boolean force) {
if (!force && (!shouldRelogin() || isFromKeytab())) {
return;
}
//spawn thread only if we have kerb credentials
KerberosTicket tgt = getTGT();
if (tgt == null) {
return;
}
String cmd = conf.get("hadoop.kerberos.kinit.command", "kinit");
long nextRefresh = getRefreshTime(tgt);
executeAutoRenewalTask(getUserName(),
new TicketCacheRenewalRunnable(tgt, cmd, nextRefresh));
}
该方法最后调用了executeAutoRenewalTask,创建一个定期执行的TicketCacheRenewalRunnable任务。
executeAutoRenewalTask启动一个单线程线程池,用来执行TicketCacheRenewalRunnable逻辑。
private void executeAutoRenewalTask(final String userName,
AutoRenewalForUserCredsRunnable task) {
kerberosLoginRenewalExecutor = Optional.of(
Executors.newSingleThreadExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("TGT Renewer for " + userName);
return t;
}
}
));
kerberosLoginRenewalExecutor.get().submit(task);
}
TicketCacheRenewalRunnable继承了AutoRenewalForUserCredsRunnable。AutoRenewalForUserCredsRunnable会周期调用relogin方法。
TicketCacheRenewalRunnable周期执行kinit -R命令续约,然后执行reloginFromTicketCache,从ticket cache重新登录。
@InterfaceAudience.Private
@InterfaceStability.Unstable
@VisibleForTesting
final class TicketCacheRenewalRunnable
extends AutoRenewalForUserCredsRunnable {
private String kinitCmd;
TicketCacheRenewalRunnable(KerberosTicket tgt, String kinitCmd,
long nextRefresh) {
super(tgt, nextRefresh);
this.kinitCmd = kinitCmd;
}
@Override
public void relogin() throws IOException {
String output = Shell.execCommand(kinitCmd, "-R");
if (LOG.isDebugEnabled()) {
LOG.debug("Renewed ticket. kinit output: {}", output);
}
reloginFromTicketCache();
}
}
到这里可以Hadoop自己的自动认证实际上是kinit -R自动续约。根据前言中所讲,续约有最长期限限制,不能无限续约。参见:解决kinit -R因ticket not renewable无法刷新Kerberos凭证-开发者社区-阿里云。