11 并发框架的搭建

1 业务逻辑

2 框架分析

图片.png

3 类设计

3.1 结果类

装载任务的结果类型

/**
 *
 *类说明:方法本身运行是否正确的结果类型
 */
public enum TaskResultType {
    //方法成功执行并返回了业务人员需要的结果
    Success,
    //方法成功执行但是返回的是业务人员不需要的结果
    Failure,
    //方法执行抛出了Exception
    Exception;
}

装载任务结果

/**
 *
 *类说明:任务处理返回结果实体类
 */
public class TaskResult<R> {
    //方法本身运行是否正确的结果类型
    private final TaskResultType resultType;
    //方法的业务结果数据;
    private final R returnValue;
    //这里放方法失败的原因
    private final String reason;
    
    public TaskResult(TaskResultType resultType, R returnValue, String reason) {
        super();
        this.resultType = resultType;
        this.returnValue = returnValue;
        this.reason = reason;
    }
    
    //方便业务人员使用,这个构造方法表示业务方法执行成功返回的结果
    public TaskResult(TaskResultType resultType, R returnValue) {
        super();
        this.resultType = resultType;
        this.returnValue = returnValue;
        this.reason = "Success";
    }

    public TaskResultType getResultType() {
        return resultType;
    }

    public R getReturnValue() {
        return returnValue;
    }

    public String getReason() {
        return reason;
    }

    @Override
    public String toString() {
        return "TaskResult [resultType=" + resultType 
                + ", returnValue=" + returnValue 
                + ", reason=" + reason + "]";
    }   
    
    
    
}

3.2 工作的任务处理器

接口定义,为调用者提供自定义实现任务逻辑的接口规范

/**
 *
 *类说明:要求框架使用者实现的任务接口,因为任务的性质在调用时才知道,
 *所以传入的参数和方法的返回值均使用泛型
 */
public interface ITaskProcesser<T, R> {
    /**
     * @param data 调用方法需要使用的业务数据
     * @return 方法执行后业务方法的结果
     */
    TaskResult<R> taskExecute(T data);
}

3.3 工作类

同一类工作的定义,同类的工作可能多次执行
包含任务的唯一标识、任务处理器、任务结果队列等信息

/**
 *
 *类说明:提交给框架执行的工作实体类,工作:表示本批次需要处理的同性质任务(Task)的一个集合
 */
public class JobInfo<R> {
    //区分唯一的工作
    private final String jobName;
    //工作的任务个数
    private final int jobLength;
    //这个工作的任务处理器
    private final ITaskProcesser<?,?> taskProcesser;
    //成功处理的任务数
    private final AtomicInteger  successCount;
    //已处理的任务数
    private final AtomicInteger  taskProcesserCount;
    //结果队列,拿结果从头拿,放结果从尾部放
    private final LinkedBlockingDeque<TaskResult<R>> taskDetailQueue;
    //工作的完成保存的时间,超过这个时间从缓存中清除
    private final long expireTime;
    
    //与课堂上有不同,修订为,阻塞队列不应该由调用者传入,应该内部生成,长度为工作的任务个数
    public JobInfo(String jobName, int jobLength, 
            ITaskProcesser<?, ?> taskProcesser,
            long expireTime) {
        super();
        this.jobName = jobName;
        this.jobLength = jobLength;
        this.taskProcesser = taskProcesser;
        this.successCount = new AtomicInteger(0);
        this.taskProcesserCount = new AtomicInteger(0);
        this.taskDetailQueue = new LinkedBlockingDeque<TaskResult<R>>(jobLength);;
        this.expireTime = expireTime;
    }

    public ITaskProcesser<?, ?> getTaskProcesser() {
        return taskProcesser;
    }

    //返回成功处理的结果数
    public int getSuccessCount() {
        return successCount.get();
    }

    //返回当前已处理的结果数
    public int getTaskProcesserCount() {
        return taskProcesserCount.get();
    }
    
    //提供工作中失败的次数,课堂上没有加,只是为了方便调用者使用
    public int getFailCount() {
        return taskProcesserCount.get() - successCount.get();
    }
    
    public String getTotalProcess() {
        return "Success["+successCount.get()+"]/Current["
                +taskProcesserCount.get()+"] Total["+jobLength+"]";
    }
    
    //获得工作中每个任务的处理详情
    public List<TaskResult<R>> getTaskDetail(){
        List<TaskResult<R>> taskList = new LinkedList<>();
        TaskResult<R> taskResult;
        //从阻塞队列中拿任务的结果,反复取,一直取到null为止,说明目前队列中最新的任务结果已经取完,可以不取了
        while((taskResult=taskDetailQueue.pollFirst())!=null) {
            taskList.add(taskResult);
        }
        return taskList;
    }
    
