Flink 双网分离环境使用问题解决记录

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

前言

本文为大家分享博主在双网分离的环境下使用Flink遇到的问题和解决方案。

所谓双网分离环境指的是Flink集群运行环境存在两个网段,分别为管理网和业务网。这两个网段互不相通。管理网使用了千兆交换机,仅用于管理员操作集群和监控集群状态。业务网使用光纤和万兆交换机,Flink TaskManager之间交换数据全部走业务网,实现带宽的最大化利用。

环境信息

软件版本

  • Flink 1.15.4
  • Hadoop 3.1.1

网络配置

集群内配置有两个网络,分别为:

管理网:10.x.x.x。千兆网。
业务网:192.168.x.x。万兆网。
集群内相互访问使用hostname,hostname配置的均为业务网。

管理网和业务网不互通。

问题1:提交任务成功后出错,无法连接到JobManager

另有一个集群外客户端节点,其所在网段和管理网相同。在该客户端节点提交Flink任务到集群,出现无法连接到JobManager,报Connection Refused错误。错误中的IP为集群内万兆网的IP。但是作业能够提交成功,只是无法获取到作业的运行状态和结果。

通过这个错误不难得知问题是Flink JobManager的bind host配置存在问题。通过查看Yarn界面,找到Flink作业JobManager所在节点。使用netstat查看JobManager进程对应的端口和绑定IP。我们发现只有一个端口绑定的是192.168.x.x,其余都是0.0.0.0。这个端口对应的是Flink什么端口呢?查看Yarn中该Flink Application的启动日志可以发现这个端口是rest端口。

查阅官方文档,可以使用rest.bind-address方式绑定地址。于是编辑Flink配置文件flink-conf.yaml,增加:

rest.bind-address: 0.0.0.0

保存之后重新提交任务,问题依旧。绑定的地址并没有发生变化。查看Flink Web UI中JobManager的Configuration,发现rest.bind-addressrest.address值相同,都为集群内JobManager运行所在机器的hostname。rest.bind-address配置项不生效。

经过详细问题调查,发现是Flink 1.15.x版本之后bug。无论如何配置rest.bind-address,都会将该属性的值覆盖为运行时所在Yarn节点的地址。参见如下代码,位于YarnEntrypointUtils.javaloadConfiguration方法。

// ...
configuration.set(JobManagerOptions.ADDRESS, hostname);
configuration.set(RestOptions.ADDRESS, hostname);
configuration.set(RestOptions.BIND_ADDRESS, hostname);
// ...

该特性对应的提交为[FLINK-24474] Default rest.bind-address to localhost in flink-conf.yaml。Commit ID:6222532db0f0a1e75811fc215cd66bc26fb74afb

解决方法为修改上述片段的代码为:

// ...
configuration.set(JobManagerOptions.ADDRESS, hostname);
configuration.set(RestOptions.ADDRESS, hostname);
if (!configuration.contains(RestOptions.BIND_ADDRESS)) {
    configuration.set(RestOptions.BIND_ADDRESS, hostname);
}
// ...

按照修改之后的逻辑,只有配置文件中没有配置rest.bind-address,才会使用Yarn节点的地址。如果配置了,则以实际配置的值为准。

修改后重新编译,替换掉Flink客户端中flink-dist-xxx.jar文件。重新提交作业可以解决。

博主已通过patch FLINK-35332解决了这个问题。

问题2:集群外客户端flink提交时host无法解析为IP

问题现象:使用flink run方式在集群外提交可以ResourceManager可以解析host为IP,但是yarn-session模式启动的时候无法解析host。

经调查发现,控制解析的配置项为hadoop.security.token.service.use_ip

配置集群外客户端的hdfs-site.xml(这里有伏笔),修改hadoop.security.token.service.use_iptrue

<property>
    <name>hadoop.security.token.service.use_ip</name>
    <value>true</value>
</property>

集群中Hadoop的配置项保持不变,为false。然后尝试启动yarn-session,问题解决。

查找了Hadoop官网,发现hadoop.security.token.service.use_ip是属于core-site.xml的配置。集群中和集群外客户端将其错误的写到hdfs-site.xml中。

这里先说结论,这个诡异的问题原因在于该配置不能位于hdfs-site.xml中,必须在core-site.xml中。如果该属性错误配置到了hdfs-site.xml,会导致flink run -m yarn-cluster方式提交可以解析host但是yarn-session模式启动yarn-session的时候无法解析host。至于为什么会这样,我们接下来分析。

