记录一次Flink作业异常的排查过程

最近2周开始接手apache flink全链路监控数据的作业,包括指标统计,业务规则匹配等逻辑,计算结果实时写入elasticsearch. 昨天遇到生产环境有作业无法正常重启的问题,我负责对这个问题进行排查跟进。

第一步,基础排查

首先拿到jobmanager和taskmanager的日志,我从taskmanager日志中很快发现2个基础类型的报错,一个是npe,一个是索引找不到的异常

elasticsearch sinker在执行写入数据的前后提供回调接口让作业开发人员对异常或者成功写入进行处理,如果在处理异常过程中有异常抛出,那么框架会让该task失败,导致作业重启。

npe很容易修复,索引找不到是创建索引的服务中的一个小bug,这些都是小问题。

重点是在日志中我看到另一个错误:

java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Unknown Source)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.<init>(RecordWriter.java:122)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.createRecordWriter(RecordWriter.java:321)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriter(StreamTask.java:1202)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.createRecordWriters(StreamTask.java:1170)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:190)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.<init>(OneInputStreamTask.java:52)
    at sun.reflect.GeneratedConstructorAccessor4.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.lang.reflect.Constructor.newInstance(Unknown Source)
    at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1405)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:689)
    at java.lang.Thread.run(Unknown Source)

这种异常,一般是nproc设置太小导致的,或者物理内存耗尽,检查完ulimit和内存,发现都很正常,这就比较奇怪了。

第二步、分析jstack和jmap

perfma有一个产品叫xland,我也是第一次使用,不得不说,确实牛逼,好用!
首先把出问题的taskmanager的线程栈信息和内存dump出来,具体命令:

jstatck pid > 生成的文件名
jmap -dump:format=b,file=生成的文件名 进程号

接着把这两个文件导入xland,xland可以直接看到线程总数,可以方便搜索统计线程数、实例个数等等

最先发现的问题是这个taskmanager 线程总数竟然有17000+,这个数字显然有点大,这个时候我想看一下,哪一种类型的线程比较大,xland可以很方便的搜索,统计,这时候我注意到有一种类型的线程非常多,总数15520

image.png

更上层的调用信息看不到了,只看到来自apache http client,根据作业流程,首先想到的就是es sinker的RestHighLevelClient用到这个东西

那么我们在xland中统计RestHighLevelClient对象个数,发现有几百个,很显然这里有问题

第三步、定位具体问题

有了前面xland的帮助,我们很容易定位到是esclient出了问题
在我们的作业里面有2个地方用到了es client,一个是es sinker,es sinker使用的就是RestHighLevelClient,另一个是我们同学自己写的一个es client,同样是使用RestHighLevelClient,在es sinker的ElasticsearchSinkFunction中单独构造,用于在写入es前,先搜索一些东西拿来合并,还做了cache

1、怀疑RestHighLevelClient bug

我们通过一个测试,来验证是不是RestHighLevelClient的问题

启动一个单纯使用es sinker的job,调整并发度,观察前面出现较多的
I/O dispatcher线程的个数,最后发现单个es sinker也会有240+个
I/O dispatcher线程,通过调整并发,所有taskmanager的
I/O dispatcher线程总数基本和并发成正向比例
停掉写es作业,此时所有taskmanager是不存在I/O dispatcher线程的

看起来I/O dispatcher那种线程数量大,似乎是“正常的”

2、杀掉作业,观察线程是否被正常回收
杀掉作业,I/O dispatcher线程变成0了,看起来es sinker使用是正常的

这时候基本上可以判断是我们自己写的es client的问题。到底是什么问题呢?

我们再做一个测试进一步确认

3、启动问题作业,杀死job后,观察I/O dispatcher线程个数
重启flink的所有taskmanager,给一个“纯净”的环境,发现杀死作业后,还有I/O dispatcher线程。
这个测试可以判断是我们的es client存在线程泄漏

四、背后的原理

es sinker本质上是一个RichSinkFunction,RichSinkFunction带了open 和close 方法,在close方法中,es sinker正确关闭了http client

@Override
    public void close() throws Exception {
        if (bulkProcessor != null) {
            bulkProcessor.close();
            bulkProcessor = null;
        }

        if (client != null) {
            client.close();
            client = null;
        }

        callBridge.cleanup();

        // make sure any errors from callbacks are rethrown
        checkErrorAndRethrow();
    }

而我们的es client是没有被正确关闭的。

具体原理应该是是这样的,当es sinker出现npe或者写es rejected等异常时,job会被flink重启,es sinker这种RichSinkFunction类型的算子会被flink 调用close关闭释放掉一些资源,而我们写在ElasticsearchSinkFunction中es client,是不会被框架关照到的,而这种写法我们自己也无法预先定义重启后关闭client的逻辑.

如果在构造时使用单例,理论上应该是可以避免作业反复重启时es client不断被构造导致线程泄漏和内存泄漏的,但是编写单例写法有问题,虽然有double check,但是没加volatile,同时锁的是this, 而不是类。

五、小结

1、xland确实好用,排查问题帮助很大
2、flink作业用到的外部客户端不要单独构造,要使用类似RichFunction这种方式,提供open,close方法,确保让资源能够被flink正确释放掉。
3、用到的对象,创建的线程,线程池等等最好都起一个名字,方便使用xland事后排查问题,如果有经验的话,应该一开始就统计下用于构造es client的那个包装类对象个数。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351