Java8(6):使用并行流

对于斐波那契数的计算,我们都知道最容易理解的就是递归的方法:

斐波那契公式
public long recursiveFibonacci(int n) {
    if (n < 2) {
        return 1;
    }

    return recursiveFibonacci(n - 1) + recursiveFibonacci(n - 2);
}

当然这个递归也可以转化为迭代:

public long iterativeFibonacci(int n) {
    long n1 = 1, n2 = 1;
    long fi = 2; // n1 + n2

    for (int i = 2; i <= n; i++) {
        fi = n1 + n2;
        n1 = n2;
        n2 = fi;
    }

    return fi;
}

但是,对于以上两种方法,并不能并行化,因为后一项的值依赖于前一项,使得算法流程是串行的。所以引出了可以并行的计算斐波那契数的公式:

并行计算斐波那契数的公式

=>

并行计算斐波那契数的公式

f0f1 都是 1 —— 很明显我们可以对

(1, 1; 1, 0) ^ n
进行并行计算。


首先我们定义一个 Matrix 类,用来表示一个 2*2 的矩阵:

public class Matrix {

    /**
     * 左上角的值
     */
    public final BigInteger a;
    /**
     * 右上角的值
     */
    public final BigInteger b;
    /**
     * 左下角的值
     */
    public final BigInteger c;
    /**
     * 右下角的值
     */
    public final BigInteger d;

    public Matrix(int a, int b, int c, int d) {
        this(BigInteger.valueOf(a), BigInteger.valueOf(b),
                BigInteger.valueOf(c), BigInteger.valueOf(d));
    }

    public Matrix(BigInteger a, BigInteger b, BigInteger c, BigInteger d) {
        this.a = a;
        this.b = b;
        this.c = c;
        this.d = d;
    }

    /**
     * multiply
     *
     * @param m multiplier
     * @return
     */
    public Matrix mul(Matrix m) {
        return new Matrix(
                a.multiply(m.a).add(b.multiply(m.c)), // a*a + b*c
                a.multiply(m.b).add(b.multiply(m.d)), // a*b + b*d
                c.multiply(m.a).add(d.multiply(m.c)), // c*a + d*c
                c.multiply(m.b).add(d.multiply(m.d)));// c*b + d*d
    }

    /**
     * power of exponent
     *
     * @param exponent
     * @return
     */
    public Matrix pow(int exponent) {
        Matrix matrix = this.copy();

        for (int i = 1; i < exponent; i++) {
            matrix = matrix.mul(this);
        }

        return matrix;
    }

    public Matrix copy() {
        return new Matrix(a, b, c, d);
    }

}

然后我们来比较迭代和并行的效率:

我们先设置并行使用的线程数为 1,即单线程。

public static void main(String[] args) throws Exception {
    final int ITEM_NUM = 500000; // 计算斐波那契数列的第 ITEM_NUM 项

    System.out.println("开始迭代计算...");
    long begin = System.nanoTime();

    BigInteger fi1 = iterativeFibonacci(ITEM_NUM);

    long end = System.nanoTime();
    double time = (end - begin) / 1E9;
    System.out.printf("迭代计算用时: %.3f\n\n", time);

    /* ------------------------------ */
    System.out.println("开始并行计算...");
    begin = System.nanoTime();

    BigInteger fi2 = parallelFibonacci(ITEM_NUM, 1);

    end = System.nanoTime();
    time = (end - begin) / 1E9;
    System.out.printf("并行计算用时: %.3f\n\n", time);

    System.out.println("fi1 == fi2:" + (fi1.equals(fi2)));
}

static BigInteger iterativeFibonacci(int n) {
    BigInteger n1 = BigInteger.ONE;
    BigInteger n2 = BigInteger.ONE;
    BigInteger fi = BigInteger.valueOf(2); // n1 + n2

    for (int i = 2; i <= n; i++) {
        fi = n1.add(n2);
        n1 = n2;
        n2 = fi;
    }

    return fi;
}

static BigInteger parallelFibonacci(int itemNum, int threadNum) throws Exception {
    final Matrix matrix = new Matrix(1, 1, 1, 0);
    final Matrix primary = new Matrix(1, 0, 1, 0); // (f0, 0; f1, 0)
    final int workload = itemNum / threadNum;      // 每个线程要计算的 相乘的项数
    // (num / threadNum) 可能存在除不尽的情况,所以最后一个任务计算所有剩下的项数
    final int lastWorkload = itemNum - workload * (threadNum - 1);

    List<Callable<Matrix>> tasks = new ArrayList<>(threadNum);
    for (int i = 0; i < threadNum; i++) {
        if (i < threadNum - 1) {
            // 为了简洁,使用 Lambda 表达式替代要实现 Callable<Matrix> 的匿名内部类
            tasks.add(() -> matrix.pow(workload)); 
        } else {
            tasks.add(() -> matrix.pow(lastWorkload));
        }
    }

    ExecutorService threadPool = Executors.newFixedThreadPool(threadNum);
    List<Future<Matrix>> futures = threadPool.invokeAll(tasks); // 执行所有任务,invokeAll 会阻塞直到所有任务执行完毕

    Matrix result = primary.copy();
    for (Future<Matrix> future : futures) { // (matrix ^ n) * (f0, 0; f1, 0)
        result = result.mul(future.get());
    }

    threadPool.shutdown();

    return result.c;
}
并行的线程数为 1 的结果

