深入Java多线程之JUC锁(一)--互斥锁ReentrantLock

文章结构:

1)ReentrantLock介绍
2)使用demo
3)形象说明其原理
4)源码阅读分析
5)思考几个问题

一、ReentrantLock介绍:

(1)简介:

ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。

ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。

ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待)。

(2)对比synchronized归纳为三个方面:

1)可重入互斥锁:

可重入:意思就是可被单个线程多次获取。
synchronized也是可重入的

2)公平与非公平:

synchronized是非公平的。
ReentrantLock有两种模式,默认是非公平。

在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。

3)等待可中断

也就是持有锁的线程长期不释放时,正等待的线程可选择放弃。不像synchronized会导致死锁,然后交给JVM处理死锁。

二、使用demo:

我们用最经典的生产者消费者代码来演示ReentrantLock。

(1)纯ReentrantLock使用:

定义仓库

public class Depot {

    private int size;        // 仓库的实际数量
    private Lock lock;        // 独占锁

    public Depot() {
        this.size = 0;
        this.lock = new ReentrantLock();
    }

    public void produce(int val) {
        lock.lock();
        try {
            size += val;
            System.out.println("生产者"+Thread.currentThread().getName()+" 生产"+val+"数量"+ " 仓库数量变为:"+ size);
        } finally {
            lock.unlock();
        }
    }

    public void consume(int val) {
        lock.lock();
        try {
            size -= val;
            System.out.println("消费者"+Thread.currentThread().getName()+" 消费"+val+"数量"+ " 仓库数量变为:"+ size);
        } finally {
            lock.unlock();
        }
    }
};

定义生产者消费者

public class Customer {
    private Depot depot;

    public Customer(Depot depot) {
        this.depot = depot;
    }

    // 消费产品:线程从仓库中消费产品。一个线程代表一个消费者
    public void consume(final int val) {
        new Thread() {
            public void run() {
                depot.consume(val);
            }
        }.start();
    }
}
public class Producer {
    private Depot depot;

    public Producer(Depot depot) {
        this.depot = depot;
    }

    // 消费产品:线程向仓库中生产产品。一个线程代表一个生产者
    public void produce(final int val) {
        new Thread() {
            public void run() {
                depot.produce(val);
            }
        }.start();
    }
}

测试

public class Test {
    public static void main(String[] args) {
        Depot depot = new Depot();
        Producer producer = new Producer(depot);
        Customer customer = new Customer(depot);

        producer.produce(70);
        producer.produce(120);
        customer.consume(60);
        customer.consume(150);
        producer.produce(130);
    }
}

运行结果:

生产者Thread-0 生产70数量 仓库数量变为:70
消费者Thread-3 消费150数量 仓库数量变为:-80
生产者Thread-4 生产130数量 仓库数量变为:50
消费者Thread-2 消费60数量 仓库数量变为:-10
生产者Thread-1 生产120数量 仓库数量变为:110

分析:
1)Depot 是个仓库。可以通过生产者去生产货物,通过消费者去消费货物。
2)通过ReentrantLock独占锁,可以锁住仓库,每个线程的修改都是原子性质,并且是有序的。
3)但是我们可以发现一个问题。现实中,仓库的容量不可能为负数,仓库的容量是有限制的。所以基于这样的生产消费模型,有实际业务需求,我们又需要利用ReentrantLock去设计。

(2)结合Condition条件队列解决我们的实际需求:

public class Depot {
    private int       capacity;    // 仓库的容量
    private int       size;        // 仓库的实际数量
    private Lock      lock;        // 独占锁
    private Condition fullCondtion;            // 生产条件
    private Condition emptyCondtion;        // 消费条件

    public Depot(int capacity) {
        this.capacity = capacity;
        this.size = 0;
        this.lock = new ReentrantLock();
        this.fullCondtion = lock.newCondition();
        this.emptyCondtion = lock.newCondition();
    }

