执行Flink Job报Too many open files问题排查

Flink Job执行报错Too many open files问题排查

最近部署flink job的时候遇到报错。

java.lang.IllegalStateException: org.apache.http.nio.reactor.IOReactorException: Failure opening selector
    at org.apache.http.impl.nio.client.IOReactorUtils.create(IOReactorUtils.java:43)
    at org.apache.http.impl.nio.client.HttpAsyncClientBuilder.build(HttpAsyncClientBuilder.java:603)
    at org.elasticsearch.client.RestClientBuilder$2.run(RestClientBuilder.java:241)
    at org.elasticsearch.client.RestClientBuilder$2.run(RestClientBuilder.java:238)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.elasticsearch.client.RestClientBuilder.createHttpClient(RestClientBuilder.java:238)
    at org.elasticsearch.client.RestClientBuilder.access$000(RestClientBuilder.java:42)
    at org.elasticsearch.client.RestClientBuilder$1.run(RestClientBuilder.java:209)
    at org.elasticsearch.client.RestClientBuilder$1.run(RestClientBuilder.java:206)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.elasticsearch.client.RestClientBuilder.build(RestClientBuilder.java:206)
    at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:269)
    at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:261)
    at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:75)
    at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createClient(Elasticsearch6ApiCallBridge.java:47)
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:308)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.http.nio.reactor.IOReactorException: Failure opening selector
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.<init>(AbstractMultiworkerIOReactor.java:146)
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.<init>(DefaultConnectingIOReactor.java:81)
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.<init>(DefaultConnectingIOReactor.java:96)
    at org.apache.http.impl.nio.client.IOReactorUtils.create(IOReactorUtils.java:41)
    ... 23 more
Caused by: java.io.IOException: Too many open files
    at sun.nio.ch.EPollArrayWrapper.epollCreate(Native Method)
    at sun.nio.ch.EPollArrayWrapper.<init>(EPollArrayWrapper.java:130)
    at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:69)
    at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
    at java.nio.channels.Selector.open(Selector.java:227)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.<init>(AbstractMultiworkerIOReactor.java:144)
    ... 26 more

这个报错以前也遇到过的,先看下容器的句柄上限

# ulimit -aS | grep open
open files                      (-n) 65536

Flink TaskManger里面就这一个job,正常不应该占这么多句柄的。

因为这个job之前并行度设置很低的时候,部署成功过,所以怀疑是并行度设置高导致的。
当时给的并行度单个taskmanager是64,降低到32看一下。
降低到32后,这次job部署成功了,看下容器的句柄数,果然非常高。

[root@bss-zcm-011 flink]# lsof -p 13 | wc -l
56020

使用File Leak Detector分析句柄

File Leak Detector这个工具检查一下,官网下载得到file-leak-detector-1.13-jar-with-dependencies.jar

先取消job后把TaskManager容器重启,进入容器,attach到taskmanager进程上。

java -jar /flink/file-leak-detector-1.13-jar-with-dependencies.jar 13 http=19999,threshold=20000,strong

然后部署job,job成功运行后,执行

curl http://localhost:19999/ > openfiles.txt

分析openfiles.txt文件,首先这个文件里包含的句柄数量是18313个,和实际占用的5万多个有差距,这是因为这个工具是通过javaagent去动态拦截某些java类的方法,例如写入文件,网络通信等,并不严格包含所有句柄,不过通过里面的内容是可以分析问题的。

18313 descriptors are open

紧接着我发现这里面绝大多数是pool-开头的线程打开的,熟悉线程池的知道,jdk线程池的默认线程名称就是这个。

#165 selector by thread:pool-68-thread-1 on Thu Apr 02 10:36:59 CST 2020
    at java.nio.channels.spi.AbstractSelector.<init>(AbstractSelector.java:86)
    at sun.nio.ch.SelectorImpl.<init>(SelectorImpl.java:54)
    at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:64)
    at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
    at java.nio.channels.Selector.open(Selector.java:227)
    at org.apache.http.impl.nio.reactor.AbstractIOReactor.<init>(AbstractIOReactor.java:103)
    at org.apache.http.impl.nio.reactor.BaseIOReactor.<init>(BaseIOReactor.java:87)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:320)
    at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:189)
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.doExecute(CloseableHttpAsyncClientBase.java:67)
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase.access$000(CloseableHttpAsyncClientBase.java:38)
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:57)
    at java.lang.Thread.run(Thread.java:748)

pool-开头的线程有17920个,我的job是1个source对应5个es sink的,并行度是32
这台主机的cpu是112核,docker容器没有对cpu资源进行隔离。

我突然发现 5*32*112=17920

看来是某个线程池的大小默认使用了cpu核数,pool-开头的线程初始化了这个线程池

直接原因 HttpAsyncClient Sub Reactor线程池默认大小使用CPU核数

