线程池+队列并发处理

package com.mqtt.thread;

import java.util.Hashtable;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;

/**

  • @author HeyS1

  • @date 2016/12/1

  • @description threadPool多线池,不返回子线程的运行状态

  • scheduler 调度线程池 用于处理订单线程池由于超出线程范围和队列容量而不能处理的订单
    /
    @Component
    public class ThreadPoolManager implements BeanFactoryAware {
    private static Logger log = LoggerFactory.getLogger(ThreadPoolManager.class);
    private BeanFactory factory;//用于从IOC里取对象
    // 线程池维护线程的最少数量
    private final static int CORE_POOL_SIZE = 2;
    // 线程池维护线程的最大数量
    private final static int MAX_POOL_SIZE = 10;
    // 线程池维护线程所允许的空闲时间
    private final static int KEEP_ALIVE_TIME = 0;
    // 线程池所使用的缓冲队列大小
    /
    *
    大于MAX_POOL_SIZE时,放入阻塞对列,
    大于》MAX_POOL_SIZE+WORK_QUEUE_SIZE:拒绝
    /
    private final static int WORK_QUEUE_SIZE =2/
    50*/;
    // 消息缓冲队列
    Queue<Object> msgQueue = new LinkedList<Object>();

    //用于储存在队列中的订单,防止重复提交
    Map<String, Object> cacheMap = new ConcurrentHashMap<>();

private  Hashtable<String, Object>  tables=new Hashtable<String, Object>();

/* @Autowired
DBThread dBThread;
*/

public Hashtable<String, Object> getTables() {
    return tables;
}

public void setTables(Hashtable<String, Object> tables) {
    this.tables = tables;
}

public ThreadPoolManager(Hashtable<String, Object>  tables ) {
    this.tables=tables;
}

public ThreadPoolManager() {
    super();
}


//由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序
final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("太忙了,把该订单交给调度线程池逐一处理" +r.toString()+"   "+
    //((DBThread) r).getMsg()+
    "--size="+msgQueue.size()+"---tsize="+threadPool.getQueue().size());
       
        
        //msgQueue.offer(dBThread.getMsg());
        msgQueue.offer(r);
    }
};

// 订单线程池
final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
        TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);

// 调度线程池。此线程池支持定时以及周期性执行任务的需求。
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

// 访问消息缓存的调度线程,每秒执行一次
// 查看是否有待定请求,如果有,则创建一个新的AccessDBThread,并添加到线程池中
final ScheduledFuture taskHandler = scheduler.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        if (!msgQueue.isEmpty()) {
            if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
                System.out.print("调度:");
               /* String orderId = (String) msgQueue.poll();
                DBThread accessDBThread = (DBThread) factory.getBean("dBThread");
                accessDBThread.setMsg(orderId);
                threadPool.execute(accessDBThread);*/
                DBThread dt = (DBThread) msgQueue.poll();
                threadPool.execute(dt);
                
            }
             /*while (msgQueue.peek() != null) {
             }*/
        }
    }
}, 0, 10, TimeUnit.MILLISECONDS);//.SECONDS

//终止订单线程池+调度线程池
public void shutdown() {
    //true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止
    System.out.println(taskHandler.cancel(false));
    scheduler.shutdown();
    threadPool.shutdown();
}

public Queue<Object> getMsgQueue() {
    return msgQueue;
}


//将任务加入订单线程池
public void processOrders(String orderId) {
    if (cacheMap.get(orderId) == null) {
        cacheMap.put(orderId,new Object());
        
        DBThread dt = this.getNewThread(orderId, orderId);
        threadPool.execute(dt);
        
    }
}


/**
 * 无状态返回
 * @param topic
 * @param msg
 */
public void processOrders2(String topic,String msg ) {
 
    DBThread dt = this.getNewThread(topic, msg);
    threadPool.execute(dt);
    
        
}

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
    factory = beanFactory;
}

public DBThread getNewThread(String topic,String msg ){
    DBThread db=new DBThread(tables);
    db.setMsg(msg);
    db.setTopic(topic);
    return db;
}

public static void main(String[] agrs){

noReturnTest();

}



public static void noReturnTest(){

    Hashtable<String, Object>  tables=new Hashtable<String, Object>();
        ThreadPoolManager tmp2=new ThreadPoolManager(tables);
        int size=1000;
        long start = System.currentTimeMillis();
        
        for (int i = 0; i < size; i++) {
            //模拟并发500条记录            
            tmp2.processOrders2("for-"+i, i+"");
        
        }
        
        //等待执行完,处理完的个数等于总数则完成
        while(tables.size()<size){
            
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            
            
            //等待
            System.out.println("----等待完成,已处理:"+tables.size()+"~~~"+tmp2.getTables().size());
            
        }
        for(Map.Entry<String, Object> entry: tables.entrySet()){
            System.out.println("key---------"+entry.getKey()+"  "+"value--------"+entry.getValue());
        }
        long end = System.currentTimeMillis();
        
        System.out.println("-----------处理完毕----------------"+(end-start));
        tmp2.shutdown();
        System.out.println("----------------00000000000000000--------------");
    
}

}

-----------------线程类--------------------------------------------
package com.mqtt.thread;

import java.io.IOException;
import java.util.Hashtable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import com.mqtt.AlibabaMQTTSendRecive;

//线程池中工作的线程
@Component
@Scope("prototype")//spring 多例
public class DBThread implements Runnable {
private String msg;
private String topic ;
private Logger log = LoggerFactory.getLogger(DBThread.class);
private Hashtable<String, Object> tables=new Hashtable<String, Object>();

public DBThread(Hashtable<String, Object>tables ) {
this.tables=tables;
}
public DBThread() {
super();
}

public DBThread(String msg, String topic) {
super();
this.msg = msg;
this.topic = topic;
}

public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

@Override
public void run() {
//模拟在数据库插入数据
log.info("发送消息线程进程 DBThread= "+ this.toString()+ " ---- ->" + msg+"");
System.out.println("发送消息线程进程 DBThread= "+ this.toString()+ " ---- ->" + msg+"");

  try {
        Thread.sleep(3000);
                    //TODO dosomething
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
  
  tables.put(topic, msg);//线程执行完毕,消息反馈

}

public String getMsg() {
return msg;
}

public void setMsg(String msg) {
this.msg = msg;
}
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,525评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,203评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,862评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,728评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,743评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,590评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,330评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,244评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,693评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,885评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,001评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,723评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,343评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,919评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,042评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,191评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,955评论 2 355