Flink 源码之自动认证

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

Kerberos认证时间限制问题

Kerberos认证限制了TGT的最大有效时间(max_lifeticket_lifitime),单次认证后超过该期限,TGT会失效,需要重新认证。
Kerberos提供了续期机制,在TGT过期的时候可以通过类似kinit -R方式续期,续期之后会再次获得长度为ticket_lifitime的有效时间。但是续期并不是无限的,最大可续期时间受到max_renewable_liferenew_lifetime的限制。

Kerberos的上述机制确保了认证之后不会长期有效,一定程度上提高了安全性。但是针对Flink流计算这种需要长期运行的任务,Kerbero的认证时间限制阻碍了其稳定运行。
Flink为了解决这个问题,在1.17及其之后的版本增加了定期自动认证功能。本篇为大家带来该功能相关代码的分析。

自动续期/认证相关代码

Flink安全认证的代码参见Flink 源码之安全认证

本篇作为其补充,分析1.17版本之后Kerberos认证逻辑以及自动续期模块。

我们从分析flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.javainstall方法开始分析。该方法使用Hadoop提供的UserGroupInformation进行Kerberos认证。

@Override
public void install() throws SecurityInstallException {
    // UserGroupInformation以下简称UGI
    UserGroupInformation.setConfiguration(hadoopConfiguration);

    UserGroupInformation loginUser;

    try {
        // KerberosLoginProvider包装了UGI的login操作,以及Flink的Security相关配置信息
        KerberosLoginProvider kerberosLoginProvider = new KerberosLoginProvider(securityConfig);
        // 如果启用了安全配置并且principal等配置合法
        if (kerberosLoginProvider.isLoginPossible(true)) {
            // 调用UGI的login...方法,执行认证
            kerberosLoginProvider.doLogin(true);
            // 获取loginUser
            loginUser = UserGroupInformation.getLoginUser();

            // 对于代理用户,Flink只支持delegation token在Flink外部维护
            if (HadoopUserUtils.isProxyUser((loginUser))
                    && securityConfig
                            .getFlinkConfig()
                            .get(SecurityOptions.DELEGATION_TOKENS_ENABLED)) {
                throw new UnsupportedOperationException(
                        "Hadoop Proxy user is supported only when"
                                + " delegation tokens fetch is managed outside of Flink!"
                                + " Please try again with "
                                + SecurityOptions.DELEGATION_TOKENS_ENABLED.key()
                                + " config set to false!");
            }

            // 如果是通过keytab认证的话
            if (loginUser.isFromKeytab()) {
                // user凭据中加入token file对应的凭据
                String fileLocation =
                        System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
                if (fileLocation != null) {
                    Credentials credentials =
                            Credentials.readTokenStorageFile(
                                    new File(fileLocation), hadoopConfiguration);
                    loginUser.addCredentials(credentials);
                }
                // 这一步是关键,增加了定期TGT续约方法调用
                tgtRenewalExecutorService =
                        Executors.newSingleThreadScheduledExecutor(
                                new ExecutorThreadFactory("TGTRenewalExecutorService"));
                // 启动定期续约服务
                startTGTRenewal(tgtRenewalExecutorService, loginUser);
            }
        } else {
            loginUser = UserGroupInformation.getLoginUser();
        }

        LOG.info("Hadoop user set to {}", loginUser);
        boolean isKerberosSecurityEnabled =
                HadoopUserUtils.hasUserKerberosAuthMethod(loginUser);
        LOG.info(
                "Kerberos security is {}.", isKerberosSecurityEnabled ? "enabled" : "disabled");
        if (isKerberosSecurityEnabled) {
            LOG.info(
                    "Kerberos credentials are {}.",
                    loginUser.hasKerberosCredentials() ? "valid" : "invalid");
        }
    } catch (Throwable ex) {
        throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
    }
}

在上面代码中,自动续期的关键是startTGTRenewal方法,它启动一个周期执行的任务。该任务周期执行checkTGTAndReloginFromKeytab方法。周期触发时间由配置项security.kerberos.relogin.period指定,默认为1分钟。