    public void produce(int val) {
        lock.lock();
        try {
            // left 表示“想要生产的数量”(有可能生产量太多,需多此生产)
            int left = val;
            while (left > 0) {
                // 库存已满时,等待“消费者”消费产品。
                while (size >= capacity) {
                    fullCondtion.await();
                }
                // 获取“实际生产的数量”(即库存中新增的数量)
                // 如果“库存”+“想要生产的数量”>“总的容量”,则“实际增量”=“总的容量”-“当前容量”。(此时填满仓库)
                // 否则“实际增量”=“想要生产的数量”
                int inc = (size + left) > capacity ? (capacity - size) : left;
                size += inc;
                left -= inc;
                System.out.println(
                        "生产者" + Thread.currentThread().getName() + " 本线程想生产" + val + " 本次生产后还要生产" + left + " 仓库增加" + inc + " 仓库目前实际数量"
                                + size);
                // 通知“消费者”可以消费了。
                emptyCondtion.signal();
            }
        } catch (InterruptedException e) {

        } finally {
            lock.unlock();
        }
    }

    public void consume(int val) {
        lock.lock();
        try {
            // left 表示“客户要消费数量”(有可能消费量太大,库存不够,需多此消费)
            int left = val;
            while (left > 0) {
                // 库存为0时,等待“生产者”生产产品。
                while (size <= 0) {
                    emptyCondtion.await();
                }
                // 获取“实际消费的数量”(即库存中实际减少的数量)
                // 如果“库存”<“客户要消费的数量”,则“实际消费量”=“库存”;
                // 否则,“实际消费量”=“客户要消费的数量”。
                int dec = (size < left) ? size : left;
                size -= dec;
                left -= dec;
                System.out.printf(
                        "消费者" + Thread.currentThread().getName() + " 本线程想消费" + val + " 本次消费后还要消费" + left + " 仓库减少" + dec + " 仓库目前实际数量"
                                + size);
                fullCondtion.signal();
            }
        } catch (InterruptedException e) {
        } finally {
            lock.unlock();
        }
    }

    public String toString() {
        return "capacity:" + capacity + ", actual size:" + size;
    }
}

生产者消费者:

public class Customer {
    private Depot depot;

    public Customer(Depot depot) {
        this.depot = depot;
    }

    // 消费产品:线程从仓库中消费产品。一个线程代表一个消费者
    public void consume(final int val) {
        new Thread() {
            public void run() {
                depot.consume(val);
            }
        }.start();
    }
}
public class Producer {
    private Depot depot;

    public Producer(Depot depot) {
        this.depot = depot;
    }

    // 消费产品:线程向仓库中生产产品。一个线程代表一个生产者
    public void produce(final int val) {
        new Thread() {
            public void run() {
                depot.produce(val);
            }
        }.start();
    }
}

测试:

public class Test {
    public static void main(String[] args) {
        Depot depot = new Depot(100);
        Producer producer = new Producer(depot);
        Customer customer = new Customer(depot);

        producer.produce(70);
        producer.produce(120);
        customer.consume(60);
        customer.consume(150);
        producer.produce(130);
    }
}

结果:

生产者Thread-0 本线程想生产70 本次生产后还要生产0 仓库增加70 仓库目前实际数量70
消费者Thread-2 本线程想消费60 本次消费后还要消费0 仓库减少60 仓库目前实际数量10
生产者Thread-4 本线程想生产130 本次生产后还要生产40 仓库增加90 仓库目前实际数量100
消费者Thread-3 本线程想消费150 本次消费后还要消费50 仓库减少100 仓库目前实际数量0
生产者Thread-4 本线程想生产130 本次生产后还要生产0 仓库增加40 仓库目前实际数量40
消费者Thread-3 本线程想消费150 本次消费后还要消费10 仓库减少40 仓库目前实际数量0
生产者Thread-1 本线程想生产120 本次生产后还要生产20 仓库增加100 仓库目前实际数量100
消费者Thread-3 本线程想消费150 本次消费后还要消费0 仓库减少10 仓库目前实际数量90
生产者Thread-1 本线程想生产120 本次生产后还要生产10 仓库增加10 仓库目前实际数量100

