Java实现生产者/消费者模型实战应用

场景: 我们需要创建一个job,这个job是异步执行的,且任务有多个状态,每个状态需要不同的处理。

实现: 在服务里创建一个生产消费模型,job在创建后,设置初始状态,并放在队列里由消费者消费,处理业务逻辑。消费成功后,更改状态再次放入队列中,等待下一次消费。

实现一: wait && notify

最朴素也是最简单的方案:wait && notify机制 。
队列中有数据就阻塞生产者线程,消费者消费后就唤醒生产者。反之,队列中没有数据就阻塞消费者线程,生产者添加数据后唤醒消费者线程。wait && notify机制虽然足够简单,但是不够灵活,并发效率也不佳,不能满足实际场景需求。

 // 存储生产者产生的数据
    static List<String> list = new ArrayList<>();

    public static void main(String[] args) {

        new Thread(() -> {
            while (true) {
                synchronized (list) {
                    // 判断 list 中是否有数据,如果有数据的话,就进入等待状态,等数据消费完
                    if (list.size() != 0) {
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    // list 中没有数据时,产生数据添加到 list 中
                    try {
                        Thread.sleep(5000);
                        list.add(UUID.randomUUID().toString());
                        list.notify();
                        System.out.println(Thread.currentThread().getName() + list);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "生产者线程 A ").start();


        new Thread(() -> {
            while (true) {
                synchronized (list) {
                    // 如果 list 中没有数据,则进入等待状态,等收到有数据通知后再继续运行
                    if (list.size() == 0) {
                        try {
                            list.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    // 有数据时,读取数据
                    System.out.println(Thread.currentThread().getName() + list);
                    list.notify();
                    // 读取完毕,将当前这条 UUID 数据进行清除
                    list.clear();
                }
            }
        }, "消费者线程 B ").start();

    }

实现二: BlockingQueue

BlockingQueue的写法最简单。核心思想是,把并发和容量控制封装在缓冲区中

 public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for(int i =1;i<=10;i++){
            queue.put("第"+i+"条消息");
        }
        System.out.println("当前队列还有"+queue.size()+"消息");


        new Thread(()->{
            try {
                System.out.println("睡眠中");
                Thread.sleep(10000);
                for(int i =1;i<=10;i++){
                    queue.put("新的消息:第"+i+"条消息");
                }

            } catch (InterruptedException e) {

            }
        }).start();

        int nThreads = 1 ;
        ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        for(int i = 0 ;i<nThreads;i++){
            executorService.submit(()->{
                while (true){
                    System.out.println("消费者");
                    String poll = null;
                    try {
                        poll = queue.take();

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("消费者:"+poll==null?"":poll);
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }


    }

demo中通过put方法生产数据,take方法消费数据。这个两个方法都有阻塞线程的效果,我们来看下:

2.1 put()

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        
        // 以可中断的形式获取put锁
        putLock.lockInterruptibly();
        try {
            // 与offer(e, timeout, unit)相比,采用了无限等待的方式
            while (count.get() == capacity) {
                // 当执行了移除元素操作后,会通过signal操作来唤醒notFull队列中的一个线程
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

2.2 take()

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 出队,并自减
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
            // 只要队列还有元素,就唤醒一个take操作
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
        // 如果在队列满的情况下移除一个元素,会唤醒一个put操作
            signalNotFull();
        return x;
    }

这里能看到take其实就是put的一个翻版。这里也不难发现wait && notify机制实际上也是在模拟实现一个BlockingQueue。使用BlockingQueue不枉为最佳选择。

实战:

利用@PostConstruct在服务启动时加载生产者/消费者线程。exportRunnerListener循环执行asyncJobRunner()消费队列,使用take()方法阻塞线程,避免资源浪费。执行消费者之前,我们需要在db里面捞一下未完成的任务,避免因服务重启造成的任务丢失。

private final ExecutorService executorService;
private static final LinkedBlockingQueue<String> jobQueue = new LinkedBlockingQueue<>();
...
@PostConstruct
    public void NeoExportRunner() {
        int nThreads = 3;
        //每次重启找出未完成的job
        exportJobRepository.findByStatusIn(Arrays.asList(new String[]{PENDING, LOADED, UPLOADED, NOTIFIED}))
                .orElse(new ArrayList<>()).forEach(
                //加入队列
                o -> jobQueue.add(o.getId())
        );
        this.executorService = Executors.newFixedThreadPool(nThreads);

        for (int i = 0; i < nThreads; i++) {
            executorService.execute(() -> {
                exportRunnerListener();
            });
        }

    }
    
    public void exportRunnerListener() {
        while (true) {
            log.info("asyncJobRunner is working... {}", Thread.currentThread().getName());
            try {
                //消费者
                asyncJobRunner();
            } catch (Exception e) {
                log.error("asyncJobRunner is error...{}",e);
            }
        }
    }

asyncJobRunner()是一个任务调度器。拿到队列里的消息,根据状态来处理不同的业务逻辑。每个job执行完后,变更任务状态,重新写回队列,下次消费时进行下一个状态的处理,从而实现状态扭转。

    public static void asyncJobRunner() throws InterruptedException {
        
        //消费jobQueue中的数据
        Optional.ofNullable(jobQueue.take()).flatMap(id -> exportJobRepository.findById(id)).ifPresent(job -> {
            switch (job.getStatus()) {
                case PENDING:
                    loadData(job);
                    log.info("Job had been loaded.. job -> " + job.toString());
                    break;

                case LOADED:
                    upload(job);
                    log.info("Job had been uploaded.. job -> " + job.toString());
                    break;

                case UPLOADED:
                    notify(job);
                    log.info("Job had been notify.. job -> " + job.toString());
                    break;

                case FAILED:
                    retry(job);
                    log.info("Job had been failed.. will be retry. .job -> " + job.toString());
                    break;

                case NOTIFIED:
                    finish(job);
                    log.info("Job had been finished. . job -> " + job.toString());
                    break;

                case FINISHED:
                    log.warn("Finished job should not appear in job queue, check for logical error. job -> " + job.toString());
                    break;

                case CANCELED:
                    log.info("Job had been canceled. Nothing to do here. job -> " + job.toString());
                    break;

                default:
                    log.error("Unrecognized job status. job -> " + job.toString());

            }
        });
    }
  

这里说一下retry机制,当任务在某个状态发生异常,并未执行成功,我们来设置一个retry机制在任务FAILED的时候进行补偿。某个状态异常时,将当前状态保存在LastStatus中并设置当前状态为FAILED,同时记录retry的次数。这样以来下次我们拿到这个job的状态是FAILED,在调用retry方法时把失败时的状态在写回去丢到队列里,下一次就可以继续执行了。

    public static void retry(ExportJob job) {
    //判断重试次数是否<最大重试次数
        if (job.getRetry() < maximumRetry) {
            job.setStatus(job.getLastStatus());
            job.setRetry(job.getRetry() + 1);
            jobQueue.add(job.getId());
        } else {
            //save db -> error status
            log.error("Max retry exceeds. job -> " + job.toString());
        }
    }

addJob()cancelJob() 提供给我们的业务代码调用,用来创建任务和取消任务,这里的取消任务做不到实时性,具体代码需要根据实际业务场景进行调整。

public static String addJob(String id, String type, String channel) {
        ExportJob exportJob = ExportJob.builder()
                .channel(channel)
                .jobId(id)
                //初始化任务
                .status(PENDING)
                .createAt(sdf.format(System.currentTimeMillis()))
                .type(type)
                .id(UUID.randomUUID().toString().replaceAll("-", ""))
                .retry(0)
                .build();
        ExportJob saved = exportJobRepository.saveAndFlush(exportJob);
        jobQueue.add(saved.getId());
        return saved.getId();
    }
public static String cancelJob(String id) {
        canceled.add(id);
        exportJobRepository.findById(id).ifPresent(job -> {
            //取消任务
            job.setStatus(CANCELED);
            exportJobRepository.saveAndFlush(job);
        });
        return id;
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352