默认的日志级别不够详细,看不出来什么内容。将日志开启DEBUG级别之后提交任务。查看日志发现flink run -m yarn-cluster方式的hadoop.security.token.service.use_ip的值为true,启动yarn-session的时候hadoop.security.token.service.use_ip的值为false。怀疑yarn-session模式提交的时候不加载hdfs-site.xml

仔细观察flink yarn-session的启动日志,发现开头有这一行:

... org.apache.flink.runtime.util.HadoopUtils [] - Searching Hadoop configuration files in HADOOP_HOME

Flink run命令启动的时候没有这一行。怀疑是这里加载了HDFS配置文件。

想要揭开谜底,我们接下来分析Flink yarn-session的启动代码。

Flink Yarn Session启动代码

查看yarn-session.sh的最后一行启动Java进程代码,可得知应用的入口为FlinkYarnSessionCli类。

"$JAVA_RUN" $JVM_ARGS $FLINK_ENV_JAVA_OPTS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"

跟踪main方法,然后找到FlinkYarnSessionCli::run方法,我们找到这一行代码:

// ...
final YarnClusterDescriptor yarnClusterDescriptor =
        (YarnClusterDescriptor)
                yarnClusterClientFactory.createClusterDescriptor(effectiveConfiguration);
// ...

继续跟踪,查看YarnClusterClientFactory::createClusterDescriptor的内容:

@Override
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
    checkNotNull(configuration);

    final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
    YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);

    return getClusterDescriptor(configuration);
}

继续分析YarnClusterClientFactory::getClusterDescriptor

private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
    final YarnClient yarnClient = YarnClient.createYarnClient();
    // 获取Yarn和Hadoop配置
    final YarnConfiguration yarnConfiguration =
            Utils.getYarnAndHadoopConfiguration(configuration);

    yarnClient.init(yarnConfiguration);
    yarnClient.start();

    return new YarnClusterDescriptor(
            configuration,
            yarnConfiguration,
            yarnClient,
            YarnClientYarnClusterInformationRetriever.create(yarnClient),
            false);
}

这里有一个关键调用Utils.getYarnAndHadoopConfiguration(configuration)。作用为获取Yarn和Hadoop配置。

public static YarnConfiguration getYarnAndHadoopConfiguration(
        org.apache.flink.configuration.Configuration flinkConfig) {
    final YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
    yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));

    return yarnConfig;
}

其中getYarnConfiguration调用中执行了new YarnConfiguration该调用从classpath中加载yarn-default.xmlyarn-site.xmlresource-types.xml文件。
同样,HadoopUtils.getHadoopConfiguration(flinkConfig)调用的时候执行了new HdfsConfiguration,加载了hdfs-site.xmlhdfs-default.xmlcore-site.xmlcore-default.xml配置。

到这里可知,Flink yarn-session启动的时候的确加载了hdfs-site.xml配置文件。

Flink run命令的入口类为CliFrontEnd,并没有使用FlinkYarnSessionCli。因此我们从Hadoop源代码中打印hadoop.security.token.service.use_ip值这一行DEBUG日志的为止展开分析。

这一行日志位于Hadoop源代码中SecurityUtil类的setTokenServiceUseIp方法中。我们将相关的方法贴出来,如下所示:

  static {
    setConfigurationInternal(new Configuration());
  }
  
  private static void setConfigurationInternal(Configuration conf) {
    boolean useIp = conf.getBoolean(
      CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP,
      CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT);
    setTokenServiceUseIp(useIp);
    // ...
  }

  @InterfaceAudience.Private
  @VisibleForTesting
  public static void setTokenServiceUseIp(boolean flag) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Setting "
          + CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP
          + " to " + flag);
    }
    useIpForTokenService = flag;
    hostResolver = !useIpForTokenService
        ? new QualifiedHostResolver()
        : new StandardHostResolver();
  }

不难发现,在SecurityUtil类加载的时候执行了setConfigrationInternal方法,然后间接调用了setTokenServiceUseIp方法。

setConfigrationInternal传入的参数是new Configuration。我们查看它的static代码块:

static {
    deprecationContext = new AtomicReference(new DeprecationContext((DeprecationContext)null, defaultDeprecations));
    addDefaultResource("core-default.xml");
    addDefaultResource("core-site.xml");
    ClassLoader cL = Thread.currentThread().getContextClassLoader();
    if (cL == null) {
        cL = Configuration.class.getClassLoader();
    }

    if (cL.getResource("hadoop-site.xml") != null) {
        LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively");
        addDefaultResource("hadoop-site.xml");
    }

}