通过调试代码我发现是Flink ES Connector中的ElasticSearch RestHighLevelClient使用HttpAsyncClient创建的线程池


image.png
image.png
image.png

默认值是CPU核数,印证了猜想。

定制RestClientFactory 指定线程池大小

找到配置来源后搜索下es客户端的配置,参考Flink Elasticsearch Connector 官方文档

For Elasticsearch 6.x and above, internally, the RestHighLevelClient is used for cluster communication. By default, the connector uses the default configurations for the REST client. To have custom configuration for the REST client, users can provide a RestClientFactory implementation when setting up the ElasticsearchClient.Builder that builds the sink.

public class ESRestClientFactory implements RestClientFactory {

    private int ioThreadCount = -1;

    public ESRestClientFactory(int ioThreadCount) {
        this.ioThreadCount = ioThreadCount;
    }

    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        restClientBuilder.setMaxRetryTimeoutMillis(this.maxRetryTimeout);
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                if (-1 != ioThreadCount) {
                    httpClientBuilder
                        .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(ioThreadCount).build());
                }
                return httpClientBuilder;
            }
        });
    }
}

将上面的ioThreadCount设置一个较小的值。
这样运行后,句柄数少了很多。这样单个taskmanager 64并行度运行job,也不会发生句柄耗尽了。

设置下HttpAsyncClient的线程名称

PS: 熟悉Mycat分库分表中间件的话,对于纯手工NIO应该不陌生的。下面main reactor,sub reactor的概念相信你能百度到,这里不会赘述。

上面我发现默认线程池名称对排查问题不友好,我想自己设置一下线程名称前缀,至少包含Flink ES Sink实例的名称。

首先http请求ES是个客户端连接操作,对应org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor,这个类继承了org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor
AbstractMultiworkerIOReactor是Main Reactor,负责创建ioThreadCount个org.apache.http.impl.nio.reactor.BaseIOReactor, BaseIOReactor即为Sub Reactor。

设置Main Reactor的线程名称

默认情况下Main Reactor的ThreadFactory是java.util.concurrent.Executors#defaultThreadFactory,这非常不友好
观察到Flink ES Sink线程的名称XXXFilter -> Sink: xxxSink (26/128),里面都有Sink:。把Sink:后面部分的截取出来,就可以知道这个Main Reactor是哪个并行度了。

package com.navercorp.pinpoint.flink.statistic;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.elasticsearch.client.RestClientBuilder;

/**
 * @author tankilo
 */
public class ESRestClientFactory implements RestClientFactory {

    private int ioThreadCount;

    public ESRestClientFactory(int ioThreadCount) {
        this.ioThreadCount = ioThreadCount;
    }

    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        restClientBuilder.setMaxRetryTimeoutMillis(this.maxRetryTimeout);
        restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                if (-1 != ioThreadCount) {
                    httpClientBuilder
                        .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(ioThreadCount).build());
                }
                String threadName = Thread.currentThread().getName();
                String namePrefix = threadName.substring(threadName.indexOf("Sink: ") + "Sink: ".length());
                httpClientBuilder.setThreadFactory(new MyThreadFactory(namePrefix));
                return httpClientBuilder;
            }
        });
    }

    private static class MyThreadFactory implements ThreadFactory {
        private final ThreadGroup group;

        private final AtomicInteger threadNumber = new AtomicInteger(1);

        private final String namePrefix;

        public MyThreadFactory(String namePrefix) {
            this.namePrefix = "es-http-main-reactor-" + namePrefix + "-";
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}

显示效果是es-http-main-reactor-xxxSink (6/8)-1

设置Sub Reactor的线程名称

Sub Reactor默认的线程名称都是I/O dispatcher 4479这样的,ThreadFactory如下
org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.DefaultThreadFactory


    static class DefaultThreadFactory implements ThreadFactory {

        private final static AtomicLong COUNT = new AtomicLong(1);

        @Override
        public Thread newThread(final Runnable r) {
            return new Thread(r, "I/O dispatcher " + COUNT.getAndIncrement());
        }

    }

这个看了下,没有地方可以设置,主要是ES Client的问题,ThreadFactory直接传入null使用了默认值,导致无法配置。

    public DefaultConnectingIOReactor(final IOReactorConfig config) throws IOReactorException {
        this(config, null);
    }

总结

  1. 因为docker容器没有对cpu进行资源隔离,所以触发了句柄耗尽的问题,其实应该对taskmanager容器进行修改的,不过因为一些情况限制,选择对HttpAsyncClient Sub Reactor线程池的大小进行限制,从默认CPU个数减少到合适的值。
  2. Flink ES Connector目前为止还是同步写入的,这样ElasticSearch RestHighLevelClient中HttpAsyncClient Sub Reactor线程池默认使用CPU核数的行为是不是浪费?每个Sink都是同步写入的话,并发为1,那么Sub Reactor线程池可以设置为1么?
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容