@VisibleForTesting
void startTGTRenewal(
        ScheduledExecutorService tgtRenewalExecutorService, UserGroupInformation loginUser) {
    LOG.info("Starting TGT renewal task");

    long tgtRenewalPeriod = securityConfig.getTgtRenewalPeriod().toMillis();
    tgtRenewalExecutorService.scheduleAtFixedRate(
            () -> {
                // In Hadoop 2.x, renewal of the keytab-based login seems to be automatic, but
                // in Hadoop
                // 3.x, it is configurable (see
                // hadoop.kerberos.keytab.login.autorenewal.enabled, added
                // in HADOOP-9567). This task will make sure that the user stays logged in
                // regardless of
                // that configuration's value. Note that checkTGTAndReloginFromKeytab() is a
                // no-op if
                // the TGT does not need to be renewed yet.
                try {
                    LOG.debug("Renewing TGT");
                    loginUser.checkTGTAndReloginFromKeytab();
                    LOG.debug("TGT renewed successfully");
                } catch (Exception e) {
                    LOG.warn("Error while renewing TGT", e);
                }
            },
            tgtRenewalPeriod,
            tgtRenewalPeriod,
            TimeUnit.MILLISECONDS);

    LOG.info("TGT renewal task started and reoccur in {} ms", tgtRenewalPeriod);
}

代码中有一大段注释,提及了Hadoop2和3支持keytab的自动认证续约。Flink 1.17及其以后的版本将周期认证逻辑搬到Flink内完成。这样无论Hadoop如何配置,Flink均可以自动续约,避免了长期运行的作业因lifetime到期认证失效而停止的问题。

另外,Hadoop的UserGroupInformation的自动续约实际执行的是kinit -R,该方法可最大续约的时间仍然受Kerberos的max_renewable_liferenew_lifetime的限制。checkTGTAndReloginFromKeytab方法事实上是重新认证而不是续约,不存在这个问题。

到此为止Flink自动认证相关逻辑已分析完毕。接下来读者可能会有如下问题:

  1. Flink自带的周期性自动认证功能,认证频率远比TGT可能过期的频率高,是否会影响性能?
  2. 为什么不依赖Hadoop的自动认证功能,而需要Flink自己去周期执行认证?

这两个问题的答案在附录代码分析中。

附录

checkTGTAndReloginFromKeytab方法

该方法首先检查是否有必要去重新认证。所以说前面Flink周期调用该方法,即便是调用的周期远远超过认证过期的频率,也不会影响性能。具体条件在代码分析的注释中。

public synchronized void checkTGTAndReloginFromKeytab() throws IOException {
    // 检查是否启用安全认证,是否通过kerberos的keytab认证
    if (isSecurityEnabled() && this.user.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS && this.isKeytab) {
        KerberosTicket tgt = this.getTGT();
        // 检查是否需要刷新
        // 即当前时刻是否大于TGT的(end - start) * 0.8 + start。换句话说,在可用时间段的后20%部分才需要refresh
        if (tgt == null || shouldRenewImmediatelyForTests || Time.now() >= this.getRefreshTime(tgt)) {
            this.reloginFromKeytab();
        }
    }
}

其中getRefreshTime逻辑已在前面代码注释中给出,感兴趣的读者可以看它的源代码,如下所示:

  private long getRefreshTime(KerberosTicket tgt) {
    long start = tgt.getStartTime().getTime();
    long end = tgt.getEndTime().getTime();
    return start + (long) ((end - start) * TICKET_RENEW_WINDOW);
  }

这个方法是确保再次认证不会过于频繁的关键。

reloginFromKeytab方法,再次检查满足条件时,登出当前用户并再次登录。

@Public
@Evolving
public void reloginFromKeytab() throws IOException {
    this.reloginFromKeytab(false);
}

