ForkjoinPool -2

externalSubmit

externalSubmit是externalPush的完整版本,从externalPush源码中可以看到,它把很多“疑难杂症”都交给externalSubmit处理,自己仅处理简单的情况。前面externalPush中的第一个if有这样一个条件:&&U.compareAndSwapInt(q, QLOCK, 0, 1),如果执行CAS失败,难道就要把任务抛弃?显然不是,所以看一下externalSubmit是怎么处理这种情况的。
从externalPush的注释中,我们知道每个线程有自己的probe,通过probe,每个线程跟一个队列绑定。externalSubmit的策略比较简单,就是之前在externalPush中由于执行CAS失败而无法push的任务,在externalSubmit再执行一次CAS,如果成功则把任务放入线程对应的队列,如果失败说明该队列比较繁忙,所以externalSubmit就给该线程换一个probe,从而给该线程换一个队列。显然,这个过程是循环的,即每次执行CAS失败externalSubmit就会给该线程换一个队列,直到执行CAS成功为止。

    private void externalSubmit(ForkJoinTask<?> task) {
        int r;                                    // initialize caller's probe
        if ((r = ThreadLocalRandom.getProbe()) == 0) {
            ThreadLocalRandom.localInit();
            r = ThreadLocalRandom.getProbe();
        }
        for (;;) {
            WorkQueue[] ws; WorkQueue q; int rs, m, k;
            boolean move = false;
            // runState<0 此时 对应的状态是terminate 
            if ((rs = runState) < 0) {
                tryTerminate(false, false);     // help terminate
                throw new RejectedExecutionException();
            }
        // 如果 runState 的starter 位 对应是0 , 说明还没有初始化,所以要初始化,并且启动它
            else if ((rs & STARTED) == 0 ||     // initialize
                     ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                int ns = 0;
                rs = lockRunState();
                try {
                    if ((rs & STARTED) == 0) {
                        U.compareAndSwapObject(this, STEALCOUNTER, null,
                                               new AtomicLong());
                        // create workQueues array with size a power of two
                       // workqueque 的长度 是2 byte 
                       //config 高16位是mode(FIFO或者LIFO),低16位是ForkJoinPool.workQueues的下标;
                       // 低16位永远是2 的幂次方,p 永远是 一个1 ,后面一堆0 
                        int p = config & SMASK; // ensure at least 2 slots
                        int n = (p > 1) ? p - 1 : 1;
                        n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                        n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                       //这一串眼花缭乱的操作 其实就是要把int 低16位,从最左端第一个1 ,到最右端 全部设置成1
                      // for example n ->  10000000,  n|n>>>1 -> 11000000, n|n>>>2 -> 11110000, 
                      // n|n>>>4 ->11111111, 其他的操作就没有作用了。
                        workQueues = new WorkQueue[n];
                        ns = STARTED;
                    }
                } finally {
                    unlockRunState(rs, (rs & ~RSLOCK) | ns);
                }
            }
           // externalpush 失败的任务会在这里再次提交
            else if ((q = ws[k = r & m & SQMASK]) != null) {
                if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                    ForkJoinTask<?>[] a = q.array;
                    int s = q.top;
                    boolean submitted = false; // initial submission or resizing
                    try {                      // locked version of push
                        if ((a != null && a.length > s + 1 - q.base) ||
                            (a = q.growArray()) != null) {
                            int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                            U.putOrderedObject(a, j, task);
                            U.putOrderedInt(q, QTOP, s + 1);
                            submitted = true;
                        }
                    } finally {
                        U.compareAndSwapInt(q, QLOCK, 1, 0);
                    }
                    if (submitted) {
                        signalWork(ws, q);
                        return;
                    }
                }
                move = true;                   // move on failure
            }
          // 创建新队列
            else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                q = new WorkQueue(this, null);
                q.hint = r;
                q.config = k | SHARED_QUEUE;
                q.scanState = INACTIVE;
                rs = lockRunState();           // publish index
                if (rs > 0 &&  (ws = workQueues) != null &&
                    k < ws.length && ws[k] == null)
                    ws[k] = q;                 // else terminated
                unlockRunState(rs, rs & ~RSLOCK);
            }
            else
                move = true;                   // move if busy
            if (move)
                r = ThreadLocalRandom.advanceProbe(r);
        }
    }


externalSubmit和externalPush都使用一个CAS操作来保证同步:
U.compareAndSwapInt(q, QLOCK, 0, 1)
其实forkjoinPool内workQueues数组已经设置的比较大了,为2^16=65536,外部线程对任务的提交只用到其中的偶数部分,但也有32768。通常线程数上百已经算比较多的了,但是相对于32786而言,还是比较小的,所以会发生冲突的几率就很小。这也是为何使用CAS来简单地保证同步的原因。
创建和唤醒worker

