多线程消费的模板代码

public class MessageDispatch {
    private final List<Worker> workers;
    private volatile boolean isWorker = true;

    /**
     * 构造函数初始化线程池
     */
    public MessageDispatch(int poolSize, int capacity) {
        this.workers = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < poolSize; i++) {
            //使用有界队列防止任务无限增长
            Worker worker = new Worker(new ArrayBlockingQueue<>(capacity), this);
            worker.start();
            workers.add(worker);
        }
    }

    public static class Worker extends Thread {
        private final MessageDispatch pool;
        private final BlockingQueue<Runnable> queue;

        public Worker(BlockingQueue<Runnable> queue, MessageDispatch pool) {
            this.queue = queue;
            this.pool = pool;
        }

        public Queue<Runnable> getQueue() {
            return queue;
        }

//        非阻塞的方式去取,会频繁的循环,忙等待
//        @Override
//        public void run() {
//            while (this.pool.isWorker || queue.size() > 0) {
//                Runnable task = queue.poll();
//                if (task != null) {
//                    task.run();
//                }
//            }
//        }

        @Override
        public void run() {
            Runnable task = null;
            while (this.pool.isWorker || !queue.isEmpty()) {
                try {
                    if (this.pool.isWorker) {
                        //工作中默认任务会不断到来,所以阻塞的方式获取,阻塞方式获取。
                        task = this.queue.take();
                    } else {
                        //工作结束,执行完剩下的任务再退出,非阻塞方式拿。
                        task = this.queue.poll();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                    break;
                }
                if (task != null) {
                    task.run();
                }
            }
        }
    }

    public boolean submit(String id, Runnable task) {
        int index = id.hashCode() % this.workers.size();
        Queue<Runnable> runnableQueue = this.workers.get(index).getQueue();
        return runnableQueue.add(task);
    }

    public void shutDown() {
        this.isWorker = false;
        for (Thread worker : workers) {
            if (worker.getState().equals(Thread.State.BLOCKED) || worker.getState().equals(Thread.State.WAITING)) {
                worker.interrupt();//强制中断这个线程
            }
        }
    }

    public static void main(String[] args) {
        MessageDispatch pool = new MessageDispatch(10, 100);

        List<Message> messages = new ArrayList<>();
        messages.add(new Message(0, "order init"));
        messages.add(new Message(0, "order pay"));
        messages.add(new Message(1, "order init"));
        messages.add(new Message(1, "order pay"));
        messages.add(new Message(2, "order init"));
        messages.add(new Message(2, "order pay"));

        for (Message msg : messages) {
            pool.submit(msg.getId().toString(), () -> System.out.println(msg.getId() + ":" + msg.getMessage()));
        }

        pool.shutDown();
    }
}

一种不好的写法

// 非阻塞的方式去取,会频繁的循环,忙等待
@Override
    public void run() {
        while (this.pool.isWorker || queue.size() > 0) {
            Runnable task = queue.poll();
            if (task != null) {
                task.run();
            }
        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容