一个线上bug引出的parallelStream() for循环背后面的ForkJoin ...

背景

公司的一个ETL项目,主要是从Blob上的CSV文件和HDFS平台下载数据并解析后入到业务的Mysql
数据量大概一个小时20个文件左右(基本集中到每个小时的50分左右),每个文件8~20万条数据量,分别入到不同的表, 我们在入库的时候是把文件解析后分成1000条一批批量插入(篇幅有限,这里只聊入库的场景)。
用的是jdk1.8的Stream.parallel()的方式并发入库。

问题

运行一段时间后发现随着文件量的增加,入库时间越来越长,分析发现入库线程每个实例入库线程大概8个左右,线程占用满了就相互等待。

问题排查

入库工具代码如下

//由于插入数据量太大,这里做分段批量插入处理 1000个一批
List<List<T>> dayList = ...

long start = System.currentTimeMillis();
AtomicInteger atomicInteger = new AtomicInteger(0);
//这里异步入库
dayList.parallelStream().forEach(list->{
    Integer integer;
    try {
        //具体的插入库的方法
        integer = function.apply(list);
    } catch (Exception e) {
        log.error("[ListPageHelper.saveBatchOrSize]-------->{}",e.toString());
        //线程池连接超时再试一次
        if (e instanceof CannotAcquireLockException) {
            log.error("[CannotAcquireLockException]-------->{}",e.toString());
            integer = function.apply(it);
        } else {
            throw e;
        }
    }
    atomicInteger.addAndGet(integer);
});
long end = System.currentTimeMillis();
log.info("数据量--------->{},耗时-------->{}",list.size(),(end - start));

之前了解过ForkJoin,实际开发中也没怎么用过,也知道java的Stream.parallel()底层用的是ForkJoin, 但是具体没有看过,刚好借此机会了解一下,就跟着源码看了一下 发现了几个问题

  1. ForkJoinPool 总的工作线程个数是 (cpu*2-1) + main 个线程
  2. ForkJoinPool变量是全局的,也就是说如果不自己创建整个项目就用这 cpu*2个线程来处理。
  3. forkjoin 的过程是先fork完后再一一执行 ,其他的需要等待所有线程内的数据遍历后才会被分配到。 这句话的意思是,比如:我有两个list:list1(1~1000)、list2(1001-2000)要遍历, 如果list1先进行forEach, 那么list2 会等到list1 放出空闲线程的时候才会开始执行。

基于以上导致我们的问题如下:

  1. 同一个时间只能有限个线程入库(这个量有点少)
  2. 虽然同一个文件的数据是并发入库的,但是不同文件之前并不是并发入库的(这样不符合我们的业务逻辑,我们业务是所有维度的数据入库完成才对业务有用)

改进思路(这里只看入库逻辑)

  1. 短期应急策略(改动量最小): 依然用这个模式,但是需要不同文件之间,能并行处理。
  2. 长期策略需要重构: 重构成生产者消费者模式,能自由控制并发下载和并发消费。

问题解决

方法一:
提高全局ForkJoinPool的线程数量,这样虽然会提高入库速度依然存在文件之间不能并发执行的问题。

//这个可以给ForkJoinPool 全局设置20个线程
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");

方法二:

  1. 每次执行的时候新建个ForkJoinPool。
  2. 把Stream.parallel() 里面的任务Join到新建的ForkJoinPool里面(源码角度后面分析)。 代码如下 代码如下:
//由于插入数据量太大,这里做分段批量插入处理 1000个一批
List<List<T>> dayList = ......

long start = System.currentTimeMillis();
AtomicInteger atomicInteger = new AtomicInteger(0);
//这里异步入库
ForkJoinPool forkJoinPool = new ForkJoinPool(4);

