并发框架是为了将业务代码和多线程代码隔离开来,为了可以让不懂并发的人员也可以开发使用这个框架。
框架中提供线程池、任务储存容器、使用者需要实现的任务接口、提交给框架执行的工作实体类、任务返回结果实体类、有可能还需要缓存定期清理已完成的任务。
框架业务示意图:
不论那种类型的任务都需要兼容,支持用户注册提交任务,查询任务进度和结果。
框架流程图:
框架内部跟业务代码是松耦合的,业务人员不需要了解内部代码,用线程池支持并发,提供给业务人员注册任务,并提供查询任务结果方法。内部需要并发容器储存任务,已经完成的任务放入队列中过期清除。
废话不多说,直接上代码:
这个接口需要业务人员实现,实现想要执行的任务
/*
*要求框架使用者实现的任务接口。
* data 是方法使用的业务数据类型
* return 方法执行后业务返回的结果
* */
public interface ITaskProcesser<T,R> {
TaskResult<R> taskExcutor(T data);
}
工作实体类:
用泛型实现,提供了任务计数器,任务计数器用到了原子类型,避免内存重排序导致的错误。
提供了放回任务结果的方法供开发人员查看。
提供了将完成的任务放入队列的方法。
其中任务计数器是不能让业务人员随意操作的,所以构造的时候没有传入参数。
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
/*
* 提交给框架执行的工作实体类
* */
public class JobInfo<R> {
//工作名字
private final String jobName;
//工作的任务个数
private final int jobLength;
//工作任务处理器
private final ITaskProcesser<?,?> taskProcesser;
//任务计数器
private AtomicInteger successCount;//原子类型
private AtomicInteger taskProcesserCount;
//放入已完成的队列
private LinkedBlockingDeque<TaskResult<R>> taskDetailQueue;
//完成任务超时 定时器 超时清除
private final long expireTime;
//构造方法
public JobInfo(String jobName, int jobLength, ITaskProcesser<?, ?> taskProcesser, long expireTime) {
this.jobName = jobName;
this.jobLength = jobLength;
this.taskProcesser = taskProcesser;
//开发人员不应该修改任务的计数 由框架来控制
this.successCount = new AtomicInteger();
this.taskProcesserCount = new AtomicInteger();
this.taskDetailQueue = new LinkedBlockingDeque<TaskResult<R>>(jobLength);
this.expireTime = expireTime;
}
public ITaskProcesser<?, ?> getTaskProcesser() {
return taskProcesser;
}
//返回成功处理的结果数
public int getSuccessCount() {
return successCount.get();
}
//返回当前已处理的结果数
public int getTaskProcesserCount() {
return taskProcesserCount.get();
}
//提供工作中失败的次数
public int getFailCount() {
return taskProcesserCount.get() - successCount.get();
}
public String getTotalProcess() {
return "Success["+successCount.get()+"]/Current["
+taskProcesserCount.get()+"] Total["+jobLength+"]";
}
//获取每个任务结果,从头部获取
public List<TaskResult<R>> getTaskDetail(){
List<TaskResult<R>> taskList = new LinkedList<>();
TaskResult<R> taskResult;
while((taskResult = taskDetailQueue.pollFirst()) != null){
taskList.add(taskResult);
}
return taskList;
}
//从业务角度来讲,保证最终一致性就行,不需要加锁,影响性能。已经用了源自操作和并发安全队列
public void addTaskResult(TaskResult<R> result, CheckJobProcesser checkJob){
if (TaskResultType.Success.equals(result.getResultType())){
successCount.incrementAndGet();
}
taskDetailQueue.addLast(result);//结果从尾部添加
taskProcesserCount.incrementAndGet();
if (taskProcesserCount.get() == jobLength){
checkJob.putJob(jobName,expireTime);
}
}
}
任务结果类
提供返回任务结果和原因的方法。
/*
* 任务返回结果实体类
* */
public class TaskResult<R> {
private final TaskResultType resultType;
private final R returnValue;//业务结果数据
private final String reason;//方法失败原因
public TaskResult(TaskResultType resultType, R returnValue, String reason) {
this.resultType = resultType;
this.returnValue = returnValue;
this.reason = reason;
}
public TaskResult(TaskResultType resultType, R returnValue) {
this.resultType = resultType;
this.returnValue = returnValue;
this.reason = "Success";
}
public TaskResultType getResultType() {
return resultType;
}
public R getReturnValue() {
return returnValue;
}
public String getReason() {
return reason;
}
}
任务执行的结果类:
/*
*
* 任务执行的结果类
* */
public enum TaskResultType {
Success,Failure,Exception;
//返回业务人员需要的结果
//返回业务人员不需要的结果
//返回异常
}
查询结果类:
主要实现将完成的任务放入队列,供查询,如果一点时间过了就将任务清除,防止占用大量内存。
使用了单例模式,也是为了节省内存。
启用线程清理任务,设置为守护线程。
/*
* 任务完成后,在一定的时间供查询,之后为释放资源节约内存,需要定期处理过期的任务
* */
public class CheckJobProcesser {
private static DelayQueue<ItemVo<String>> queue = new DelayQueue<>();//存放已完成任务,超时过期
//单例模式------
private CheckJobProcesser(){}
private static class ProcesserHolder{
public static CheckJobProcesser processer = new CheckJobProcesser();
}
public static CheckJobProcesser getInstance(){
return ProcesserHolder.processer;
}
//单例模式------
//处理队列中到期任务的实行
private static class FetchJob implements Runnable{
@Override
public void run() {
while (true){
try{
ItemVo<String> item = queue.take();
String jobName = (String)item.getDate();
PendingJobPool.getMap().remove(jobName);
System.out.println(jobName + "is out of date , remove from map");
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
/*任务完成后,放入队列,经过expireTime时间后,从整个框架中移除*/
public void putJob(String jobName,long expireTime){
ItemVo<String> item = new ItemVo<>(expireTime,jobName);
queue.offer(item);
System.out.println("job[" + jobName + "已经放入过期检查缓存,过期时长:" + expireTime);
}
//类初始化的时候就运行线程
static {
Thread thread = new Thread(new FetchJob());
thread.setDaemon(true);
thread.start();
System.out.println("开启守护线程");
}
}
框架主体:
这里使用了线程池,保守估计线程个数,使用的个数和cpu数量相同,这个可以根据业务需求修改。
线程池没有用 JDK 提供的那几个,主要考虑到想要用有界队列,就自己定义线程池。
用 ConcurrentHashMap 存放任务。
提供了注册任务方法、提交任务方法、获得任务结果等。
对工作中的任务进行了包装,提交给线程池使用,并处理任务的结果,写入缓存以供查询
同样使用了单例模式。
import com.enjoy.MultiThread.ch8a.vo.ITaskProcesser;
import com.enjoy.MultiThread.ch8a.vo.JobInfo;
import com.enjoy.MultiThread.ch8a.vo.TaskResult;
import com.enjoy.MultiThread.ch8a.vo.TaskResultType;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
* 框架的主体类,也是调用者主要使用的类
*/
public class PendingJobPool {
//保守估计
private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors();
//有界队列
private static BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<>(5000);
//创建固定大小有界队列线程池
private static ExecutorService taskExecutor
= new ThreadPoolExecutor(THREAD_COUNT,THREAD_COUNT,60, TimeUnit.SECONDS,taskQueue);
//job 存放容器
private static ConcurrentHashMap<String, JobInfo<?>> jobInfoMap = new ConcurrentHashMap<>();
public static Map<String, JobInfo<?>> getMap(){
return jobInfoMap;
}
private static CheckJobProcesser checkJob
= CheckJobProcesser.getInstance();
//单例模式---
private PendingJobPool(){}
private static class JobPoolHolder{
public static PendingJobPool pool = new PendingJobPool();
}
public static PendingJobPool getInstance(){
return JobPoolHolder.pool;
}
//单例模式---
//调用者注册工作,如工作名,任务的处理器等等
public<R> void registerJob(String jobName, int jobLength, ITaskProcesser<?,?> taskProcesser,long expireTime){
JobInfo<R> jobInfo = new JobInfo(jobName,jobLength,taskProcesser,expireTime);
if (jobInfoMap.putIfAbsent(jobName,jobInfo) != null){
throw new RuntimeException("当前任务已经注册");
}
}
//调用者提交任务
public <T,R> void putTask(String jobName,T t){
JobInfo<R> jobInfo = getJob(jobName);
PendingTask<T,R> task = new PendingTask<>(jobInfo,t);
taskExecutor.execute(task);
}
//根据工作名称检索工作
private <R> JobInfo<R> getJob(String jobName){
JobInfo<R> jobInfo = (JobInfo<R>) jobInfoMap.get(jobName);
if (null == jobInfo){
throw new RuntimeException(jobName + "是个非法任务");
}
return jobInfo;
}
//对工作中的任务进行包装,提交给线程池使用,并处理任务的结果,写入缓存以供查询
private static class PendingTask<T,R> implements Runnable{
private JobInfo<R> jobInfo;
private T processData;
public PendingTask(JobInfo<R> jobInfo,T processData){
this.jobInfo = jobInfo;
this.processData = processData;
}
@Override
public void run(){
R r = null;
ITaskProcesser<T,R> taskProcesser = (ITaskProcesser<T,R>) jobInfo.getTaskProcesser();
TaskResult<R> result = null;
//调用业务员人员实现的方法
result = taskProcesser.taskExcutor(processData);
//要做检查,防止异常
try{
if (result == null){
result = new TaskResult<R>(TaskResultType.Exception,r,"result id null");
}
if (result.getResultType() == null) {
if (result.getReason() == null) {
result = new TaskResult<R>(TaskResultType.Exception, r, "reason is null");
} else {
result = new TaskResult<R>(TaskResultType.Exception, r,
"result is null,but reason:" + result.getReason());
}
}
}catch(Exception e){
e.printStackTrace();
result = new TaskResult<R>(TaskResultType.Exception,r,e.getMessage());
}finally {
jobInfo.addTaskResult(result,checkJob);
}
}
}
//获得每个任务的处理详情
public <R> List<TaskResult<R>> getTaskDetail(String jobName){
JobInfo<R> jobInfo = getJob(jobName);
return jobInfo.getTaskDetail();
}
//获得工作的整体处理进度
public <R> String getTaskProgess(String jobName) {
JobInfo<R> jobInfo = getJob(jobName);
return jobInfo.getTotalProcess();
}
}
实现了自己的任务测试一下整个框架,构造了成功失败异常的情况:
import com.enjoy.MultiThread.ch8a.vo.ITaskProcesser;
import com.enjoy.MultiThread.ch8a.vo.TaskResult;
import com.enjoy.MultiThread.ch8a.vo.TaskResultType;
import com.enjoy.MultiThread.tools.SleepTools;
import java.util.Random;
/**
*类说明:一个实际任务类,将数值加上一个随机数,并休眠随机时间
*/
public class MyTask implements ITaskProcesser<Integer,Integer> {
@Override
public TaskResult<Integer> taskExcutor(Integer data) {
Random r = new Random();
int flag = r.nextInt(500);
SleepTools.ms(flag);
if(flag<=300) {//正常处理的情况
Integer returnValue = data.intValue()+flag;
return new TaskResult<Integer>(TaskResultType.Success,returnValue);
}else if(flag>301&&flag<=400) {//处理失败的情况
return new TaskResult<Integer>(TaskResultType.Failure,-1,"Failure");
}else {//发生异常的情况
try {
throw new RuntimeException("异常发生了!!");
} catch (Exception e) {
return new TaskResult<Integer>(TaskResultType.Exception,
-1,e.getMessage());
}
}
}
}
测试代码:
import com.enjoy.MultiThread.ch8a.PendingJobPool;
import com.enjoy.MultiThread.ch8a.vo.TaskResult;
import com.enjoy.MultiThread.tools.SleepTools;
import java.util.List;
import java.util.Random;
public class AppTest {
private final static String JOB_NAME = "计算数值";
private final static int JOB_LENGTH = 1000;
//查询任务进度的线程
private static class QueryResult implements Runnable{
private PendingJobPool pool;
public QueryResult(PendingJobPool pool) {
super();
this.pool = pool;
}
@Override
public void run() {
int i=0;//查询次数
while(i<350) {
List<TaskResult<String>> taskDetail = pool.getTaskDetail(JOB_NAME);
if(!taskDetail.isEmpty()) {
System.out.println(pool.getTaskProgess(JOB_NAME));
System.out.println(taskDetail);
}
SleepTools.ms(100);
i++;
}
}
}
public static void main(String[] args) {
MyTask myTask = new MyTask();
//拿到框架的实例
PendingJobPool pool = PendingJobPool.getInstance();
//注册job
pool.registerJob(JOB_NAME, JOB_LENGTH, myTask,1000*5);
Random r = new Random();
for(int i=0;i<JOB_LENGTH;i++) {
//依次推入Task
pool.putTask(JOB_NAME, r.nextInt(1000));
}
Thread t = new Thread(new QueryResult(pool));
t.start();
}
}
测试结果:
结果比较多没有放全,可以自己本地跑一下。
代码 github 地址:
https://github.com/theodore816/javastudy/tree/master/com/enjoy/MultiThread/ch8a