可以看到单线程情况下,使用矩阵运算的效率大概只有迭代计算的 1/3 左右 —— 既然如此,那我们耍流氓的把并行的线程数改为 10 线程吧:

BigInteger fi2 = parallelFibonacci(ITEM_NUM, 10); // 10 线程并行计算
并行的线程数为 10 的结果

此时并行计算的用时碾压了迭代计算 —— 迭代计算委屈的哭了,并行计算这流氓耍的相当漂亮。


好像有点不对劲,我这篇文章的标题似乎是 使用并行流 —— 并行流呢?

其实前面都是铺垫 :) 在 parallelFibonacci 方法中,我们使用了线程池来并行的执行任务,我们来尝试将 parallelFibonacci 改为流式(即基于 Stream)风格的代码:

static BigInteger streamFibonacci(int itemNum, int threadNum) {
    final Matrix matrix = new Matrix(1, 1, 1, 0);
    final Matrix primary = new Matrix(1, 0, 1, 0);
    final int workload = itemNum / threadNum;
    final int lastWorkload = itemNum - workload * (threadNum - 1);

    // 流式API
    return IntStream.range(0, threadNum)    // 产生 [0, threadNum) 区间,用于将任务切分
            .parallel()                     // 使流并行化
            .map(i -> i < threadNum - 1 ? workload : lastWorkload)
            .mapToObj(w -> matrix.pow(w))   // map    ->  mN = matrix ^ workload
            .reduce((m1, m2) -> m1.mul(m2)) // reduce ->  m = m1 * m2 * ... * mN
            .map(m -> m.mul(primary))       // map    ->  m = m * primary
            .get().c;                       // get    ->  m.c
}

依旧在 10 线程的环境下运行下看看:

public static void main(String[] args) throws Exception {
    ...

    /* ------------------------------ */
    System.out.println("开始流式并行计算...");
    begin = System.nanoTime();

    BigInteger fi3 = streamFibonacci(ITEM_NUM, 10);

    end = System.nanoTime();
    time = (end - begin) / 1E9;
    System.out.printf("流式并行计算用时: %.3f\n\n", time);
    
    System.out.println("fi1 == fi2:" + (fi1.equals(fi2)));
    System.out.println("fi1 == fi3:" + (fi1.equals(fi3)));
}
10 线程并行计算的结果

是的,使用并行流就是这么的简单,只要你会使用 Stream API —— 给它加上 .parallel() —— 它就并行化了。写了这么多年的 Java 代码,从 Java6 到 Java7 再到 Java8,这一刻,我真的感动了(容我擦擦眼泪)。

而且我们可以看到,在线程数相同的情况下,使用 streamFibonacci(并行流)时,用时要比parallelFibonacci 方法更短。为了验证,我夸张一点,将线程数提高到 32:

BigInteger fi2 = parallelFibonacci(ITEM_NUM, 32);

...

BigInteger fi3 = streamFibonacci(ITEM_NUM, 32);
32 线程并行计算的结果

可以看到,此时 parallelFibonacci 的运行时间反而比 10 线程的时候更长了,而 streamFibonacci 使用的时间却更短了 —— 流式 API 厉害了!

但这是什么原因呢?这个问题留给有兴趣的读者思考和探究吧。


值得注意的是,并行流的底层实现是基于 ForkJoinPool 的,并且使用的是一个共享的 ForkJoinPool —— ForkJoinPool.commonPool()。为了充分利用处理器资源和提升程序性能,我们应该尽量使用并行流来执行 CPU 密集的任务,而不是 IO 密集的任务 —— 因为共享池中的线程数量是有限的,如果共享池中某些线程执行 IO 密集的任务,那么这些线程将长时间处于等待 IO 操作完成的状态,一旦共享池中的线程耗尽,那么程序中其他想继续使用并行流的地方就需要等待,直到有空闲的线程可用,这会在很大程度上影响到程序的性能。所以使用并行流之前,我们要注意到这个细节。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容

  • Java8 in action 没有共享的可变数据,将方法和函数即代码传递给其他方法的能力就是我们平常所说的函数式...
    铁牛很铁阅读 1,225评论 1 2
  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,230评论 11 349
  • 一、多线程 说明下线程的状态 java中的线程一共有 5 种状态。 NEW:这种情况指的是,通过 New 关键字创...
    Java旅行者阅读 4,673评论 0 44
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 171,979评论 25 707
  • 他的歌很好听,他唱过《消愁》《借》《向我这样的人》《如果有一天我变得很有钱》……今天讲的就是他的歌如果有喜欢的就在...
    不易粉丝阅读 147评论 0 0