java 并行处理

参考链接

概述

所谓的并行处理其目的就是为了提升原先串行处理的速度,从原理上来讲提升最明显的属于串行处理的逻辑之间没有共享资源的情况(或则说没有资源竞争的情况存在,基于多核服务器环境下);
从本质上看,其实现方式是将单线程任务切分为多个分片,由不同的线程来处理各个分片,充分利用资源以达提升处理速度的目的;由于使用多线程处理方式,故必定会增加线程间的通信(如每个线程的结果状态及结果数据的汇总)
原串行逻辑:
A -> B -> C -> D -> END
并行逻辑:
A
B
-> END
C
D

JAVA实现方式

  • 多线程方式
    启动多个线程进行任务处理,待处理完成后进行汇总
    简单代码:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;

public class ParallelTask {

    private final static int DEALCOUNT = 10;
    private final static ExecutorService exec = Executors.newCachedThreadPool();
    
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ParallelTask parallel = new ParallelTask();
        List<Future<String>> futureList = new ArrayList<Future<String>>();
        long start_ts = System.nanoTime();
        for (int i = 0; i < DEALCOUNT; i++) {
            final int taskNum = i;
            futureList.add(exec.submit(parallel.new Task(taskNum)));
        }
        List<String> retList = new ArrayList<String>();
        ThreadPoolExecutor checkExec = (ThreadPoolExecutor)exec;
        while(checkExec.getCompletedTaskCount() < futureList.size()){
            Thread.sleep(10L);
        }
        for(Future<String> f : futureList){
            retList.add(f.get());
        }
        long end_ts = System.nanoTime();
        System.out.println("Speed times : " + (end_ts - start_ts)/1000000);
        exec.shutdown();
        for(String s : retList){
            System.out.println(s);
        }
    }
    
    class Task implements Callable<String>{
        
        private int taskNum;
        
        public Task(int num){
            this.taskNum = num;
        }

        @Override
        public String call() throws Exception {
            Thread.sleep(1000L);
            return taskNum + "_SUCC.";
        }
    }
}
  • CountDownLatch
    允许一个或多个线程一直等待,直到其他线程执行完后再执行;通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。
    伪代码:
Main thread start
Create CountDownLatch for N threads
Create and start N threads
Main thead wait on latch
N threads completes there tasks are returns
Main thread resume execution