从结果来看,解决了仓库的容量不可能为负数,仓库的容量是有限制的这两个实际业务需求。

(3)对比一下使用synchronized如何解决我们的这两个实际业务需求。

三、形象说明其原理:

背景:一群人排队领钱。一个家庭的人同属一个线程。

(1)公平锁:

1)非重入情况

在这里插入图片描述

2)重入情况:

一家人,就是同一个线程,那就优先。


在这里插入图片描述

(2)非公平锁:

直接尝试插队。


在这里插入图片描述

尝试失败,那就去队尾


在这里插入图片描述

(3)如果加入条件队列呢?

1)公平锁:

在这里插入图片描述

2)非公平锁:

在这里插入图片描述

(4)如果多个条件队列呢?

在这里插入图片描述

1)公平锁下:

在这里插入图片描述

然后变成这样的队列


在这里插入图片描述

2)非公平锁下:

在这里插入图片描述

去争夺锁的占有权


在这里插入图片描述

如果成功占有


在这里插入图片描述

四、源码阅读分析:

//实现Lock接口
public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    // 同步队列
    private final Sync sync;

    //抽象同步器Sync继承AQS,子类可以非公平或者公平锁
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
        // 获取锁
        abstract void lock();
        // 非公平锁的尝试获取
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 获取状态
            int c = getState();
            if (c == 0) {
            // 跟公平锁获取的区别时,这里少了判断队列是否为空的函数hasQueuedPredecessors
            // 即不管排队队列是否为空,该线程都将直接尝试获取锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果c != 0,表示该锁已经被线程占有,则判断该锁是否是当前线程占有,若是设置state,否则直接返回false
            else if (current == getExclusiveOwnerThread()) {// 当前线程拥有该锁
                int nextc = c + acquires;// 增加重入次数
                if (nextc < 0) // 超过了int的表示范围
                    throw new Error("Maximum lock count exceeded");
                // 设置状态
                setState(nextc);
                return true;
            }
            return false;
        }
         //tryRelease方法获取状态并减去releases的值,如果为0表示锁完全被释放
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            //只有持有锁的线程才能进行释放动作
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                //锁被释放,独占线程设置为null
                setExclusiveOwnerThread(null);
            }
            //更改状态
            setState(c);
            return free;
        }
        // 判断资源是否被当前线程占有
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
        // 新生一个条件
        final ConditionObject newCondition() {
            return new ConditionObject();
        }

        // 返回资源的占用线程
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
        // 返回状态
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
        // 资源是否被占用
        final boolean isLocked() {
            return getState() != 0;
        }

        /**
         * Reconstitutes the instance from a stream (that is, deserializes it).
         */
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // 非公平锁
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

       // 获得锁
        final void lock() {
        // 比较并设置状态成功,状态0表示锁没有被占用
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                // 锁已经被占用,或者set失败;以独占模式获取对象
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    // 公平锁
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
        // 以独占模式获取对象,忽略中断
            acquire(1);
        }

        // 尝试公平获取锁
        protected final boolean tryAcquire(int acquires) {
        // 获取当前线程
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {// 状态为0,锁没被占用
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    // 设置当前线程独占
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果c != 0,表示该锁已经被线程占有,则判断该锁是否是当前线程占有,若是设置state,否则直接返回false
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)// 超过了int的表示范围
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

   
    public ReentrantLock() {
    // 默认非公平策略
        sync = new NonfairSync();
    }

    
    public ReentrantLock(boolean fair) {
        //可以传递参数确定采用公平策略或者是非公平策略,参数为true表示公平策略
        sync = fair ? new FairSync() : new NonfairSync();
    }

    //委托给Sync同步器
    public void lock() {
        sync.lock();
    }

   //委托给Sync同步器
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    //委托给Sync同步器
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

    //委托给Sync同步器
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

   //委托给Sync同步器
    public void unlock() {
        sync.release(1);
    }
    //委托给Sync同步器
    public Condition newCondition() {
        return sync.newCondition();
    }
//委托给Sync同步器
    public int getHoldCount() {
        return sync.getHoldCount();
    }

    //委托给Sync同步器
    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    //委托给Sync同步器
    public boolean isLocked() {
        return sync.isLocked();
    }

    //委托给Sync同步器
    public final boolean isFair() {
        return sync instanceof FairSync;
    }

    //委托给Sync同步器
    protected Thread getOwner() {
        return sync.getOwner();
    }

   //委托给Sync同步器
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

  //委托给Sync同步器
    public final boolean hasQueuedThread(Thread thread) {
        return sync.isQueued(thread);
    }

     //委托给Sync同步器
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

  //委托给Sync同步器
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    //委托给Sync同步器,管理等待队列
    public boolean hasWaiters(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

   //委托给Sync同步器,管理等待队列
    public int getWaitQueueLength(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    //委托给Sync同步器,管理等待队列
    protected Collection<Thread> getWaitingThreads(Condition condition) {
        if (condition == null)
            throw new NullPointerException();
        if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
            throw new IllegalArgumentException("not owner");
        return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
    }

    
    public String toString() {
        Thread o = sync.getOwner();
        return super.toString() + ((o == null) ?
                                   "[Unlocked]" :
                                   "[Locked by thread " + o.getName() + "]");
    }
}

五、思考几个问题

(1)关键的CAS方法的设计

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

对应unsafe类compareAndSwapInt方法的四个参数分别是:
1)var1:需要改变的对象
2)var2:偏移量(即之前求出value offset的值--在本对象中变量位置,对象中要修改变量的偏移量)
3)var3:期待的值
4)var4:更新后的值
如果this.value的值与expect这个值相等,那么则将value修改为update这个值,并返回一个true。
如果调用该方法时,this.value的值与expect这个值不相等,那么不做任何操作,并返回一个false。

