Flink 获取配置的途径

前言

Flink配合Hadoop使用的时候获取配置文件的方式非常之多,官网没有统一的总结。本篇将这些获取配置的方法梳理总结是为了:

  1. 掌握多种指定Flink Hadoop配置的方式。
  2. 对于一个较乱的环境,Flink无法正确读取Hadoop配置的时候,提供一个排查问题思路。

获取Flink conf目录

Flink获取conf目录(Flink配置)的顺序:

  1. 查找FLINK_CONF_DIR环境变量。
  2. 查找../conf目录。
  3. 查找conf目录。

代码位于CliFrontendgetConfigurationDirectoryFromEnv方法:

public static String getConfigurationDirectoryFromEnv() {
    // 从FLINK_CONF_DIR环境变量获取conf路径
    String location = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);

    if (location != null) {
        if (new File(location).exists()) {
            return location;
        } else {
            throw new RuntimeException(
                    "The configuration directory '"
                            + location
                            + "', specified in the '"
                            + ConfigConstants.ENV_FLINK_CONF_DIR
                            + "' environment variable, does not exist.");
        }
    } else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
        // 尝试查找../conf目录是否存在
        location = CONFIG_DIRECTORY_FALLBACK_1;
    } else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
        // 尝试查找conf目录是否存在
        location = CONFIG_DIRECTORY_FALLBACK_2;
    } else {
        throw new RuntimeException(
                "The configuration directory was not specified. "
                        + "Please specify the directory containing the configuration file through the '"
                        + ConfigConstants.ENV_FLINK_CONF_DIR
                        + "' environment variable.");
    }
    return location;
}

获取log4j配置文件

YarnLogConfigUtildiscoverLogConfigFile方法从flink配置文件目录中查找log4j.propertieslogback.xml。如果两者都存在,优先使用log4j.properties

private static Optional<File> discoverLogConfigFile(final String configurationDirectory) {
    Optional<File> logConfigFile = Optional.empty();

    // 从Flink配置文件目录中查找log4j.properties文件
    final File log4jFile =
            new File(configurationDirectory + File.separator + CONFIG_FILE_LOG4J_NAME);
    if (log4jFile.exists()) {
        logConfigFile = Optional.of(log4jFile);
    }

    // 从Flink配置文件目录中查找logback.xml文件
    final File logbackFile =
            new File(configurationDirectory + File.separator + CONFIG_FILE_LOGBACK_NAME);
    if (logbackFile.exists()) {
        if (logConfigFile.isPresent()) {
            // 如果两个配置文件都存在,打印告警
            LOG.warn(
                    "The configuration directory ('"
                            + configurationDirectory
                            + "') already contains a LOG4J config file."
                            + "If you want to use logback, then please delete or rename the log configuration file.");
        } else {
            logConfigFile = Optional.of(logbackFile);
        }
    }
    return logConfigFile;
}

Kerberos相关配置

KerberosUtilsstatic方法中。可以使用KRB5CCNAME环境变量,指定Flink Kerberos认证使用的ticket cache路径。

String ticketCache = System.getenv("KRB5CCNAME");
if (ticketCache != null) {
    if (IBM_JAVA) {
        System.setProperty("KRB5CCNAME", ticketCache);
    } else {
        kerberosCacheOptions.put("ticketCache", ticketCache);
    }
}

KerberosLoginProviderdoLoginAndReturnUGI使用了KRB5PRINCIPAL环境变量指定principal。代码如下所示:

public UserGroupInformation doLoginAndReturnUGI() throws IOException {
    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();

    if (principal != null) {
        LOG.info(
                "Attempting to login to KDC using principal: {} keytab: {}", principal, keytab);
        UserGroupInformation ugi =
                UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
        LOG.info("Successfully logged into KDC");
        return ugi;
    } else if (!HadoopUserUtils.isProxyUser(currentUser)) {
        LOG.info("Attempting to load user's ticket cache");
        final String ccache = System.getenv("KRB5CCNAME");
        final String user =
                Optional.ofNullable(System.getenv("KRB5PRINCIPAL"))
                        .orElse(currentUser.getUserName());
        UserGroupInformation ugi = UserGroupInformation.getUGIFromTicketCache(ccache, user);
        LOG.info("Loaded user's ticket cache successfully");
        return ugi;
    } else {
        throwProxyUserNotSupported();
        return currentUser;
    }
}

Hadoop相关配置

读取Hadoop配置文件的流程位于HadoopUtilsgetHadoopConfiguration方法。流程如下:

  1. 从class path中读取hdfs-default.xml和hdfs-site.xml文件。
  2. $HADOOP_HOME/conf$HADOOP_HOME/etc/hadoop中读取。
  3. 从Flink配置文件中fs.hdfs.hdfsdefaultfs.hdfs.hdfssitefs.hdfs.hadoopconf配置读取。此方法已废弃不建议使用。
  4. HADOOP_CONF_DIR读取。
  5. 从Flink配置文件读取flink.hadoop.开头的配置作为Hadoop的配置项。