    //放任务的结果,从业务应用角度来说,保证最终一致性即可,不需要对方法加锁.
    public void addTaskResult(TaskResult<R> result,CheckJobProcesser checkJob) {
        if (TaskResultType.Success.equals(result.getResultType())) {
            successCount.incrementAndGet();
        }
        taskDetailQueue.addLast(result);
        taskProcesserCount.incrementAndGet();

        //当所有任务结束后,将当前的JobInfo类放入延时队列
        if(taskProcesserCount.get()==jobLength) {
            checkJob.putJob(jobName, expireTime);
        }
        
    }
}

3.4 框架主体类

  • 将工作类JobInfo注册到ConcurrentHashMap<String, JobInfo<?>>中
  • 将任务放入BlockingQueue<Runnable>中
  • 将工作类JobInfo对应的任务处理封装在线程中,然后用线程池调用,每一个线程对应一个工作任务
  • 根据任务所属的JobInfo类型,在ConcurrentHashMap获取之前注册的JobInfo,然后用这个工作类处理当前任务
  • 改JobInfo类型的所有任务结果存储在JobInfo的LinkedBlockingDeque<TaskResult<R>>结果队列中

/**
 * 框架的主体类,也是调用者主要使用的类
 */
public class PendingJobPool {
    
    //保守估计
    private static final int THREAD_COUNTS = 
            Runtime.getRuntime().availableProcessors();
    //任务队列
    private static BlockingQueue<Runnable> taskQueue
     = new ArrayBlockingQueue<>(5000);
    //线程池,固定大小,有界队列
    private static ExecutorService taskExecutor = 
            new ThreadPoolExecutor(THREAD_COUNTS, THREAD_COUNTS, 60, 
                    TimeUnit.SECONDS, taskQueue);
    //job的存放容器
    private static ConcurrentHashMap<String, JobInfo<?>> jobInfoMap
       = new  ConcurrentHashMap<>();
    
    private static CheckJobProcesser checkJob
        = CheckJobProcesser.getInstance();
    
    public static Map<String, JobInfo<?>> getMap(){
        return jobInfoMap;
    }
    
    //单例模式------
    private PendingJobPool() {}
    
    private static class JobPoolHolder{
        public static PendingJobPool pool = new PendingJobPool();
    }
    
    public static PendingJobPool getInstance() {
        return JobPoolHolder.pool;
    }
    //单例模式------
    
    //对工作中的任务进行包装,提交给线程池使用,并处理任务的结果,写入缓存以供查询
    private static class PendingTask<T,R> implements Runnable{
        
        private JobInfo<R> jobInfo;
        private T processData;

        public PendingTask(JobInfo<R> jobInfo, T processData) {
            super();
            this.jobInfo = jobInfo;
            this.processData = processData;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void run() {
            R r = null;
            ITaskProcesser<T,R> taskProcesser =
                    (ITaskProcesser<T, R>) jobInfo.getTaskProcesser();
            TaskResult<R> result = null;
            
            try {
                //调用业务人员实现的具体方法
                result = taskProcesser.taskExecute(processData);
                //要做检查,防止开发人员处理不当
                if (result == null) {
                    result = new TaskResult<R>(TaskResultType.Exception, r, 
                            "result is null");
                }
                if (result.getResultType() == null) {
                    if (result.getReason() == null) {
                        result = new TaskResult<R>(TaskResultType.Exception, r, "reason is null");
                    } else {
                        result = new TaskResult<R>(TaskResultType.Exception, r,
                                "result is null,but reason:" + result.getReason());
                    }
                } 
            } catch (Exception e) {
                e.printStackTrace();
                result = new TaskResult<R>(TaskResultType.Exception, r, 
                        e.getMessage());                
            }finally {
                jobInfo.addTaskResult(result,checkJob);
            }
        }
    }
    
    //根据工作名称检索工作
    @SuppressWarnings("unchecked")
    private <R> JobInfo<R> getJob(String jobName){
        JobInfo<R> jobInfo = (JobInfo<R>) jobInfoMap.get(jobName);
        if(null==jobInfo) {
            throw new RuntimeException(jobName+"是个非法任务。");
        }
        return jobInfo;
    }
    
    //调用者提交工作中的任务
    public <T,R> void putTask(String jobName,T t) {
        JobInfo<R> jobInfo = getJob(jobName);
        PendingTask<T,R> task = new PendingTask<T,R>(jobInfo,t);
        taskExecutor.execute(task);
    }
    
    //调用者注册工作,如工作名,任务的处理器等等
    public <R> void registerJob(String jobName,int jobLength,
            ITaskProcesser<?, ?> taskProcesser,long expireTime) {
        JobInfo<R> jobInfo = new JobInfo(jobName,jobLength,
                taskProcesser,expireTime);
        if (jobInfoMap.putIfAbsent(jobName, jobInfo)!=null) {
            throw new RuntimeException(jobName+"已经注册了!");
        }
    }
    