(2)为什么要自己产生中断?

就是线程等候可以自己中断也可以别人中断。
即使在等待队列中,线程在阻塞中被中断唤醒而获得CPU的执行权,但是如果此线程前面还有线程,根据公平性,依旧不能拿到锁,再次阻塞。所以要维持中断状态,重新产生中断。

(3)理解下流程图中所说的waitStatus

1)设计原因:

如果获取锁失败,判断是否应该挂起当前线程,可见,挂起线程是有条件的。
这是AQS静态内部类Node的成员变量,用于记录Node对应的线程等待状态。

2)waitSatus值含义:

1- SIGNAL

从前面的代码状态转换可以看得出是前面有线程在运行,需要前面线程结束后,调用unpark()方法才能激活自己,值为:-1

2- CANCELLED

当AQS发起取消或fullyRelease()时,会是这个状态。值为1,也是几个状态中唯一一个大于0的状态,所以前面判定状态大于0就基本等价于是CANCELLED的意思。

通俗来说,因超时或中断,节点会被设为取消状态,被取消状态则不该去竞争锁,只能保持取消状态不变,不能转换状态。这个状态的节点会踢出队列,被GC收回。

3- CONDITION

线程基于Condition对象发生了等待,进入了相应的队列,自然也需要Condition对象来激活,值为-2。

4- PROPAGATE

读写锁中,当读锁最开始没有获取到操作权限,得到后会发起一个doReleaseShared()动作,内部也是一个循环,当判定后续的节点状态为0时,尝试通过CAS自旋方式将状态修改为这个状态,表示节点可以运行。

5- 状态0

初始化状态,也代表正在尝试去获取临界资源的线程所对应的Node的状态。


问题待续......

更多文章点此处

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容