private synchronized void reloginFromKeytab(boolean ignoreTimeElapsed) throws IOException {
    // 检查使用使用kerberos的keytab来认证
    if (isSecurityEnabled() && this.user.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS && this.isKeytab) {
        long now = Time.now();
        // 检查和上次login的时间差是否过小,由配置项hadoop.kerberos.min.seconds.before.relogin控制
        if (shouldRenewImmediatelyForTests || ignoreTimeElapsed || this.hasSufficientTimeElapsed(now)) {
            KerberosTicket tgt = this.getTGT();
            // 检查是否需要刷新
            // 即当前时刻是否大于TGT的(end - start) * 0.8 + start。换句话说,在可用时间段的后20%部分才需要refresh
            if (tgt == null || shouldRenewImmediatelyForTests || now >= this.getRefreshTime(tgt)) {
                LoginContext login = this.getLogin();
                if (login != null && keytabFile != null) {
                    long start = 0L;
                    this.user.setLastLogin(now);

                    try {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Initiating logout for " + this.getUserName());
                        }
                        // 登出并重新登录
                        synchronized(UserGroupInformation.class) {
                            login.logout();
                            login = newLoginContext("hadoop-keytab-kerberos", this.getSubject(), new HadoopConfiguration());
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Initiating re-login for " + keytabPrincipal);
                            }

                            start = Time.now();
                            login.login();
                            this.fixKerberosTicketOrder();
                            metrics.loginSuccess.add(Time.now() - start);
                            this.setLogin(login);
                        }
                    } catch (LoginException le) {
                        if (start > 0L) {
                            metrics.loginFailure.add(Time.now() - start);
                        }

                        KerberosAuthException kae = new KerberosAuthException("Login failure", le);
                        kae.setPrincipal(keytabPrincipal);
                        kae.setKeytabFile(keytabFile);
                        throw kae;
                    }
                } else {
                    throw new KerberosAuthException("loginUserFromKeyTab must be done first");
                }
            }
        }
    }
}

spawnAutoRenewalThreadForUserCreds

Hadoop自身提供的自动续约在spawnAutoRenewalThreadForUserCreds方法中。如下所示:

  @InterfaceAudience.Private
  @InterfaceStability.Unstable
  @VisibleForTesting
  void spawnAutoRenewalThreadForUserCreds(boolean force) {
    if (!force && (!shouldRelogin() || isFromKeytab())) {
      return;
    }

    //spawn thread only if we have kerb credentials
    KerberosTicket tgt = getTGT();
    if (tgt == null) {
      return;
    }
    String cmd = conf.get("hadoop.kerberos.kinit.command", "kinit");
    long nextRefresh = getRefreshTime(tgt);
    executeAutoRenewalTask(getUserName(),
            new TicketCacheRenewalRunnable(tgt, cmd, nextRefresh));
  }

该方法最后调用了executeAutoRenewalTask,创建一个定期执行的TicketCacheRenewalRunnable任务。

executeAutoRenewalTask启动一个单线程线程池,用来执行TicketCacheRenewalRunnable逻辑。

  private void executeAutoRenewalTask(final String userName,
                                      AutoRenewalForUserCredsRunnable task) {
    kerberosLoginRenewalExecutor = Optional.of(
            Executors.newSingleThreadExecutor(
                  new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                      Thread t = new Thread(r);
                      t.setDaemon(true);
                      t.setName("TGT Renewer for " + userName);
                      return t;
                    }
                  }
            ));
    kerberosLoginRenewalExecutor.get().submit(task);
  }

TicketCacheRenewalRunnable继承了AutoRenewalForUserCredsRunnableAutoRenewalForUserCredsRunnable会周期调用relogin方法。

TicketCacheRenewalRunnable周期执行kinit -R命令续约,然后执行reloginFromTicketCache,从ticket cache重新登录。

  @InterfaceAudience.Private
  @InterfaceStability.Unstable
  @VisibleForTesting
  final class TicketCacheRenewalRunnable
      extends AutoRenewalForUserCredsRunnable {
    private String kinitCmd;

    TicketCacheRenewalRunnable(KerberosTicket tgt, String kinitCmd,
        long nextRefresh) {
      super(tgt, nextRefresh);
      this.kinitCmd = kinitCmd;
    }

    @Override
    public void relogin() throws IOException {
      String output = Shell.execCommand(kinitCmd, "-R");
      if (LOG.isDebugEnabled()) {
        LOG.debug("Renewed ticket. kinit output: {}", output);
      }
      reloginFromTicketCache();
    }
  }

到这里可以Hadoop自己的自动认证实际上是kinit -R自动续约。根据前言中所讲,续约有最长期限限制,不能无限续约。参见:解决kinit -R因ticket not renewable无法刷新Kerberos凭证-开发者社区-阿里云

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容