唤醒和创建worker:signalWork

    final void signalWork(WorkQueue[] ws, WorkQueue q) {
        long c; int sp, i; WorkQueue v; Thread p;
        //ctl <0 说明小于目标并行度
        while ((c = ctl) < 0L) {                       // too few active
          // 为0 说明 没有闲置的 worker
            if ((sp = (int)c) == 0) {                  // no idle workers
                //  还需要再加worker
                if ((c & ADD_WORKER) != 0L)            // too few workers
                    tryAddWorker(c);
                break;
            }
           //线程未开始运行,未初始化, 或者已经终止
            if (ws == null)                            // unstarted/terminated
                break;
            if (ws.length <= (i = sp & SMASK))         // terminated
                break;
            if ((v = ws[i]) == null)                   // terminating
                break;
            int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
            int d = sp - v.scanState;                  // screen CAS
            long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
            if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;                      // activate v
                if ((p = v.parker) != null)
                    U.unpark(p);
                break;
            }
            if (q != null && q.base == q.top)          // no more work
                break;
        }
    }

添加worker:tryAddWorker与createWorker

    private void tryAddWorker(long c) {
        boolean add = false;
        do {
            long nc = ((AC_MASK & (c + AC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c) {
                int rs, stop;                 // check if terminating
                if ((stop = (rs = lockRunState()) & STOP) == 0)
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                if (stop != 0)
                    break;
                if (add) {
                    createWorker();
                    break;
                }
            }
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }

    private boolean createWorker() {
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null;
        ForkJoinWorkerThread wt = null;
        try {
            if (fac != null && (wt = fac.newThread(this)) != null) {
               //  启动了线程
                wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
        // 出现异常,取消注册
        deregisterWorker(wt, ex);
        return false;
    }

registerWorker

    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
// 设置work为守护线程,保证worker 随着main 结束而结束
        wt.setDaemon(true);                           // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int i = 0;                                    // assign a pool index
        int mode = config & MODE_MASK;
        int rs = lockRunState();
        try {
            WorkQueue[] ws; int n;                    // skip if no array
            if ((ws = workQueues) != null && (n = ws.length) > 0) {
                int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                int m = n - 1;
                i = ((s << 1) | 1) & m;               // odd-numbered indices
                if (ws[i] != null) {                  // collision
                    int probes = 0;                   // step by approx half n
                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                    while (ws[i = (i + step) & m] != null) {
                        if (++probes >= n) {
                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                            m = n - 1;
                            probes = 0;
                        }
                    }
                }
                w.hint = s;                           // use as random seed
                w.config = i | mode;
                w.scanState = i;                      // publication fence
                ws[i] = w;
            }
        } finally {
            unlockRunState(rs, rs & ~RSLOCK);
        }
        wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
        return w;
    }

ForkJoinThread的run

    public void run() {
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                onStart();
                // 从这里开始工作
                pool.runWorker(workQueue);
            } catch (Throwable ex) {
                exception = ex;
            } finally {
                try {
                    onTermination(exception);
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception);
                }
            }
        }
    }

任务执行
线程启动后会执行pool的runWorker:

    final void runWorker(WorkQueue w) {
        w.growArray();                   // allocate queue
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
        for (ForkJoinTask<?> t;;) {
            if ((t = scan(w, r)) != null)
                w.runTask(t);
            else if (!awaitWork(w, r))
                break;
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }
        final void runTask(ForkJoinTask<?> task) {
            if (task != null) {
                scanState &= ~SCANNING; // mark as busy
                (currentSteal = task).doExec();
                U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
                execLocalTasks();
                ForkJoinWorkerThread thread = owner;
               // 因为窃取了一个任务,nsteal++
                if (++nsteals < 0)      // collect on overflow
                    transferStealCount(pool);
                scanState |= SCANNING;
                if (thread != null)
                    thread.afterTopLevelExec();
            }
        }


任务扫描(任务窃取)

scan的作用是扫描workQueues,窃取一个task,因为worker刚刚创建,所以worker的工作队列是空的,必须从外面扫一个task来执行。如果扫描来的task在被执行时调用了task.fork,生成新的task,则新的task就会被push到这个worker的工作队列。之后worker会执行自身工作队列中的task。当自身工作队列执行完毕,进入下一次循环,再次扫描task。如果扫描不到task,说明worker,多余了,即现有的worker已经足够执行workQueues中的任务了,此时worker应当休眠,在产生新task时,可能会被唤醒。如果workQueues已经没有多余任务可以执行,说明pool要终止了,则应当终止worker。worker应当休眠还是终止,这个逻辑在awaitWork实现,可查看后续章节

  // w 是worker所在的workqueue, 在奇数队列, r是一个随机种子,用来获取偶数task 队列
   private ForkJoinTask<?> scan(WorkQueue w, int r) {
        WorkQueue[] ws; int m;
        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
            int ss = w.scanState;                     // initially non-negative
            for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                int b, n; long c;
               //如果队列不为空,开始扫描task
                if ((q = ws[k]) != null) {
              //如果有task
                    if ((n = (b = q.base) - q.top) < 0 &&
                        (a = q.array) != null) {      // non-empty
                        long i = (((a.length - 1) & b) << ASHIFT) + ABASE; 
                        // q.base ==b 是确保 base没有变化过,如果有变化,说明这个被别的线程修改了, 也就是说别的线程已经获取了task ,当前试图获取task 的努力就应该放弃了
                        if ((t = ((ForkJoinTask<?>)
                                  U.getObjectVolatile(a, i))) != null &&
                            q.base == b) {
                            // w是激活的
                            if (ss >= 0) {
                            // 如果cas 失败,说明有别的线程在操作这个queue, 需要换一个queue,此时 更改r值, contintue
                                if (U.compareAndSwapObject(a, i, t, null)) {
                                    q.base = b + 1;
                                    //如果剩余任务大于1, 则唤醒worker, 来增加处理能力
                                    if (n < -1)       // signal others
                                        signalWork(ws, q);
                                    return t;
                                }
                            }
                            //如果w未激活,但是又有task ,说明当前task 比较多,当前的worker 处理不过来, 需要激活新的worker 
                            else if (oldSum == 0 &&   // try to activate
                                     w.scanState < 0)
                               // 尝试唤醒休眠的worker ,或者创建worker
                                tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                        }
                 // 如果取不到task 且 w 未激活
                        if (ss < 0)                   // refresh
                    //w.scanState 重新赋给ss ,这期间有可能有其他线程激活了w
                            ss = w.scanState;
                        r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
              //scan 是从origin 的位置扫描的, 即workQueues[origin], 但是如果队列竞争比较大, 有多个worker 在扫描它, 则需要换一个队列进行扫描,即   r ^= r << 1; r ^= r >>> 3; r ^= r << 10;     origin = k = r & m;   
                   // 最后origin 队列应该是非空而且是竞争比较低的队列
                        origin = k = r & m;           // move and rescan
                        oldSum = checkSum = 0;
                        continue;
                    }
                    checkSum += b;
                }
                  //判断是否是否应该设置worker 为失活。 当k = (k + 1) & m)  = = origin, 说明遍历了一遍,还是没有找到workeQueues 的元素遍历了一遍,还是取不到task
                if ((k = (k + 1) & m) == origin) {    // continue until stable
                //如果是激活状态,则进入if 体,把状态设置为失活, 但是此时是伪失活,只是把scanState 设置到control,而没有让线程真正休眠。线程还会继续扫描任务,当代码再次来到这里,且workQueues 又被遍历了一遍
               // 这时候判断ss = w.scanState 就是判断此时worker有没有被其他线程用signalWorker 或者tryRelease 唤醒, 当满足    oldSum == (oldSum = checkSum) , 这个条件必须满足,当前求导的q必须为空,如果不为空,此条件被更改
          //checkSum 不可能等于oldsum。由于伪失活后,扫描的一直不为空, 则说明worker 繁忙,伪失活的会继续处理,如果 扫到为空的, 说明不忙, 则退出循环由后面的awaitWorker让worker 休眠或者终止。
                    if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                        oldSum == (oldSum = checkSum)) {
                        if (ss < 0 || w.qlock < 0)    // already inactive
                            break;
                        int ns = ss | INACTIVE;       // try to inactivate
                        long nc = ((SP_MASK & ns) |
                                   (UC_MASK & ((c = ctl) - AC_UNIT)));
                        w.stackPred = (int)c;         // hold prev stack top
                        U.putInt(w, QSCANSTATE, ns);
                        if (U.compareAndSwapLong(this, CTL, c, nc))
                            ss = ns;
                        else
                            w.scanState = ss;         // back out
                    }
            // worker 伪失活之后,会重新扫描
                    checkSum = 0;
                }
            }
        }
        return null;
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容