发现它只从classpath中加载core-default.xmlcore-site.xml文件,不会去加载hdfs-site.xml

到这里得出结论:Flink yarn-session提交的时候加载了hdfs-site.xml文件,但是flink run -m yarn-cluster的时候没有加载。分析完毕。

这次问题分析得出的启示是对于Hadoop相关配置,务必将配置项写到正确的配置文件中,否则会出现奇怪的问题,较难定位原因。

Hadoop解析host相关代码

这里我们返回思考前面的问题:仅仅将hadoop.security.token.service.use_ip修改true的确可行。但为什么设置它为false的时候就无法解析hostname为IP了呢?

为了搞清楚这个问题,接下来需要分析hadoop.security.token.service.use_ip究竟做了些什么。

跟踪YarnClient::start调用找到YarnImpl::serviceStart

protected void serviceStart() throws Exception {
    try {
        this.rmClient = (ApplicationClientProtocol)ClientRMProxy.createRMProxy(this.getConfig(), ApplicationClientProtocol.class);
        if (this.historyServiceEnabled) {
            this.historyClient.start();
        }
    } catch (IOException var2) {
        IOException e = var2;
        throw new YarnRuntimeException(e);
    }

    super.serviceStart();
}

继续跟踪RMProxy::createRMProxy

private static <T> T createRMProxy(YarnConfiguration conf, Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy) throws IOException {
    if (HAUtil.isHAEnabled(conf)) {
        RMFailoverProxyProvider<T> provider = instance.createRMFailoverProxyProvider(conf, protocol);
        return RetryProxy.create(protocol, provider, retryPolicy);
    } else {
        InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
        LOG.info("Connecting to ResourceManager at " + rmAddress);
        T proxy = getProxy(conf, protocol, rmAddress);
        return RetryProxy.create(protocol, proxy, retryPolicy);
    }
}

找到获取InetSocketAddress的方法ClientRMProxy::getRMAddress

@Private
protected InetSocketAddress getRMAddress(YarnConfiguration conf, Class<?> protocol) throws IOException {
    if (protocol == ApplicationClientProtocol.class) {
        return conf.getSocketAddr("yarn.resourcemanager.address", "0.0.0.0:8032", 8032);
    } else if (protocol == ResourceManagerAdministrationProtocol.class) {
        return conf.getSocketAddr("yarn.resourcemanager.admin.address", "0.0.0.0:8033", 8033);
    } else if (protocol == ApplicationMasterProtocol.class) {
        setAMRMTokenService(conf);
        return conf.getSocketAddr("yarn.resourcemanager.scheduler.address", "0.0.0.0:8030", 8030);
    } else {
        String message = "Unsupported protocol found when creating the proxy connection to ResourceManager: " + (protocol != null ? protocol.getClass().getName() : "null");
        LOG.error(message);
        throw new IllegalStateException(message);
    }
}

继续展开YarnConfiguration.getSocketAddr

public InetSocketAddress getSocketAddr(String name, String defaultAddress, int defaultPort) {
    String address;
    if (HAUtil.isHAEnabled(this) && getServiceAddressConfKeys(this).contains(name)) {
        address = HAUtil.getConfValueForRMInstance(name, defaultAddress, this);
    } else {
        address = this.get(name, defaultAddress);
    }

    return NetUtils.createSocketAddr(address, defaultPort, name);
}

跟踪NetUtils::createSocketAddr,一路到NetUtils::createSocketAddrForHost

public static InetSocketAddress createSocketAddrForHost(String host, int port) {
    String staticHost = getStaticResolution(host);
    String resolveHost = staticHost != null ? staticHost : host;

    InetSocketAddress addr;
    try {
        InetAddress iaddr = SecurityUtil.getByName(resolveHost);
        if (staticHost != null) {
            iaddr = InetAddress.getByAddress(host, iaddr.getAddress());
        }

        addr = new InetSocketAddress(iaddr, port);
    } catch (UnknownHostException var6) {
        addr = InetSocketAddress.createUnresolved(host, port);
    }

    return addr;
}

最终跟踪到SecurityUtil::getByName方法:

@Private
public static InetAddress getByName(String hostname) throws UnknownHostException {
    return hostResolver.getByName(hostname);
}

它使用hostResolvergetByName方法解析host。
hostResolverHostResolver类型,具有两个实现类,为StandardHostResolverQulifiedHostResolver两种。

至于hostResolver具体是哪个类,答案就在它初始化赋值的地方。我们找到setTokenServiceUseIp方法。