简单代码1:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CountDownLatchDemo {
    private final static int DEALCOUNT = 10;
    private final static ExecutorService exec = Executors.newCachedThreadPool();
    public static void main(String[] args) throws Exception {
        long start_ts = System.nanoTime();
        final CountDownLatch countDownLatch = new CountDownLatch(DEALCOUNT);
        Map<Integer,String> dealRet = new HashMap<Integer,String>();
        for (int i = 0; i < DEALCOUNT; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    doTask(threadNum,dealRet);
                } catch (Exception e) {
                    System.out.println("exception : " + e.getMessage());
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        //等待处理结束
        countDownLatch.await();//全部结束
        //countDownLatch.await(2, TimeUnit.SECONDS);//等待一段时间
        long end_ts = System.nanoTime();
        System.out.println("DEAL END;Speed times : " + (end_ts - start_ts)/1000000 );
        exec.shutdown();
        //show result
        showResult(dealRet);
    }
    
    private static void doTask(int threadNum,Map<Integer,String> dealRet) throws Exception {
        Thread.sleep(3000);
        System.out.printf("Deal No : %s\n", threadNum);
        if( null != dealRet ){
            dealRet.put(threadNum, "SUCC");
        }
        Thread.sleep(100);
    }
    
    private static void showResult(Map<Integer,String> retMap){
        if( null != retMap ){
            Iterator<Map.Entry<Integer,String>> entries = retMap.entrySet().iterator();
            while( entries.hasNext() ){
                Map.Entry<Integer,String> entry = entries.next();
                System.out.println("Key : " + entry.getKey() + " | Value : " + entry.getValue());
            }
        }
    }
}

简单代码2:

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchTask {
    private final static int DEALCOUNT = 10;
    private final static ExecutorService exec = Executors.newCachedThreadPool();
    public static void main(String[] args) throws Exception {
        long start_ts = System.nanoTime();
        final CountDownLatch countDownLatch = new CountDownLatch(DEALCOUNT);
        Map<String,Object> dealRet = new HashMap<String,Object>();
        CountDownLatchTask svr = new CountDownLatchTask();
        for (int i = 0; i < DEALCOUNT; i++) {
            final int threadNum = i;
            WorkTask<CountDownLatchTask> work = svr.new WorkTask<CountDownLatchTask>();
            Object[] methArgs = new Object[]{threadNum};
            Class[] parameterTypes = new Class[]{int.class};
            String methodName = "doTask";
            String retKey = threadNum + "_ret";
            work.initTask(svr, methArgs, parameterTypes, methodName, retKey, dealRet, countDownLatch);
            exec.execute(work);
        }
        //等待处理结束
        countDownLatch.await();//全部结束
        //countDownLatch.await(2, TimeUnit.SECONDS);//等待一段时间
        long end_ts = System.nanoTime();
        System.out.println("DEAL END;Speed times : " + (end_ts - start_ts)/1000000 );
        exec.shutdown();
        //show result
        showResult(dealRet);
    }
    
    class WorkTask<T> implements Runnable{
        private CountDownLatch countDownLatch = null;
        private T obj;
        private Object[] args;
        private Class[] parameterTypes;
        private String methodName;
        private String retKey;
        private Map<String,Object> retMap;
        
        public void initTask(T t,Object[] args,Class[] parameterTypes,String method,String retKey,Map<String,Object> retMap,CountDownLatch latch){
            this.obj = t;
            this.args = args;
            this.parameterTypes = parameterTypes;
            this.methodName = method;
            this.retKey = retKey;
            this.retMap = retMap;
            this.countDownLatch = latch;
        }
        
        @Override
        public void run() {
            try{
                Method method = this.obj.getClass().getMethod(methodName, parameterTypes);
                Object retObj = method.invoke(this.obj, args);
                if( null != retMap && null != retKey ){
                    retMap.put(retKey, retObj);
                }
            }catch (Exception e){
                System.out.println("Task Run Exception." + e.getMessage());
            }finally{
                if( null != countDownLatch ){
                    countDownLatch.countDown();
                }
            }   
        }
    }
    
    public String doTask(int threadNum) throws Exception {
        Thread.sleep(3000);
        return threadNum + "_SUCC.";
    }
    
    private static void showResult(Map<String,Object> retMap){
        if( null != retMap ){
            Iterator<Map.Entry<String,Object>> entries = retMap.entrySet().iterator();
            while( entries.hasNext() ){
                Map.Entry<String,Object> entry = entries.next();
                System.out.println("Key : " + entry.getKey() + " | Value : " + entry.getValue());
            }
        }
    }
}
  • ForkJoinPool(JAVA 7)
    用于实现“分而治之”的算法,特别是分治之后递归调用的函数,最适合的是计算密集型的任务;fork/join框架的目的是以递归方式将可以并行执行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。
    代码样例1:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.stream.LongStream;

public class ForkJoinPoolDemo {
    
    private static ForkJoinPool pool = new ForkJoinPool(4);

    public static void main(String[] args) throws InterruptedException {
         long[] numbers = LongStream.rangeClosed(1, 1000).toArray();
         long start_ts = System.nanoTime();
         long lRet = pool.invoke(new SumTask(numbers,0,numbers.length-1));
         long end_ts = System.nanoTime();
         System.out.println(pool.getPoolSize());//线程池的大小
         System.out.println("Result : " + lRet + " | Speed : " + (end_ts - start_ts)/1000000);
         lRet = 0;
         start_ts = System.nanoTime();
         for(long lNum : numbers){
             lRet += lNum;
            //其他计算任务
             Thread.sleep(10L);
         }
         end_ts = System.nanoTime();
         System.out.println("Result : " + lRet + " | Speed : " + (end_ts - start_ts)/1000000);
         
    }
    
    private static class SumTask extends RecursiveTask<Long> {
        private long[] numbers;
        private int from;
        private int to;

        public SumTask(long[] numbers, int from, int to) {
            this.numbers = numbers;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            // 当需要计算的数字小于32时,直接计算结果
            if (to - from < 32) {
                long total = 0;
                for (int i = from; i <= to; i++) {
                    total += numbers[i];
                    try {
                        //其他计算任务
                        Thread.sleep(10L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return total;
            // 否则,把任务一分为二,递归计算
            } else {
                int middle = (from + to) / 2;
                SumTask taskLeft = new SumTask(numbers, from, middle);
                SumTask taskRight = new SumTask(numbers, middle+1, to);
                taskLeft.fork();//开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理
                taskRight.fork();
                return taskLeft.join() + taskRight.join();//等待该任务的处理线程处理完毕,获得返回值
            }
        }
    }
}
  • Parallel Stream(JAVA 8)
    通过parallel()方法,将顺序执行的流转变成一个并行流来处理;并行流就是将一个内容分成多个数据块,并用不同的线程分别处理每个数据块,最后合并每个数据块的结果;其实现机制也是使用到fork/join框架;
    简单代码:
import java.util.stream.LongStream;
import java.util.stream.Stream;

public class ParallelStreamDemo {
    public static void main(String[] args) throws InterruptedException {
        long dealNum = 10000 * 1000l;
        doSumIterate(dealNum);
        Thread.sleep(1000L);
        doSumRange(dealNum);
        Thread.sleep(1000L);
        doSumByForEach(dealNum);
    }
    
    public static void doSumIterate(long n){
        long start_ts = System.nanoTime();
        Long lRet = Stream.iterate(1L, i -> i + 1).limit(n).reduce(0L, Long::sum);
        long end_ts = System.nanoTime();
        System.out.println("Sum From 0 To " + n  + " Result : " + lRet + ";Speed : " + (end_ts-start_ts)/1000000);
    }
    
    public static void doSumRange(long n){
        long start_ts = System.nanoTime();
        Long lRet = LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);
        long end_ts = System.nanoTime();
        System.out.println("Sum From 0 To " + n  + " Result : " + lRet + ";Speed : " + (end_ts-start_ts)/1000000);
    }
    
    public static void doSumByForEach(long n){
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        long start_ts = System.nanoTime();
        Long lRet = 0L;
        for(long lNum : numbers){
             lRet += lNum;
         }
        long end_ts = System.nanoTime();
        System.out.println("Sum From 0 To " + n  + " Result : " + lRet + ";Speed : " + (end_ts-start_ts)/1000000);
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,744评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,505评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,105评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,242评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,269评论 6 389
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,215评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,096评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,939评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,354评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,573评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,745评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,448评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,048评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,683评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,838评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,776评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,652评论 2 354

推荐阅读更多精彩内容