Preface
When Flink is used together with Hadoop, there are many ways for it to pick up configuration files, and the official documentation does not summarize them in one place. This article pulls these configuration-loading mechanisms together in order to:
- Cover the various ways of specifying Flink's Hadoop configuration.
- Provide a troubleshooting approach for messy environments in which Flink fails to read the Hadoop configuration correctly.
Obtaining the Flink conf directory
Flink locates its conf directory (the Flink configuration) in the following order:
- Look for the `FLINK_CONF_DIR` environment variable.
- Look for the `../conf` directory.
- Look for the `conf` directory.
The code lives in the `getConfigurationDirectoryFromEnv` method of `CliFrontend`:
public static String getConfigurationDirectoryFromEnv() {
// Get the conf path from the FLINK_CONF_DIR environment variable
String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
if (location != null) {
if (new File(location).exists()) {
return location;
} else {
throw new RuntimeException(
"The configuration directory '"
+ location
+ "', specified in the '"
+ ConfigConstants.ENV_FLINK_CONF_DIR
+ "' environment variable, does not exist.");
}
} else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
// Check whether the ../conf directory exists
location = CONFIG_DIRECTORY_FALLBACK_1;
} else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
// Check whether the conf directory exists
location = CONFIG_DIRECTORY_FALLBACK_2;
} else {
throw new RuntimeException(
"The configuration directory was not specified. "
+ "Please specify the directory containing the configuration file through the '"
+ ConfigConstants.ENV_FLINK_CONF_DIR
+ "' environment variable.");
}
return location;
}
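If it is unclear which configuration directory (and therefore which flink-conf.yaml) a messy environment will end up using, the directory and the resulting configuration can be printed directly. Below is a minimal sketch, assuming the Flink client classes are on the classpath; `getConfigurationDirectoryFromEnv` is the method shown above, and `GlobalConfiguration.loadConfiguration` is Flink's standard loader for flink-conf.yaml (the `CliFrontend` package may differ across Flink versions):
// Package may differ across Flink versions
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

public class ShowFlinkConfDir {
    public static void main(String[] args) {
        // Resolve the conf directory exactly the way the CLI does
        String confDir = CliFrontend.getConfigurationDirectoryFromEnv();
        System.out.println("Flink conf directory: " + confDir);
        // Load flink-conf.yaml from that directory and print the effective entries
        Configuration conf = GlobalConfiguration.loadConfiguration(confDir);
        conf.toMap().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}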
Obtaining the log4j configuration file
The `discoverLogConfigFile` method of `YarnLogConfigUtil` searches the Flink configuration directory for `log4j.properties` and `logback.xml`. If both exist, `log4j.properties` takes precedence.
private static Optional<File> discoverLogConfigFile(final String configurationDirectory) {
Optional<File> logConfigFile = Optional.empty();
// Look for the log4j.properties file in the Flink configuration directory
final File log4jFile =
new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
if (log4jFile.exists()) {
logConfigFile = Optional.of(log4jFile);
}
// Look for the logback.xml file in the Flink configuration directory
final File logbackFile =
new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
if (logbackFile.exists()) {
if (logConfigFile.isPresent()) {
// If both config files exist, log a warning
LOG.warn(
"The configuration directory ('"
+ configurationDirectory
+ "') already contains a LOG4J config file."
+ "If you want to use logback, then please delete or rename the log configuration file.");
} else {
logConfigFile = Optional.of(logbackFile);
}
}
return logConfigFile;
}
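Since `discoverLogConfigFile` is private, a quick way to see which log configuration file Flink would pick up is to apply the same precedence to the conf directory yourself. A minimal sketch, assuming the directory comes from `FLINK_CONF_DIR` and using the standard file names shown above:
import java.io.File;

public class ShowLogConfigFile {
    public static void main(String[] args) {
        // Assumes the conf directory is given via FLINK_CONF_DIR
        String confDir = System.getenv("FLINK_CONF_DIR");
        File log4j = new File(confDir, "log4j.properties");
        File logback = new File(confDir, "logback.xml");
        if (log4j.exists()) {
            // log4j.properties wins even when logback.xml is also present
            System.out.println("Flink would use " + log4j.getAbsolutePath());
        } else if (logback.exists()) {
            System.out.println("Flink would use " + logback.getAbsolutePath());
        } else {
            System.out.println("No log configuration file found in " + confDir);
        }
    }
}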
Kerberos-related configuration
The snippet below comes from the static initialization code of `KerberosUtils`. The `KRB5CCNAME` environment variable can be used to specify the ticket cache path that Flink's Kerberos authentication uses.
String ticketCache = System.getenv("KRB5CCNAME");
if (ticketCache != null) {
if (IBM_JAVA) {
System.setProperty("KRB5CCNAME", ticketCache);
} else {
kerberosCacheOptions.put("ticketCache", ticketCache);
}
}
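`KRB5CCNAME` is a standard Kerberos environment variable; when it is not set, the Kerberos library falls back to its platform default cache. To rule out a missing or misplaced cache in a messy environment, a small check such as the following can help (a sketch; the default path mentioned in the comment is the common MIT Kerberos default on Linux and may differ per system):
import java.io.File;

public class CheckTicketCache {
    public static void main(String[] args) {
        String ticketCache = System.getenv("KRB5CCNAME");
        if (ticketCache == null) {
            // No explicit cache configured; the Kerberos library falls back to its
            // platform default (often /tmp/krb5cc_<uid> on Linux)
            System.out.println("KRB5CCNAME is not set, the default ticket cache will be used");
        } else {
            // KRB5CCNAME may carry a type prefix such as FILE:
            File cache = new File(ticketCache.replaceFirst("^FILE:", ""));
            System.out.println("KRB5CCNAME = " + ticketCache + ", cache file exists: " + cache.exists());
        }
    }
}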
`doLoginAndReturnUGI` in `KerberosLoginProvider` uses the `KRB5PRINCIPAL` environment variable to specify the principal; if it is not set, the current user's name is used. The code is shown below:
public UserGroupInformation doLoginAndReturnUGI() throws IOException {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
if (principal != null) {
LOG.info(
"Attempting to login to KDC using principal: {} keytab: {}", principal, keytab);
UserGroupInformation ugi =
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
LOG.info("Successfully logged into KDC");
return ugi;
} else if (!HadoopUserUtils.isProxyUser(currentUser)) {
LOG.info("Attempting to load user's ticket cache");
final String ccache = System.getenv("KRB5CCNAME");
final String user =
Optional.ofNullable(System.getenv("KRB5PRINCIPAL"))
.orElse(currentUser.getUserName());
UserGroupInformation ugi = UserGroupInformation.getUGIFromTicketCache(ccache, user);
LOG.info("Loaded user's ticket cache successfully");
return ugi;
} else {
throwProxyUserNotSupported();
return currentUser;
}
}
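The `principal` and `keytab` fields checked above are typically populated from Flink's security options. As a hedged sketch (the option keys `security.kerberos.login.principal`, `security.kerberos.login.keytab` and `security.kerberos.login.use-ticket-cache` are standard Flink settings; the values are placeholders), a keytab-based login would be configured like this in the Flink configuration:
import org.apache.flink.configuration.Configuration;

public class KerberosLoginSettings {
    public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        // With principal + keytab set, doLoginAndReturnUGI takes the keytab branch above
        flinkConf.setString("security.kerberos.login.principal", "flink-user@EXAMPLE.COM");
        flinkConf.setString("security.kerberos.login.keytab", "/etc/security/keytabs/flink.keytab");
        // Without them, the ticket cache branch is used (see KRB5CCNAME / KRB5PRINCIPAL above)
        flinkConf.setString("security.kerberos.login.use-ticket-cache", "true");
        System.out.println(flinkConf);
    }
}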
Hadoop-related configuration
The logic for reading the Hadoop configuration files lives in the `getHadoopConfiguration` method of `HadoopUtils`. The flow is:
- Read `hdfs-default.xml` and `hdfs-site.xml` from the classpath.
- Read from `$HADOOP_HOME/conf` and `$HADOOP_HOME/etc/hadoop`.
- Read the files pointed to by the `fs.hdfs.hdfsdefault`, `fs.hdfs.hdfssite` and `fs.hdfs.hadoopconf` options in the Flink configuration. This approach is deprecated and not recommended.
- Read from `HADOOP_CONF_DIR`.
- Read options prefixed with `flink.hadoop.` from the Flink configuration and use them as Hadoop configuration entries (an example follows the method below).
@SuppressWarnings("deprecation")
public static Configuration getHadoopConfiguration(
org.apache.flink.configuration.Configuration flinkConfiguration) {
// Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
// from the classpath
// Read hdfs-default.xml and hdfs-site.xml from the classpath
Configuration result = new HdfsConfiguration();
boolean foundHadoopConfiguration = false;
// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
// the hdfs configuration.
// The properties of a newly added resource will override the ones in previous resources, so
// a configuration
// file with higher priority should be added later.
// Approach 1: HADOOP_HOME environment variables
String[] possibleHadoopConfPaths = new String[2];
// Get the paths from the HADOOP_HOME environment variable:
// $HADOOP_HOME/conf and $HADOOP_HOME/etc/hadoop
final String hadoopHome = System.getenv("HADOOP_HOME");
if (hadoopHome != null) {
LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
possibleHadoopConfPaths[0] = hadoopHome + "/conf";
possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
}
for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
if (possibleHadoopConfPath != null) {
foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
}
}
// Approach 2: Flink configuration (deprecated)
// Get the path from the fs.hdfs.hdfsdefault option in the Flink configuration
final String hdfsDefaultPath =
flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
if (hdfsDefaultPath != null) {
result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
LOG.debug(
"Using hdfs-default configuration-file path from Flink config: {}",
hdfsDefaultPath);
foundHadoopConfiguration = true;
}
// Get the path from the fs.hdfs.hdfssite option in the Flink configuration
final String hdfsSitePath =
flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
if (hdfsSitePath != null) {
result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
LOG.debug(
"Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
foundHadoopConfiguration = true;
}
// Get the directory from the fs.hdfs.hadoopconf option in the Flink configuration
final String hadoopConfigPath =
flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
if (hadoopConfigPath != null) {
LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
foundHadoopConfiguration =
addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
}
// Approach 3: HADOOP_CONF_DIR environment variable
// Get the directory from the HADOOP_CONF_DIR environment variable
String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
if (hadoopConfDir != null) {
LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
foundHadoopConfiguration =
addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
}
// Approach 4: Flink configuration
// add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
// Read options starting with flink.hadoop. from the Flink configuration, strip the prefix and use the remainder as the Hadoop key
for (String key : flinkConfiguration.keySet()) {
for (String prefix : FLINK_CONFIG_PREFIXES) {
if (key.startsWith(prefix)) {
String newKey = key.substring(prefix.length());
String value = flinkConfiguration.getString(key, null);
result.set(newKey, value);
LOG.debug(
"Adding Flink config entry for {} as {}={} to Hadoop config",
key,
newKey,
value);
foundHadoopConfiguration = true;
}
}
}
// If none of the above approaches found a Hadoop configuration, log a warning
if (!foundHadoopConfiguration) {
LOG.warn(
"Could not find Hadoop configuration via any of the supported methods "
+ "(Flink configuration, environment variables).");
}
return result;
}
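The last step means any Hadoop property can be passed through the Flink configuration by prefixing it with `flink.hadoop.`. A minimal sketch of what the loop above produces; `dfs.replication` and its value are placeholders chosen for illustration, and the package of `HadoopUtils` is an assumption that may differ across Flink versions:
import org.apache.flink.configuration.Configuration;
// Package assumed for the HadoopUtils shown above; it may differ across Flink versions
import org.apache.flink.runtime.util.HadoopUtils;

public class FlinkHadoopPrefixExample {
    public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        // Any Hadoop property can be prefixed with "flink.hadoop." (dfs.replication is just an example)
        flinkConf.setString("flink.hadoop.dfs.replication", "2");
        org.apache.hadoop.conf.Configuration hadoopConf =
                HadoopUtils.getHadoopConfiguration(flinkConf);
        // The prefix has been stripped, so the Hadoop configuration now contains dfs.replication=2
        System.out.println("dfs.replication = " + hadoopConf.get("dfs.replication"));
    }
}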
Hadoop classpath configuration
The code that builds Flink's Hadoop classpath lives in `config.sh`:
INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
The Hadoop classpath can be specified for Flink with `export HADOOP_CLASSPATH=xxx` (commonly `export HADOOP_CLASSPATH=$(hadoop classpath)`).
This line also shows that Flink's classpath includes `HADOOP_CONF_DIR` and `YARN_CONF_DIR`, which correspond to the Hadoop and Yarn configuration directories, so these two variables can be used to point Flink at the Hadoop and Yarn configuration file paths respectively.
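When several of these variables are set at once, it can be unclear which `*-site.xml` actually ends up on the classpath. A quick way to check is to ask the classloader where it resolves the files from; a minimal sketch, meant to be run with the same classpath the Flink client uses:
import java.net.URL;

public class ShowHadoopConfOnClasspath {
    public static void main(String[] args) {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        for (String name : new String[] {"core-site.xml", "hdfs-site.xml", "yarn-site.xml"}) {
            // getResource returns the first match in classpath order, i.e. the file that wins
            URL url = cl.getResource(name);
            System.out.println(name + " -> " + (url == null ? "not on the classpath" : url));
        }
    }
}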
Yarn-related configuration
The `YarnClusterDescriptor::isReadyForDeployment` method checks `HADOOP_CONF_DIR` and `YARN_CONF_DIR`. If neither `HADOOP_CONF_DIR` nor `YARN_CONF_DIR` is set, a warning is logged.
// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
LOG.warn(
"Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. "
+ "The Flink YARN Client needs one of these to be set to properly load the Hadoop "
+ "configuration for accessing YARN.");
}
In general the output of the `hadoop classpath` command already includes Hadoop's conf directory, so even without `HADOOP_CONF_DIR` and `YARN_CONF_DIR` set, Flink jobs can still access HDFS and be submitted to the Yarn cluster.
The logic for reading the Yarn configuration files is:
- Read `yarn-site.xml` and `yarn-default.xml` from the classpath. Since the `HADOOP_CLASSPATH`, `HADOOP_CONF_DIR` and `YARN_CONF_DIR` environment variables can all modify Flink's Hadoop classpath, all three affect this step.
- Specify Yarn configuration entries through options starting with `flink.yarn.` in the Flink configuration (an example follows the source code below).
The source code analysis follows.
Before a Yarn job is submitted, the `Utils::getYarnAndHadoopConfiguration` method reads both the Hadoop and the Yarn configuration files.
public static YarnConfiguration getYarnAndHadoopConfiguration(
org.apache.flink.configuration.Configuration flinkConfig) {
final YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
// Obtain the Hadoop configuration, as analyzed above
yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));
return yarnConfig;
}
The `getYarnConfiguration` method of `Utils` reads options starting with `flink.yarn.` from the Flink configuration. Note that only the leading `flink.` is stripped, so the resulting key (still starting with `yarn.`) is put into the configuration.
public static YarnConfiguration getYarnConfiguration(
org.apache.flink.configuration.Configuration flinkConfig) {
// Load yarn-default.xml and yarn-site.xml from the classpath
final YarnConfiguration yarnConfig = new YarnConfiguration();
for (String key : flinkConfig.keySet()) {
for (String prefix : FLINK_CONFIG_PREFIXES) {
if (key.startsWith(prefix)) {
String newKey = key.substring("flink.".length());
String value = flinkConfig.getString(key, null);
yarnConfig.set(newKey, value);
LOG.debug(
"Adding Flink config entry for {} as {}={} to Yarn config",
key,
newKey,
value);
}
}
}
return yarnConfig;
}
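Because only the leading `flink.` is stripped, the Flink key and the resulting Yarn key share the `yarn.` part. A minimal sketch of this mapping; the property `yarn.resourcemanager.am.max-attempts` and its value are placeholders for illustration, and the package of `Utils` is an assumption that may differ across Flink versions:
import org.apache.flink.configuration.Configuration;
// Package assumed for the Utils class shown above; it may differ across Flink versions
import org.apache.flink.yarn.Utils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class FlinkYarnPrefixExample {
    public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        // "flink." + the Yarn key: yarn.resourcemanager.am.max-attempts is just an example
        flinkConf.setString("flink.yarn.resourcemanager.am.max-attempts", "4");
        YarnConfiguration yarnConf = Utils.getYarnConfiguration(flinkConf);
        // Only "flink." was stripped, so the Yarn configuration now contains
        // yarn.resourcemanager.am.max-attempts=4
        System.out.println(yarnConf.get("yarn.resourcemanager.am.max-attempts"));
    }
}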
Appendix: troubleshooting missing log output
Check in the following order:
- Check that the `$FLINK_HOME/conf/log4j*.properties` configuration files are correct.
- Check whether the logging-related jars in `$FLINK_HOME/lib/` are present and whether they conflict with each other.
- Check the logging-related jars packaged into the job itself; they should all be excluded so that the logging libraries shipped with the Flink framework are used.
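When jar conflicts are suspected, it can also help to print which logging backend is actually bound at runtime, for example from a small test job or a plain main method run with the same classpath. A sketch using the standard SLF4J API; it only reports the active binding, it does not fix the conflict:
import java.security.CodeSource;
import org.slf4j.LoggerFactory;

public class ShowSlf4jBinding {
    public static void main(String[] args) {
        // The concrete ILoggerFactory class reveals which logging backend SLF4J bound to
        System.out.println("SLF4J binding: " + LoggerFactory.getILoggerFactory().getClass().getName());
        // Which jar the SLF4J API itself was loaded from (helps spot duplicate/conflicting jars)
        CodeSource src = LoggerFactory.class.getProtectionDomain().getCodeSource();
        System.out.println("slf4j-api loaded from: " + (src == null ? "unknown" : src.getLocation()));
    }
}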