@Private
@VisibleForTesting
public static void setTokenServiceUseIp(boolean flag) {
    useIpForTokenService = flag;
    hostResolver = (HostResolver)(!useIpForTokenService ? new QualifiedHostResolver() : new StandardHostResolver());
}

到这里我们发现hadoop.security.token.service.use_iptrue对应的是StandardHostResolverfalse对应的是QualifiedHostResolver

StandardHostResolver解析host的方式较为简单,直接使用JDK的InetAddress.getByName完成解析。不再过多分析。QualifiedHostResolver解析较为复杂。前面的问题是hadoop.security.token.service.use_ip配置为false的时候无法解析host,那接下来的重点则是搞清楚QualifiedHostResolver如何解析host。

我们查看QualifiedHostResolvergetByName方法:

@Override
public InetAddress getByName(String host) throws UnknownHostException {
    InetAddress addr = null;
    byte[] ip;
    if (IPAddressUtil.isIPv4LiteralAddress(host)) {
        ip = IPAddressUtil.textToNumericFormatV4(host);
        addr = InetAddress.getByAddress(host, ip);
    } else if (IPAddressUtil.isIPv6LiteralAddress(host)) {
        ip = IPAddressUtil.textToNumericFormatV6(host);
        addr = InetAddress.getByAddress(host, ip);
    } else if (host.endsWith(".")) {
        addr = this.getByExactName(host);
    } else if (host.contains(".")) {
        addr = this.getByExactName(host);
        if (addr == null) {
            addr = this.getByNameWithSearch(host);
        }
    } else {
        InetAddress loopback = InetAddress.getByName((String)null);
        if (host.equalsIgnoreCase(loopback.getHostName())) {
            addr = InetAddress.getByAddress(host, loopback.getAddress());
        } else {
            addr = this.getByNameWithSearch(host);
            if (addr == null) {
                addr = this.getByExactName(host);
            }
        }
    }

    if (addr == null) {
        throw new UnknownHostException(host);
    } else {
        return addr;
    }
}

该方法的逻辑:

  1. 如果host字符串是合法的IPv4和IPv6字面值,如果是,将且构造为InetAddress返回,不用解析hostname。
  2. 如果host字符串以.结尾,调用getByExactName解析。
  3. 如果host字符串包含.,先使用getByExactName解析。如果无法解析,使用searchDomain逐个尝试解析。
  4. 其余情况,先尝试按照loopback解析。如果无法解析,按照前面即第三种情况解析。

接下来分析按照host解析IP的方法getByExactName

InetAddress getByExactName(String host) {
    InetAddress addr = null;
    // 这里将host视为FQDN
    String fqHost = host;
    // 如果FQDN结尾不是.,在结尾增加一个点
    if (!fqHost.endsWith(".")) {
        fqHost = fqHost + ".";
    }

    try {
        // 调用InetAddress.getByName解析FQDN
        addr = this.getInetAddressByName(fqHost);
        // 构建InetAddress
        addr = InetAddress.getByAddress(host, addr.getAddress());
    } catch (UnknownHostException var5) {
    }

    return addr;
}

到这里问题已经比较清晰了。集群外客户端节点没有配置DNS,检查该节点配置的/etc/hosts,发现指向集群的hostname都不是.结尾的。前面方法解析host的时候在host结尾拼接了一个.再解析。我们按照这个逻辑,尝试将host后面加一个.,然后ping一下这个host,发现域名解析错误,问题锁定。

PS: 以"."结尾的FQDN称之为rooted/absolute FQDN。getByExactName将FQDN转化为absolute FQDN,然后再尝试解析。

修改/etc/hosts文件,将这些域名结尾都加上.,然后保持hadoop.security.token.service.use_ipfalse不变,重新提交任务,任务运行一切正常。

这里建议使用DNS方式解析域名。

最后我们在额外看下getByNameWithSearch方法做了些什么。

InetAddress getByNameWithSearch(String host) {
    InetAddress addr = null;
    if (host.endsWith(".")) {
        addr = this.getByExactName(host);
    } else {
        Iterator i$ = this.searchDomains.iterator();

        while(i$.hasNext()) {
            String domain = (String)i$.next();
            String dot = !domain.startsWith(".") ? "." : "";
            addr = this.getByExactName(host + dot + domain);
            if (addr != null) {
                break;
            }
        }
    }

    return addr;
}

该方法遍历每个search domain(Linux/etc/resolv.conf中的search配置),分别在原始host后面拼接上search domain,逐个尝试解析。一旦解析成功立刻返回。

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。