@SuppressWarnings("deprecation")
public static Configuration getHadoopConfiguration(
        org.apache.flink.configuration.Configuration flinkConfiguration) {

    // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
    // from the classpath
    // 从classpath中读取hdfs-default.xml和hdfs-site.xml文件
    Configuration result = new HdfsConfiguration();
    boolean foundHadoopConfiguration = false;

    // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
    // the hdfs configuration.
    // The properties of a newly added resource will override the ones in previous resources, so
    // a configuration
    // file with higher priority should be added later.

    // Approach 1: HADOOP_HOME environment variables
    String[] possibleHadoopConfPaths = new String[2];

    // 从HADOOP_HOME环境变量获取
    // $HADOOP_HOME/conf和$HADOOP_HOME/etc/hadoop
    final String hadoopHome = System.getenv("HADOOP_HOME");
    if (hadoopHome != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
        possibleHadoopConfPaths[0] = hadoopHome + "/conf";
        possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
    }

    for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
        if (possibleHadoopConfPath != null) {
            foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
        }
    }

    // Approach 2: Flink configuration (deprecated)
    // 从Flink配置文件中fs.hdfs.hdfsdefault配置项获取
    final String hdfsDefaultPath =
            flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
    if (hdfsDefaultPath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        LOG.debug(
                "Using hdfs-default configuration-file path from Flink config: {}",
                hdfsDefaultPath);
        foundHadoopConfiguration = true;
    }

    // 从Flink配置文件中fs.hdfs.hdfssite配置项获取
    final String hdfsSitePath =
            flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
    if (hdfsSitePath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        LOG.debug(
                "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
        foundHadoopConfiguration = true;
    }

    // 从Flink配置文件中fs.hdfs.hadoopconf配置项获取
    final String hadoopConfigPath =
            flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
    if (hadoopConfigPath != null) {
        LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
    }

    // Approach 3: HADOOP_CONF_DIR environment variable
    // 从HADOOP_CONF_DIR环境变量获取
    String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
    if (hadoopConfDir != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
    }

    // Approach 4: Flink configuration
    // add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
    // 读取Flink配置文件中flink.hadoop.开头的配置项,去掉该前缀的内容作为key放入configuration
    for (String key : flinkConfiguration.keySet()) {
        for (String prefix : FLINK_CONFIG_PREFIXES) {
            if (key.startsWith(prefix)) {
                String newKey = key.substring(prefix.length());
                String value = flinkConfiguration.getString(key, null);
                result.set(newKey, value);
                LOG.debug(
                        "Adding Flink config entry for {} as {}={} to Hadoop config",
                        key,
                        newKey,
                        value);
                foundHadoopConfiguration = true;
            }
        }
    }

    // 如果以上途径均未发现Hadoop配置,打印警告
    if (!foundHadoopConfiguration) {
        LOG.warn(
                "Could not find Hadoop configuration via any of the supported methods "
                        + "(Flink configuration, environment variables).");
    }

    return result;
}

Hadoop class path配置

构建Flink Hadoop classpath的相关代码位于config.sh

INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"

可使用export HADOOP_CLASSPATH=xxx方式为Flink指定Hadoop的class path。

通过这行代码还可以得知Flink的class path包含HADOOP_CONF_DIRYARN_CONF_DIR,分别对应Hadoop和Yarn配置文件目录。可以通过HADOOP_CONF_DIRYARN_CONF_DIR分别指定Hadoop和Yarn的配置文件路径。

Yarn相关配置

YarnClusterDescriptor::isReadyForDeployment方法中检测HADOOP_CONF_DIRYARN_CONF_DIR。如果HADOOP_CONF_DIR或者YARN_CONF_DIR都没有配置,打印警告。

// check if required Hadoop environment variables are set. If not, warn user
if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
    LOG.warn(
            "Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set. "
                    + "The Flink YARN Client needs one of these to be set to properly load the Hadoop "
                    + "configuration for accessing YARN.");
}

一般来说hadoop classpath命令的返回结果中包含了Hadoop的conf目录,所以没有指定HADOOP_CONF_DIRYARN_CONF_DIR也不影响Flink作业访问HDFS和提交Yarn集群。

Yarn配置文件的读取逻辑为:

  1. 从class path中读取yarn-site.xmlyarn-default.xml。因为HADOOP_CLASSPATHHADOOP_CONF_DIRYARN_CONF_DIR环境变量都可以修改Flink Hadoop的class path,这三个环境变量的配置均会影响此步骤。
  2. 通过Flink配置文件中flink.yarn开头的配置指定Yarn配置项。

下面是源代码分析。

提交Yarn作业之前通过Utils::getYarnAndHadoopConfiguration方法读取Hadoop和Yarn的配置文件。

public static YarnConfiguration getYarnAndHadoopConfiguration(
        org.apache.flink.configuration.Configuration flinkConfig) {
    final YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
    // 获取Hadoop配置,前面已分析过
    yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));

    return yarnConfig;
}

UtilsgetYarnConfiguration方法从Flink配置文件中读取flink.yarn开头的配置,去掉前缀之后作为key放入configuration。

public static YarnConfiguration getYarnConfiguration(
        org.apache.flink.configuration.Configuration flinkConfig) {
    // 从class path中获取yarn-default.xml和yarn-site.xml文件
    final YarnConfiguration yarnConfig = new YarnConfiguration();

    for (String key : flinkConfig.keySet()) {
        for (String prefix : FLINK_CONFIG_PREFIXES) {
            if (key.startsWith(prefix)) {
                String newKey = key.substring("flink.".length());
                String value = flinkConfig.getString(key, null);
                yarnConfig.set(newKey, value);
                LOG.debug(
                        "Adding Flink config entry for {} as {}={} to Yarn config",
                        key,
                        newKey,
                        value);
            }
        }
    }

    return yarnConfig;
}

附录:日志不打印问题排查

检查的顺序为:

检查$FLINK_HOME/conf/log4j*.properties配置文件是否正确。

检查$FLINK_HOME/lib/中的日志相关jar包是否存在或是否冲突。

检查打包进作业内的日志相关jar包,需要都排除使用Flink框架自带的日志依赖库。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容