AQS(三) 多线程可重入

有道面试题

有一家生产奶酪的厂家,每天需要生产100000份奶酪卖给超市.
通过一辆送货车发货,送货车辆每次送100份。
厂家有一个容量为1000份的冷库,用于奶酪保鲜,生产的奶酪需要先存放在冷库。
运输车辆从冷库取货。
厂家有三条生产线,分别是 牛奶供应生产线、发酵剂制作生产线、奶酪生产线。生产每份奶酪需要2份牛奶和1份发酵剂。
请设计生产系统。


1.分析

  1. 三条生产线加一个送货车就是四个线程
  2. 奶酪生产需要等待牛奶与发酵剂
  3. 送货车需要等待奶酪数量满足一次运送的量才开始运送
  4. 奶酪两种情况下需要停止生产
  • 达到冷藏库数量
  • 达到当天奶酪生产目标

其实这就是一个变种的生产者消费者模式,下面开始实现。

2.实现

传统的 消费者生产者模型 是用Thread的wait+notify实现,这里我想用ReentrantLock来玩,用 Condition 来细化锁,具体代码如下

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MilkOne {

    // 运送容量
    final static int ALL_SIZE = 100;
    // 当前容量
    final static int CURR_CAPACITY = 1000;
    // 总生产量
    final static int ALL_CAPACITY = 10000;

    // 牛奶
    final static AtomicInteger milkInt = new AtomicInteger(0);
    // 发酵剂 - 10000
    final static AtomicInteger starterInt = new AtomicInteger(0);
    // 奶酪 - 10000
    final static AtomicInteger cheeseInt = new AtomicInteger(0);

    static class LockCondition {

        Lock lock = new ReentrantLock();

        // 取货车
        Condition carCondition = lock.newCondition();
        // 牛奶
        Condition milkCondition = lock.newCondition();
        // 发酵剂 - 10000
        Condition starterCondition = lock.newCondition();
        // 奶酪 - 10000
        Condition cheeseCondition = lock.newCondition();

        void lock() { lock.lock(); }

        void unlock() { lock.unlock(); }

        void signalCar() { carCondition.signal(); }

        void awaitCar() throws InterruptedException{ carCondition.await(); }

        void signalMilk() { milkCondition.signal(); }

        void awaitMilk() throws InterruptedException{ milkCondition.await(); }

        void signalStarter() { starterCondition.signal(); }

        void awaitStarter() throws InterruptedException{ starterCondition.await(); }

        void signalCheese() { cheeseCondition.signal(); }

        void awaitCheese() throws InterruptedException{ cheeseCondition.await(); }

    }

    /**
     * 三个生产线程(总共生产10000)
     *
     * 没说牛奶跟发酵剂需要保存,这里不设等待
     *
     * 车取货的线程(每满一百取一次)
     *
     */
    public static void main(String[] args) throws InterruptedException{

        LockCondition lock = new LockCondition();

        // 1.生产牛奶
        Thread milkTh = new Thread(new MilkTh(lock), "milk");
        milkTh.start();

        // 2.生产发酵剂
        Thread starterTh = new Thread(new StarterTh(lock), "starter");
        starterTh.start();

        // 3.生产奶酪
        Thread cheeseTh = new Thread(new CheeseTh(lock), "cheese");
        cheeseTh.start();

        // 4.car
        Thread carTh = new Thread(new CarTh(lock), "car");
        carTh.start();
    }

    // 4.运输车
    static class CarTh implements Runnable {
        LockCondition lock;
        CarTh(LockCondition lock) { this.lock = lock; }

        @Override
        public void run(){
            int p = 0;
            for (;;) {
                lock.lock();
                System.out.println(p + " car lock...");
                try {
                    if (p >= ALL_CAPACITY / ALL_SIZE) {
                        break;
                    }
                    // 1.没满运货车数量
                    if (cheeseInt.get() < ALL_SIZE) {
                        // 1.唤醒生产奶酪
                        lock.signalCheese();
                        // 2.运货车等待
                        lock.awaitCar();
                        continue;
                    }
                    // 可以运送
                    if (cheeseInt.compareAndSet(cheeseInt.get(), cheeseInt.get() - ALL_SIZE)) {
                        System.out.println(p++ + " 运送 cheeseInt: " + cheeseInt.get());
                        // 可以继续生产奶酪
                        lock.signalCheese();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(p + " car unlock...");
                    lock.unlock();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("car end..."
                    + ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get() + ", p:" + p);
        }
    }

    // 3.奶酪
    static class CheeseTh implements Runnable {
        LockCondition lock;
        CheeseTh(LockCondition lock) { this.lock = lock; }

        @Override
        public void run(){

            int z = 0;
            for (;;) {
                lock.lock();
                System.out.println(z + " cheese lock...");
                try {
                    if (z >= ALL_CAPACITY) {
                        break;
                    }
                    if (starterInt.get() < 1) lock.awaitStarter();
                    if (milkInt.get() < 2) lock.awaitMilk();

                    /* 1奶酪 = 2牛奶 + 1发酵剂 */
                    // 2牛奶
                    milkInt.compareAndSet(milkInt.get(), milkInt.get() - 2);
                    // 1发酵剂
                    starterInt.decrementAndGet();
                    // 1奶酪
                    int num = cheeseInt.incrementAndGet();
                    System.out.println(z++ + " 生产了一份 cheeseInt: " + cheeseInt.get());
                    // 唤醒运货车
                    lock.signalCar();
                    // 超过了 1000 份奶酪
                    if (CURR_CAPACITY <= num) {
                        // 停止生产
                        lock.awaitCheese();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(z + " cheese unlock...");
                    lock.unlock();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("cheese end..."
                    + ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get());
        }
    }

    // 2.发酵剂
    static class StarterTh implements Runnable {
        LockCondition lock;
        StarterTh(LockCondition lock) { this.lock = lock; }

        @Override
        public void run(){
            int y = 0;
            for (;;) {
                lock.lock();
                System.out.println(y + " starter lock...");
                try {
                    if (y >= ALL_CAPACITY) {
                        break;
                    }
                    // 一次生产一份
                    starterInt.incrementAndGet();
                    System.out.println(y++ + " 生产了一份 starterInt: " + starterInt.get());
                    lock.signalStarter();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(y + " starter unlock...");
                    lock.unlock();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("starter end..."
                    + ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get());
        }
    }

    // 1.牛奶
    static class MilkTh implements Runnable {

        LockCondition lock;
        MilkTh(LockCondition lock) { this.lock = lock; }

        @Override
        public void run(){
            int x = 0;
            for (;;) {
                lock.lock();
                System.out.println(x + " milk lock...");
                try {
                    if (x >= ALL_CAPACITY) {
                        break;
                    }
                    // 一次生产两份
                    milkInt.addAndGet(2);
                    System.out.println(x++ + " 生产了两份 milkInt: " + milkInt.get());
                    lock.signalMilk();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(x + " milk unlock...");
                    lock.unlock();
                }
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("milk end..."
                    + ", milk:" + milkInt.get() + ", starter:" + starterInt.get() + ", cheese:" + cheeseInt.get());
        }
    }
}

这么写功能看上去是实现了,但其实有一个很严重的问题,那就是四个线程共用了同一把可重入锁,而可重入锁是基于 AQS 的独占模式,也就是同一时间只有一个线程能加锁成功,其他线程只能等待。

而此处,明显四个线程应该是可以同时执行的,比如同时生产牛奶和发酵剂,一边生产奶酪一边运送奶酪等。

因此这里的 ReentrantLock 是不符合要求的。

3. 优化

这里就有两种思路,ReentrantLock 行不通那就换成 AQS 共享模式的实现,不过这里我先用了 ReentrantLock 就不想换了。另一个思路就是改造 ReentrantLock。

独占模式的 ReentrantLock 行不通,我改成共享的 ReentrantLock 可以不?

试试看。

import java.io.Serializable;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

public class HyLock implements Serializable {

    private Sync sync;

    HyLock(int permits) {
        sync = new Sync(permits);
    }

    public void lock() { sync.lock(); }

    public void unlock() { sync.unlock(); }

    public Condition newCondition() { return sync.newCondition(); }

    static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        Sync(int state) {
            setState(state);
        }

        final void lock() {
            acquireShared(1);
        }

        final void unlock() {
            releaseShared(1);
        }

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        protected final int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current)
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        protected boolean tryRelease(int arg) {
            return tryReleaseShared(arg);
        }

        protected boolean tryAcquire(int arg) {
            return tryAcquireShared(arg) >= 0;
        }

        protected boolean isHeldExclusively() {
            return true;
        }
    }
}

然后替换下 LockCondition 中的 ReentrantLock

static class LockCondition {

    HyLock lock;
        
    LockCondition(int permits) {
      lock = new HyLock(permits);
    }
    ...
}

main 方法也改下

LockCondition lock = new LockCondition(4);

然后运行会发现报 lock 空指针,看下 LockCondition 的 class 文件

static class LockCondition {
    HyLock lock;
    Condition carCondition;
    Condition milkCondition;
    Condition starterCondition;
    Condition cheeseCondition;

    LockCondition(int permits) {
        this.carCondition = this.lock.newCondition();
        this.milkCondition = this.lock.newCondition();
        this.starterCondition = this.lock.newCondition();
        this.cheeseCondition = this.lock.newCondition();
        this.lock = new HyLock(permits);
    }

对比下 java 文件

static class LockCondition {

    HyLock lock;

    LockCondition(int permits) {
        lock = new HyLock(permits);
    }

    Condition carCondition = lock.newCondition();
    Condition milkCondition = lock.newCondition();
    Condition starterCondition = lock.newCondition();
    Condition cheeseCondition = lock.newCondition();

JIT 会将下面四个赋值操作放到唯一构造方法里面,且放在最上面执行,而此时 lock 还没有初始化所以会报 NPE,所以需要调整一下 LockCondition 类

static class LockCondition {
    HyLock lock;
    // 取货车
    Condition carCondition;
    // 牛奶
    Condition milkCondition;
    // 发酵剂 - 10000
    Condition starterCondition;
    // 奶酪 - 10000
    Condition cheeseCondition;

    LockCondition(int permits) {
        lock = new HyLock(permits);
        carCondition = lock.newCondition();
        milkCondition = lock.newCondition();
        starterCondition = lock.newCondition();
        cheeseCondition = lock.newCondition();
    }
    ...
}

这样就行了。整体上来看,ReentrantLock 从单线程变成了多线程

  1. 相关的数据修改(AtomicInteger)都是原子的所以不会造成数据问题
  2. AQS 的共享模式参照 Semaphore 实现

但遗憾的是,这种改造是有问题的。

如果你看过 ConditionObject 的实现,会发现其内部实现完全没有考虑并发风险,因为其实际场景都是先获取到了锁再执行条件队列(condition queue)的操作(入队、出队),若想真正实现一个多线程版 ReentrantLock,我们还需要重新写一个并发安全的 Condition 实现来替代 AQS 自身的 ConditionObject。

有个佐证就是,JDK 提供的共享锁实现中 newCondition 方法(继承自 java.util.concurrent.locks.Lock 接口)均为不支持,比如 ReentrantReadWriteLock 的 read 锁

public Condition newCondition() {
  throw new UnsupportedOperationException();
}

另外考虑

  1. 是否只能通过等待时间来控制生产速度 or 运送速度?如何设计?
  2. 有些地方是否可以换 CountdownLatch / Semaphore / CyclicBarrier 实现?
  3. 冷库这块是否充分实现了题意?若没有,如何改?

剑仙三尺剑,杯中二两酒。齐活。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 224,176评论 6 522
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 95,928评论 3 402
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 171,252评论 0 366
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 60,700评论 1 300
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 69,717评论 6 399
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 53,231评论 1 314
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 41,608评论 3 428
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 40,572评论 0 279
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 47,117评论 1 324
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 39,137评论 3 344
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 41,280评论 1 354
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,908评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 42,597评论 3 336
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 33,067评论 0 25
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 34,202评论 1 275
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 49,784评论 3 380
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 46,308评论 2 365