Flink 使用介绍相关文档目录
前言
本文为大家分享博主在双网分离的环境下使用Flink遇到的问题和解决方案。
所谓双网分离环境指的是Flink集群运行环境存在两个网段,分别为管理网和业务网。这两个网段互不相通。管理网使用了千兆交换机,仅用于管理员操作集群和监控集群状态。业务网使用光纤和万兆交换机,Flink TaskManager之间交换数据全部走业务网,实现带宽的最大化利用。
环境信息
软件版本
- Flink 1.15.4
- Hadoop 3.1.1
网络配置
集群内配置有两个网络,分别为:
管理网:10.x.x.x。千兆网。
业务网:192.168.x.x。万兆网。
集群内相互访问使用hostname,hostname配置的均为业务网。
管理网和业务网不互通。
问题1:提交任务成功后出错,无法连接到JobManager
另有一个集群外客户端节点,其所在网段和管理网相同。在该客户端节点提交Flink任务到集群,出现无法连接到JobManager,报Connection Refused错误。错误中的IP为集群内万兆网的IP。但是作业能够提交成功,只是无法获取到作业的运行状态和结果。
通过这个错误不难得知问题是Flink JobManager的bind host配置存在问题。通过查看Yarn界面,找到Flink作业JobManager所在节点。使用netstat查看JobManager进程对应的端口和绑定IP。我们发现只有一个端口绑定的是192.168.x.x
,其余都是0.0.0.0
。这个端口对应的是Flink什么端口呢?查看Yarn中该Flink Application的启动日志可以发现这个端口是rest端口。
查阅官方文档,可以使用rest.bind-address
方式绑定地址。于是编辑Flink配置文件flink-conf.yaml
,增加:
rest.bind-address: 0.0.0.0
保存之后重新提交任务,问题依旧。绑定的地址并没有发生变化。查看Flink Web UI中JobManager的Configuration,发现rest.bind-address
和rest.address
值相同,都为集群内JobManager运行所在机器的hostname。rest.bind-address
配置项不生效。
经过详细问题调查,发现是Flink 1.15.x版本之后bug。无论如何配置rest.bind-address
,都会将该属性的值覆盖为运行时所在Yarn节点的地址。参见如下代码,位于YarnEntrypointUtils.java
的loadConfiguration
方法。
// ...
configuration.set(JobManagerOptions.ADDRESS, hostname);
configuration.set(RestOptions.ADDRESS, hostname);
configuration.set(RestOptions.BIND_ADDRESS, hostname);
// ...
该特性对应的提交为[FLINK-24474] Default rest.bind-address to localhost in flink-conf.yaml。Commit ID:6222532db0f0a1e75811fc215cd66bc26fb74afb
解决方法为修改上述片段的代码为:
// ...
configuration.set(JobManagerOptions.ADDRESS, hostname);
configuration.set(RestOptions.ADDRESS, hostname);
if (!configuration.contains(RestOptions.BIND_ADDRESS)) {
configuration.set(RestOptions.BIND_ADDRESS, hostname);
}
// ...
按照修改之后的逻辑,只有配置文件中没有配置rest.bind-address
,才会使用Yarn节点的地址。如果配置了,则以实际配置的值为准。
修改后重新编译,替换掉Flink客户端中flink-dist-xxx.jar
文件。重新提交作业可以解决。
博主已通过patch FLINK-35332解决了这个问题。
问题2:集群外客户端flink提交时host无法解析为IP
问题现象:使用flink run方式在集群外提交可以ResourceManager可以解析host为IP,但是yarn-session模式启动的时候无法解析host。
经调查发现,控制解析的配置项为hadoop.security.token.service.use_ip
。
配置集群外客户端的hdfs-site.xml
(这里有伏笔),修改hadoop.security.token.service.use_ip
为true
:
<property>
<name>hadoop.security.token.service.use_ip</name>
<value>true</value>
</property>
集群中Hadoop的配置项保持不变,为false
。然后尝试启动yarn-session
,问题解决。
查找了Hadoop官网,发现hadoop.security.token.service.use_ip
是属于core-site.xml
的配置。集群中和集群外客户端将其错误的写到hdfs-site.xml
中。
这里先说结论,这个诡异的问题原因在于该配置不能位于hdfs-site.xml
中,必须在core-site.xml
中。如果该属性错误配置到了hdfs-site.xml
,会导致flink run -m yarn-cluster
方式提交可以解析host但是yarn-session模式启动yarn-session的时候无法解析host。至于为什么会这样,我们接下来分析。
默认的日志级别不够详细,看不出来什么内容。将日志开启DEBUG级别之后提交任务。查看日志发现flink run -m yarn-cluster
方式的hadoop.security.token.service.use_ip
的值为true,启动yarn-session的时候hadoop.security.token.service.use_ip
的值为false。怀疑yarn-session模式提交的时候不加载hdfs-site.xml
。
仔细观察flink yarn-session的启动日志,发现开头有这一行:
... org.apache.flink.runtime.util.HadoopUtils [] - Searching Hadoop configuration files in HADOOP_HOME
Flink run命令启动的时候没有这一行。怀疑是这里加载了HDFS配置文件。
想要揭开谜底,我们接下来分析Flink yarn-session的启动代码。
Flink Yarn Session启动代码
查看yarn-session.sh
的最后一行启动Java进程代码,可得知应用的入口为FlinkYarnSessionCli
类。
"$JAVA_RUN" $JVM_ARGS $FLINK_ENV_JAVA_OPTS -classpath "$CC_CLASSPATH" $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@"
跟踪main方法,然后找到FlinkYarnSessionCli::run
方法,我们找到这一行代码:
// ...
final YarnClusterDescriptor yarnClusterDescriptor =
(YarnClusterDescriptor)
yarnClusterClientFactory.createClusterDescriptor(effectiveConfiguration);
// ...
继续跟踪,查看YarnClusterClientFactory::createClusterDescriptor
的内容:
@Override
public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
checkNotNull(configuration);
final String configurationDirectory = configuration.get(DeploymentOptionsInternal.CONF_DIR);
YarnLogConfigUtil.setLogConfigFileInConfig(configuration, configurationDirectory);
return getClusterDescriptor(configuration);
}
继续分析YarnClusterClientFactory::getClusterDescriptor
。
private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
final YarnClient yarnClient = YarnClient.createYarnClient();
// 获取Yarn和Hadoop配置
final YarnConfiguration yarnConfiguration =
Utils.getYarnAndHadoopConfiguration(configuration);
yarnClient.init(yarnConfiguration);
yarnClient.start();
return new YarnClusterDescriptor(
configuration,
yarnConfiguration,
yarnClient,
YarnClientYarnClusterInformationRetriever.create(yarnClient),
false);
}
这里有一个关键调用Utils.getYarnAndHadoopConfiguration(configuration)
。作用为获取Yarn和Hadoop配置。
public static YarnConfiguration getYarnAndHadoopConfiguration(
org.apache.flink.configuration.Configuration flinkConfig) {
final YarnConfiguration yarnConfig = getYarnConfiguration(flinkConfig);
yarnConfig.addResource(HadoopUtils.getHadoopConfiguration(flinkConfig));
return yarnConfig;
}
其中getYarnConfiguration
调用中执行了new YarnConfiguration
该调用从classpath中加载yarn-default.xml
、yarn-site.xml
和resource-types.xml
文件。
同样,HadoopUtils.getHadoopConfiguration(flinkConfig)
调用的时候执行了new HdfsConfiguration
,加载了hdfs-site.xml
、hdfs-default.xml
和core-site.xml
、core-default.xml
配置。
到这里可知,Flink yarn-session启动的时候的确加载了hdfs-site.xml
配置文件。
Flink run命令的入口类为CliFrontEnd
,并没有使用FlinkYarnSessionCli
。因此我们从Hadoop源代码中打印hadoop.security.token.service.use_ip
值这一行DEBUG日志的为止展开分析。
这一行日志位于Hadoop源代码中SecurityUtil
类的setTokenServiceUseIp
方法中。我们将相关的方法贴出来,如下所示:
static {
setConfigurationInternal(new Configuration());
}
private static void setConfigurationInternal(Configuration conf) {
boolean useIp = conf.getBoolean(
CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP,
CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT);
setTokenServiceUseIp(useIp);
// ...
}
@InterfaceAudience.Private
@VisibleForTesting
public static void setTokenServiceUseIp(boolean flag) {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting "
+ CommonConfigurationKeys.HADOOP_SECURITY_TOKEN_SERVICE_USE_IP
+ " to " + flag);
}
useIpForTokenService = flag;
hostResolver = !useIpForTokenService
? new QualifiedHostResolver()
: new StandardHostResolver();
}
不难发现,在SecurityUtil
类加载的时候执行了setConfigrationInternal
方法,然后间接调用了setTokenServiceUseIp
方法。
setConfigrationInternal
传入的参数是new Configuration
。我们查看它的static代码块:
static {
deprecationContext = new AtomicReference(new DeprecationContext((DeprecationContext)null, defaultDeprecations));
addDefaultResource("core-default.xml");
addDefaultResource("core-site.xml");
ClassLoader cL = Thread.currentThread().getContextClassLoader();
if (cL == null) {
cL = Configuration.class.getClassLoader();
}
if (cL.getResource("hadoop-site.xml") != null) {
LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, mapred-site.xml and hdfs-site.xml to override properties of core-default.xml, mapred-default.xml and hdfs-default.xml respectively");
addDefaultResource("hadoop-site.xml");
}
}
发现它只从classpath中加载core-default.xml
和core-site.xml
文件,不会去加载hdfs-site.xml
。
到这里得出结论:Flink yarn-session提交的时候加载了hdfs-site.xml
文件,但是flink run -m yarn-cluster
的时候没有加载。分析完毕。
这次问题分析得出的启示是对于Hadoop相关配置,务必将配置项写到正确的配置文件中,否则会出现奇怪的问题,较难定位原因。
Hadoop解析host相关代码
这里我们返回思考前面的问题:仅仅将hadoop.security.token.service.use_ip
修改true
的确可行。但为什么设置它为false
的时候就无法解析hostname为IP了呢?
为了搞清楚这个问题,接下来需要分析hadoop.security.token.service.use_ip
究竟做了些什么。
跟踪YarnClient::start
调用找到YarnImpl::serviceStart
protected void serviceStart() throws Exception {
try {
this.rmClient = (ApplicationClientProtocol)ClientRMProxy.createRMProxy(this.getConfig(), ApplicationClientProtocol.class);
if (this.historyServiceEnabled) {
this.historyClient.start();
}
} catch (IOException var2) {
IOException e = var2;
throw new YarnRuntimeException(e);
}
super.serviceStart();
}
继续跟踪RMProxy::createRMProxy
:
private static <T> T createRMProxy(YarnConfiguration conf, Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy) throws IOException {
if (HAUtil.isHAEnabled(conf)) {
RMFailoverProxyProvider<T> provider = instance.createRMFailoverProxyProvider(conf, protocol);
return RetryProxy.create(protocol, provider, retryPolicy);
} else {
InetSocketAddress rmAddress = instance.getRMAddress(conf, protocol);
LOG.info("Connecting to ResourceManager at " + rmAddress);
T proxy = getProxy(conf, protocol, rmAddress);
return RetryProxy.create(protocol, proxy, retryPolicy);
}
}
找到获取InetSocketAddress
的方法ClientRMProxy::getRMAddress
:
@Private
protected InetSocketAddress getRMAddress(YarnConfiguration conf, Class<?> protocol) throws IOException {
if (protocol == ApplicationClientProtocol.class) {
return conf.getSocketAddr("yarn.resourcemanager.address", "0.0.0.0:8032", 8032);
} else if (protocol == ResourceManagerAdministrationProtocol.class) {
return conf.getSocketAddr("yarn.resourcemanager.admin.address", "0.0.0.0:8033", 8033);
} else if (protocol == ApplicationMasterProtocol.class) {
setAMRMTokenService(conf);
return conf.getSocketAddr("yarn.resourcemanager.scheduler.address", "0.0.0.0:8030", 8030);
} else {
String message = "Unsupported protocol found when creating the proxy connection to ResourceManager: " + (protocol != null ? protocol.getClass().getName() : "null");
LOG.error(message);
throw new IllegalStateException(message);
}
}
继续展开YarnConfiguration.getSocketAddr
:
public InetSocketAddress getSocketAddr(String name, String defaultAddress, int defaultPort) {
String address;
if (HAUtil.isHAEnabled(this) && getServiceAddressConfKeys(this).contains(name)) {
address = HAUtil.getConfValueForRMInstance(name, defaultAddress, this);
} else {
address = this.get(name, defaultAddress);
}
return NetUtils.createSocketAddr(address, defaultPort, name);
}
跟踪NetUtils::createSocketAddr
,一路到NetUtils::createSocketAddrForHost
:
public static InetSocketAddress createSocketAddrForHost(String host, int port) {
String staticHost = getStaticResolution(host);
String resolveHost = staticHost != null ? staticHost : host;
InetSocketAddress addr;
try {
InetAddress iaddr = SecurityUtil.getByName(resolveHost);
if (staticHost != null) {
iaddr = InetAddress.getByAddress(host, iaddr.getAddress());
}
addr = new InetSocketAddress(iaddr, port);
} catch (UnknownHostException var6) {
addr = InetSocketAddress.createUnresolved(host, port);
}
return addr;
}
最终跟踪到SecurityUtil::getByName
方法:
@Private
public static InetAddress getByName(String hostname) throws UnknownHostException {
return hostResolver.getByName(hostname);
}
它使用hostResolver
的getByName
方法解析host。
hostResolver
是HostResolver
类型,具有两个实现类,为StandardHostResolver
和QulifiedHostResolver
两种。
至于hostResolver
具体是哪个类,答案就在它初始化赋值的地方。我们找到setTokenServiceUseIp
方法。
@Private
@VisibleForTesting
public static void setTokenServiceUseIp(boolean flag) {
useIpForTokenService = flag;
hostResolver = (HostResolver)(!useIpForTokenService ? new QualifiedHostResolver() : new StandardHostResolver());
}
到这里我们发现hadoop.security.token.service.use_ip
为true
对应的是StandardHostResolver
,false
对应的是QualifiedHostResolver
。
StandardHostResolver
解析host的方式较为简单,直接使用JDK的InetAddress.getByName
完成解析。不再过多分析。QualifiedHostResolver
解析较为复杂。前面的问题是hadoop.security.token.service.use_ip
配置为false
的时候无法解析host,那接下来的重点则是搞清楚QualifiedHostResolver
如何解析host。
我们查看QualifiedHostResolver
的getByName
方法:
@Override
public InetAddress getByName(String host) throws UnknownHostException {
InetAddress addr = null;
byte[] ip;
if (IPAddressUtil.isIPv4LiteralAddress(host)) {
ip = IPAddressUtil.textToNumericFormatV4(host);
addr = InetAddress.getByAddress(host, ip);
} else if (IPAddressUtil.isIPv6LiteralAddress(host)) {
ip = IPAddressUtil.textToNumericFormatV6(host);
addr = InetAddress.getByAddress(host, ip);
} else if (host.endsWith(".")) {
addr = this.getByExactName(host);
} else if (host.contains(".")) {
addr = this.getByExactName(host);
if (addr == null) {
addr = this.getByNameWithSearch(host);
}
} else {
InetAddress loopback = InetAddress.getByName((String)null);
if (host.equalsIgnoreCase(loopback.getHostName())) {
addr = InetAddress.getByAddress(host, loopback.getAddress());
} else {
addr = this.getByNameWithSearch(host);
if (addr == null) {
addr = this.getByExactName(host);
}
}
}
if (addr == null) {
throw new UnknownHostException(host);
} else {
return addr;
}
}
该方法的逻辑:
- 如果host字符串是合法的IPv4和IPv6字面值,如果是,将且构造为InetAddress返回,不用解析hostname。
- 如果host字符串以
.
结尾,调用getByExactName解析。 - 如果host字符串包含
.
,先使用getByExactName解析。如果无法解析,使用searchDomain逐个尝试解析。 - 其余情况,先尝试按照loopback解析。如果无法解析,按照前面即第三种情况解析。
接下来分析按照host解析IP的方法getByExactName
。
InetAddress getByExactName(String host) {
InetAddress addr = null;
// 这里将host视为FQDN
String fqHost = host;
// 如果FQDN结尾不是.,在结尾增加一个点
if (!fqHost.endsWith(".")) {
fqHost = fqHost + ".";
}
try {
// 调用InetAddress.getByName解析FQDN
addr = this.getInetAddressByName(fqHost);
// 构建InetAddress
addr = InetAddress.getByAddress(host, addr.getAddress());
} catch (UnknownHostException var5) {
}
return addr;
}
到这里问题已经比较清晰了。集群外客户端节点没有配置DNS,检查该节点配置的/etc/hosts
,发现指向集群的hostname都不是.
结尾的。前面方法解析host的时候在host结尾拼接了一个.
再解析。我们按照这个逻辑,尝试将host后面加一个.
,然后ping一下这个host,发现域名解析错误,问题锁定。
PS: 以"."结尾的FQDN称之为rooted/absolute FQDN。
getByExactName
将FQDN转化为absolute FQDN,然后再尝试解析。
修改/etc/hosts
文件,将这些域名结尾都加上.
,然后保持hadoop.security.token.service.use_ip
为false
不变,重新提交任务,任务运行一切正常。
这里建议使用DNS方式解析域名。
最后我们在额外看下getByNameWithSearch
方法做了些什么。
InetAddress getByNameWithSearch(String host) {
InetAddress addr = null;
if (host.endsWith(".")) {
addr = this.getByExactName(host);
} else {
Iterator i$ = this.searchDomains.iterator();
while(i$.hasNext()) {
String domain = (String)i$.next();
String dot = !domain.startsWith(".") ? "." : "";
addr = this.getByExactName(host + dot + domain);
if (addr != null) {
break;
}
}
}
return addr;
}
该方法遍历每个search domain(Linux/etc/resolv.conf
中的search配置),分别在原始host后面拼接上search domain,逐个尝试解析。一旦解析成功立刻返回。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。