forkJoinPool.submit(()->{
    dayList.parallelStream().filter(CollectionUtils::isNotEmpty).forEach(list->{
        Integer integer;
        try {
            //具体的插入库的方法
            integer = function.apply(list);
        } catch (Exception e) {
            log.error("[ListPageHelper.saveBatchOrSize]-------->{}",e.toString());
            //线程池连接超时再试一次
            if (e instanceof CannotAcquireLockException) {
                log.error("[CannotAcquireLockException]-------->{}",e.toString());
                integer = function.apply(it);
            } else {
                throw e;
            }
        }
        atomicInteger.addAndGet(integer);
    });
}).join();
forkJoinPool.shutdown();

long end = System.currentTimeMillis();
log.info("数据量--------->{},耗时-------->{}",list.size(),(end - start));

疑问

为什么在ForkJoinPool的submit 方法里面用parallelStream开启线程会占用ForkJoinPool的线程池里的线程数量? parallelStream的原理这里就不展开说明,这里只分析线程内部是怎么共用ForkJoinPool里面的线程池的。

Fork/Join框架中几个重要的类介绍

ForkJoinPool: 实现自ExecutorService是线程的执行器,其他的一些线程池也都是ExecutorService的子类。
ForkJoinTask: 实现自Future,可以看成是任务本身。
ForkJoinWorkerThread: 是Thread的子类,主要负责执行Runnable任务。

严格意义上ForkJoinTask并不是任务本身,由于他没有实现Runnable接口,但是他的子类AdaptedRunnableAction实现了Runnable,这里是适配器模式赋予ForkJoinFask任务的执行逻辑Runnable.

ForkJoinPool主要是由WorkQueue[]组成,WorkQueue队列里面存的是ForkJoinTask[]和ForkJoinWorkerThread。 而ForkJoinWorkerThread持有ForkJoinPool和WorkQueue的引用。

关系图如下

ForkJoin结构图

源码调用逻辑图

ForkJoin流程图

说明
submit流程

  1. ForkJoinPool#submit() 通过适配类ForkJoinTask.AdaptedRunnableAction创建任务并提交。

  2. ForkJoinPool#externalPush() 这里判断workQueues是否存在,如果不存在创建WorkQueue[]

  3. ForkJoinPool#signalWork() 如果活跃线程数量少创建工作线程。

  4. ForkJoinPool#createWorker() 创建工作线程。

  5. ForkJoinWorkerThread(ForkJoinPool pool)

    • 创建ForkJoinWorkerThread对象并持有ForkJoinPool的引用。
    • 调用pool.registerWorker(this)获得workQueue对象。
  6. registerWorker(ForkJoinWorkerThread wt) 把线程对象注册到WorkQueue,并且把workQueue添加到ForkJoinPoolWorkQueue[]中。

  7. WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) WorkQueue对象持有 ForkJoinPool、ForkJoinWorkerThread对象。

fork流程

  1. ForkJoinTask<V> fork() 调用fork()方法。

  2. 判断调用fork()方法的线程是否是ForkJoinWorkerThread,

    • 如果是直接调用当前线程的workQueue.push方法 (这里就是为啥,parallelStream开启线程会占用ForkJoinPool线程池的数量)
    • 如果不是调用全局的ForkJoinPool.common .externalPush(this)
  3. WorkQueue#push(ForkJoinTask<?> task) 这个push方法并不是把任务加入到当前线程的WorkQueue,而是调用ForkJoinPool#signalWork()方法添加到ForkJoinPool中重新分配到工作线程中的WorkQueue。

join流程

  1. ForkJoinTask#join() 调用doJoin().
  2. doJoin()调用exec()中真正的执行分片任务的逻辑(这里就不展开细说了)
  3. getRawResult() 获取执行的结果

以上就是ForkJoin的大概源码的逻辑,解决上面疑问的逻辑主要是在fork流程的第二步中,判断当前线程是否是ForkJoinWorkerThread类型的,如果是就把当前线程加入到线程池中,而并发流parallelStream()中的创建线程提交任务逻辑就是调用fork()方法。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,809评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,189评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,290评论 0 359
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,399评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,425评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,116评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,710评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,629评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,155评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,261评论 3 339
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,399评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,068评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,758评论 3 332
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,252评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,381评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,747评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,402评论 2 358

推荐阅读更多精彩内容