    //获得每个任务的处理详情
    public <R> List<TaskResult<R>> getTaskDetail(String jobName){
        JobInfo<R> jobInfo = getJob(jobName);
        return jobInfo.getTaskDetail();
    }
    
    //获得工作的整体处理进度
    public <R> String getTaskProgess(String jobName) {
        JobInfo<R> jobInfo = getJob(jobName);
        return jobInfo.getTotalProcess();   
    }
    
}

3.5 缓存类

将任务处理结果放入延时队列,超时则清除出缓存


/**
 *
 *类说明:任务完成后,在一定的时间供查询,之后为释放资源节约内存,需要定期处理过期的任务
 */
public class CheckJobProcesser {
    private static DelayQueue<ItemVo<String>> queue 
        = new DelayQueue<ItemVo<String>>();//存放已完成任务等待过期的队列
    
    //单例模式------
    private CheckJobProcesser() {}
    
    private static class ProcesserHolder{
        public static CheckJobProcesser processer = new CheckJobProcesser();
    }
    
    public static CheckJobProcesser getInstance() {
        return ProcesserHolder.processer;
    }
    //单例模式------    
    
    //处理队列中到期任务的实行
    private static class FetchJob implements Runnable{

        @Override
        public void run() {
            while(true) {
                try {
                    //拿到已经过期的任务
                    ItemVo<String> item = queue.take();
                    String jobName =  (String)item.getDate();
                    PendingJobPool.getMap().remove(jobName);
                    System.out.println(jobName+" is out of date,remove from map!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }           
        }
    }
    
    /*任务完成后,放入队列,经过expireTime时间后,从整个框架中移除*/
    public void putJob(String jobName,long expireTime) {
        ItemVo<String> item = new ItemVo<String>(expireTime,jobName);
        queue.offer(item);
        System.out.println("Job["+jobName+"已经放入了过期检查缓存,过期时长:"+expireTime);
    }
    
    static {
        Thread thread = new Thread(new FetchJob());
        thread.setDaemon(true);
        thread.start();
        System.out.println("开启任务过期检查守护线程................");
    }
    
    
}

3.6 调用实现类

/**
 *
 *类说明:一个实际任务类,将数值加上一个随机数,并休眠随机时间
 */
public class MyTask implements ITaskProcesser<Integer,Integer> {

    @Override
    public TaskResult<Integer> taskExecute(Integer data) {
        Random r = new Random();
        int flag = r.nextInt(500);
        SleepTools.ms(flag);
        if(flag<=300) {//正常处理的情况
            Integer returnValue = data.intValue()+flag;
            return new TaskResult<Integer>(TaskResultType.Success,returnValue);
        }else if(flag>301&&flag<=400) {//处理失败的情况
            return new TaskResult<Integer>(TaskResultType.Failure,-1,"Failure");
        }else {//发生异常的情况
            try {
                throw new RuntimeException("异常发生了!!");
            } catch (Exception e) {
                return new TaskResult<Integer>(TaskResultType.Exception,
                        -1,e.getMessage());
            }
        }
    }

}

/**
 *
 *类说明:模拟一个应用程序,提交工作和任务,并查询任务进度
 */
public class AppTest {
    
    private final static String JOB_NAME = "计算数值";
    private final static int JOB_LENGTH = 1000;
    
    //查询任务进度的线程
    private static class QueryResult implements Runnable{
        
        private PendingJobPool pool;

        public QueryResult(PendingJobPool pool) {
            super();
            this.pool = pool;
        }

        @Override
        public void run() {
            int i=0;//查询次数
            while(i<350) {
                List<TaskResult<String>> taskDetail = pool.getTaskDetail(JOB_NAME);
                if(!taskDetail.isEmpty()) {
                    System.out.println(pool.getTaskProgess(JOB_NAME));
                    System.out.println(taskDetail);                 
                }
                SleepTools.ms(100);
                i++;
            }
        }
        
    }

    public static void main(String[] args) {
        MyTask myTask = new MyTask();
        //拿到框架的实例
        PendingJobPool pool = PendingJobPool.getInstance();
        //注册job
        pool.registerJob(JOB_NAME, JOB_LENGTH, myTask,1000*5);
        Random r = new Random();
        for(int i=0;i<JOB_LENGTH;i++) {
            //依次推入Task
            pool.putTask(JOB_NAME, r.nextInt(1000));
        }
        Thread t = new Thread(new QueryResult(pool));
        t.start();
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,635评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,628评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,971评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,986评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,006评论 6 394
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,784评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,475评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,364评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,860评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,008评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,152评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,829评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,490评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,035评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,428评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,127评论 2 356

推荐阅读更多精彩内容