Flink 使用介绍相关文档目录
背景
Flink Kerberos认证的配置位于YAML配置文件中,带来的便利是用户不用反复去配置认证信息。但这样使用有局限性。用户如果需要频繁切换不同的用户提交任务,是否可以像spark-submit
那样通过--principal
和--keytab
参数来临时指定用户呢?答案是可以的。
使用方式
Flink 1.17版本支持使用-D 配置项
的方式在提交任务时指定认证配置。例如:
./flink run-application -t yarn-application -D security.kerberos.login.keytab=/etc/security/keytabs/hdfs.headless.keytab -D security.kerberos.login.principal=hdfs@PAUL.COM ../examples/batch/WordCount.jar
其中-D 配置项
的内容和YAML配置文件中的参数名相同。
需要注意的是,经本人验证Flink 1.13.2 和Flink 1.15.4版本存在问题,无法使用此方式。
经查询社区,和修复此问题相关联的Issue为:
- [FLINK-29435][client] SecurityConfiguration supports dynamic configuration
- [FLINK-31321][Deployment/YARN] Yarn-session mode, securityConfiguration supports dynamic configuration
如果需要在1.13.x和1.15.x版本中修复,可以将这两个commit cherrypick过去重新编译解决。
除此之外还有一个备用方法:动态指定Kerberos Cache。该方法对于Flink 1.13.x和1.15.x可用。
kinit hdfs@PAUL.COM -kt /etc/security/keytabs/hdfs.headless.keytab -c /opt/paul/krb_cache
export KRB5CCNAME=/opt/paul/krb_cache && ./flink run -m yarn-cluster ../examples/batch/WordCount.jar
kinit
命令可以通过-c
参数指定自定义的cache文件路径。在Flink使用的时候将其配置到KRB5CCNAME
环境变量对应起来。不同用户使用不同的cache文件可避免相互影响。
KRB5CCNAME
环境变量仅在当前shell中生效,不会干扰其他的shell提交作业。
源代码分析
security config动态参数解析部分
CliFrontend::mainInternal
方法try
块内容:
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
CommandLine commandLine =
cli.getCommandLine(
new Options(),
Arrays.copyOfRange(args, min(args.length, 1), args.length),
true);
Configuration securityConfig = new Configuration(cli.configuration);
// 获取命令行中-D key=value形式的动态参数,将其合并入securityConfig(Configuration类型)中
DynamicPropertiesUtil.encodeDynamicProperties(commandLine, securityConfig);
// 生成安全相关配置(Hadoop认证还是JAAS)
SecurityUtils.install(new SecurityConfiguration(securityConfig));
// 执行认证过程
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
上面生成安全相关配置和执行认证过程参见Flink 源码之安全认证。
动态指定Kerberos Cache
相关源代码位于:KerberosUtils::static
方法。相关代码片段为:
// 获取系统环境变量KRB5CCNAME
String ticketCache = System.getenv("KRB5CCNAME");
if (ticketCache != null) {
if (IBM_JAVA) {
System.setProperty("KRB5CCNAME", ticketCache);
} else {
// 添加到kerberosCacheOptions中
kerberosCacheOptions.put("ticketCache", ticketCache);
}
}