Java并发编程学习(三)——ReentrantLock

一.synchronized存在的问题

Java并发编程学习(二)——synchronized关键字一文中,我们重点分析了synchronized的用法和实现,我们知道synchronized可以解决多线程时的内存可见性问题,保证线程安全,并且使用非常简单,但是synchronized同样存在一些问题:

  • 不可中断:当两个线程竞争进入临界区时,如果一个线程先进入,另一个线程就需要不停的尝试争夺锁,在这个过程中不响应中断。
  • 不公平:当一个获得锁的线程释放锁的时候,其他线程不管先来后到都有可能获得锁,这就有可能造成先开始等待的线程一直得不到执行
  • 读写不分离:有的时候,某个临界区内只是读取共享状态,当获得锁的线程进入只读的临界区时,其他线程依赖不能进入,要一直等到当前线程释放锁,这就造成不必要的等待,降低吞吐量
  • 不灵活:synchronized只能控制一个代码块或一个方法,有时,我们可能想在一个方法中加锁,在另外一个方法中释放锁,这时,synchronized就无能为力了。

jdk1.5中引入了著名的java.util.concurrent包,其中有一个子包locks,提供了更为强大的锁机制,可以解决上面所有问题。

今天我们先来看一下locks包中的ReentrantLock(可重入锁)。

二.ReentrantLock的好处

俗话说:是骡子是马,拉出来溜溜。ReentrantLock真的能提供比synchronized更强大的功能吗?我们拭目以待。

我们先来看一个早点摊卖油条的需求:老王开了一个早点摊卖油条,每天街坊邻居都排着队买,为了简化逻辑,我们假设老王每天只卖100根油条,老王每天起早炸好100根油条,然后开始卖。

由于油条是有限的,但是买的人很多,因此必须对油条这个共享资源进行同步操作,否则,就可能出现一根油条同时卖给两个人的情况。

买油条

问题似乎不难,使用synchronized就可以解决这个问题了,代码如下:

// 油条
class FriedBreadStick {

}
class FriedBreadStickQueue {
    private Queue<FriedBreadStick> stickQueue = new ArrayBlockingQueue<FriedBreadStick>(10);

    // 炸好油条后放入
    public void putStick(FriedBreadStick stick) {
        if (stickQueue.size() < 10) {
            stickQueue.add(stick);
        }
    }

    // 有人买油条时获取油条
    public synchronized List<FriedBreadStick> getStick(int num) {
        List<FriedBreadStick> list = new ArrayList<FriedBreadStick>();
        if (stickQueue.size() >= num) {
            for (int i = 0; i < num; i++) {
                list.add(stickQueue.poll());
            }
        }
        try {
            Thread.sleep(200); // 造成其他线程等待的情况
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return list;
    }

}

class Customer extends Thread {
    private FriedBreadStickQueue queue;

    public Customer(String name, FriedBreadStickQueue queue) {
        super(name);
        this.queue = queue;
    }

    @Override
    public void run() {
        List<FriedBreadStick> list = queue.getStick(2); // 买两根油条
        if (list.size() == 2) {
            System.out.println(Thread.currentThread().getName() + " buy 2 friedBreadStick");
        } else {
            System.out.println(Thread.currentThread().getName() + " buy no friedBreadStick");
        }
    }
}

public class BreakfastShop {

    public static void main(String[] args) {
        FriedBreadStickQueue queue = new FriedBreadStickQueue();
        for (int i = 0; i < 10; i++) {
            queue.putStick(new FriedBreadStick());
        }

        for (int i = 0; i < 6; i++) {
            new Customer("customer" + i, queue).start();
            try {
                Thread.sleep(10); // 每个线程启动后,等待10ms,造成不同线程等待时间不同的情况
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

上面代码中,我们使用synchronized修饰了卖油条的方法,使卖油条的过程是同步的,同一时刻,只能由一个人买油条。为了造成线程等待的情况,我们故意使每次卖油条的过程持续200ms以上,并且买油条的人每隔10ms多一个,这样,就会造成线程等待,而且不同线程等待的时候不同。

执行结果如下:

customer0 buy 2 friedBreadStick
customer5 buy 2 friedBreadStick
customer4 buy 2 friedBreadStick
customer3 buy 2 friedBreadStick
customer2 buy 2 friedBreadStick
customer1 buy no friedBreadStick

我们可以看到,有5个人买到了油条,没有造成一根油条卖给多个人的情况,但是customer1明显来的更早,却没有买到油条,反而被来的最晚的customer抢去了,这明显不公平。

我们再来看看使用ReentrantLock的情况。

class FriedBreadStickQueue {
    private Queue<FriedBreadStick> stickQueue = new ArrayBlockingQueue<FriedBreadStick>(10);

    private final Lock lock = new ReentrantLock(true); // 参数为true表示希望获得一个公平锁

    // 炸好油条后放入
    public void putStick(FriedBreadStick stick) {
        if (stickQueue.size() < 10) {
            stickQueue.add(stick);
        }
    }

    // 有人买油条时获取油条
    public List<FriedBreadStick> getStick(int num) {
        List<FriedBreadStick> list = new ArrayList<FriedBreadStick>();
        lock.lock();
        if (stickQueue.size() >= num) {
            for (int i = 0; i < num; i++) {
                list.add(stickQueue.poll());
            }
        }
        try {
            Thread.sleep(200); // 造成其他线程等待的情况
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        lock.unlock();
        return list;
    }

}

再次执行,结果如下:

customer0 buy 2 friedBreadStick
customer1 buy 2 friedBreadStick
customer2 buy 2 friedBreadStick
customer3 buy 2 friedBreadStick
customer4 buy 2 friedBreadStick
customer5 buy no friedBreadStick

可以看到,顾客们按照先来后到的顺序买油条,这样就公平了,大家都没有怨言。

不过值得注意的是,ReentrantLock并不能完全保证线程执行的公平性,但是会尽量保证。

我们再来看另一个例子,我们经常会去银行取钱,当有多个人进行取钱时,就需要排队,同一个窗口同一时间只能服务以为顾客,因此柜台窗口成为了共享资源,对共享资源的获取需要进行同步:

银行柜台
// 银行业务
class Business {

    public synchronized void getMoney() throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + " begin getMoney");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " was interrupted during get Money");
            return;
        }
        System.out.println(Thread.currentThread().getName() + " finish getMoney");
    }
}

class Customer extends Thread {
    private Business business;

    public Customer(String name, Business business) {
        super(name);
        this.business = business;
    }

    @Override
    public void run() {
        try {
            business.getMoney();
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " was interrupted");
        }
    }
}
public class Bank {

    public static void main(String[] args) {
        Business business = new Business();
        Customer customer1 = new Customer("customer1", business);
        Customer customer2 = new Customer("customer2", business);
        customer1.start();

        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        customer2.start(); // 顾客1开始取钱100ms后顾客2开始排队等待
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        customer2.interrupt(); // 顾客2排队等待100ms后,被打断
    }
}

我们这个例子主要是为了测试线程在等待锁的过程中能否响应中断,运行结果如下:

customer1 begin getMoney
customer1 finish getMoney
customer2 begin getMoney
customer2 was interrupted during get Money

在customer2 等待100ms时,就已经收到了中断信号,此时,customer1仍然没有完成取钱的操作,但是customer2 依然执着的等待着锁,并没有响应中断,直到获得锁之后,才响应。

下面修改一下程序:

class Business {
    private Lock lock = new ReentrantLock();

    public void getMoney() throws InterruptedException {
        lock.lockInterruptibly();
        System.out.println(Thread.currentThread().getName() + " begin getMoney");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            System.out.println(Thread.currentThread().getName() + " was interrupted");
            return;
        }
        lock.unlock();
        System.out.println(Thread.currentThread().getName() + " finish getMoney");
    }
}

注意,我们这次使用了Lock,并且在加锁时使用了lockInterruptibly方法,运行结果如下:

customer1 begin getMoney
customer2 was interrupted
customer1 finish getMoney

显然,在customer2 获得锁之前,就响应了中断,不再竞争锁了。

以上,我们看了两个例子,一个是有关锁获取时公平性的,一个是有关竞争锁时是否响应中断的,可以看出,ReentrantLock相比synchronized确认可以更加灵活:

  1. 提供了公平锁和非公平锁的选项
  2. 提供了可中断加锁以及不可中断加锁的选项
  3. 此外,ReentrantLock还可以试探性的获取锁,如果获取成功,则进入临界区,否则可以自由选择是继续等待还是放弃

可见,synchronized之于ReentrantLock,就相当于匕首与瑞士军刀,一个锋利无比,一个功能多样;也相当于沙县小吃和自助餐,一个简单美味,一个玲琅满目,任君挑选。

三.具体实现

下面我们分析一下ReentrantLock的核心实现逻辑。

通过查看源代码,我们可以看到,ReentrantLock基本上是在AbstractQueuedSynchronizer类的基础上实现的,AbstractQueuedSynchronizer是真正的幕后英雄。

AbstractQueuedSynchronizer提供了一个支持FIFO(先进先出)等待队列的阻塞锁同步框架,我们借着分析ReentrantLock的过程也顺便了解AbstractQueuedSynchronizer

ReentrantLock类中定义了一个内部类Syn,它继承自AbstractQueuedSynchronizer,并添加了一些支持锁的常用操作,如lock方法(加锁),tryRelease(释放锁)方法等。

Syn根据是否公平,又有不同的子类,包括FairSync(用于实现公平锁)、NonfairSync(用于实现非公平锁)。通过ReentrantLock类构造函数中的boolean fair参数,如果fair为true,则将ReentrantLock实例中的sync初始化为FairSync,否则设置为NonfairSync

1.lock()&unlock()

这两个方法是ReentrantLock中最基础的方法,我们先来看一下非公平的实现:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

这里涉及到三个方法:compareAndSetStatesetExclusiveOwnerThreadacquire,这三个方法都是AbstractQueuedSynchronizer中的方法。

compareAndSetState:简称cas,是一种无锁算法,底层对应的是一个CPU指令,该指令接收3个参数(内存地址、预期值、新值),执行过程为判断内存地址的值是否是预期值,如果是的话,就将该内存地址的值更新为新值,否则什么都不做。

setExclusiveOwnerThread:设置当前锁被哪个线程占有。

acruire:具体实现如下:

// 获得锁
public final void acquire(int arg) {
    // 如果tryAcquire成功,则直接返回
    // 如果tryAcquire失败,则会执行addWaiter将当前线程入队,然后执行acquireQueued,返回true说明当前线程被唤醒了,已经拿到了锁,但是需要被中断,返回false说明已经成功获得了锁,并且没有被中断
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();  // 执行中断
}

// 尝试获得锁
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

// 非公平获得锁过程
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState(); // 获取当前同步状态
    if (c == 0) { // 0表示当前锁未被占有
        if (compareAndSetState(0, acquires)) { // 执行cas操作抢占锁
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 锁被当前线程占有,可重入
        int nextc = c + acquires;
        if (nextc < 0) // 溢出,超过int型的容量限制
            throw new Error("Maximum lock count exceeded");
        setState(nextc); // 将status更新
        return true;
    }
    return false;
}


// 将当前线程加入到等待队列,队列基于链表实现
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 如果可以一次操作成功,则操作完成后立即返回
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // compareAndSetTail失败,说明存在竞争,重新执行插入(无限循环尝试)
    enq(node);
    return node;
}

// 线程将会被阻塞在这里,直到获得锁时结束,返回的boolean值表示是否需要被中断
final boolean acquireQueued(final Node node, int arg) {
    try {
        boolean interrupted = false; // 默认没有中断
                
        for (;;) {
            final Node p = node.predecessor();
            // 前继节点是head,并且当前线程成功获得锁,说明前继节点已经释放了锁,可以出队了
            if (p == head && tryAcquire(arg)) {
                setHead(node); // 将当前节点设置为head
                p.next = null; // help GC
                return interrupted;
            }
            // 获取锁失败时需要被阻塞,并且检查当前线程需要被中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (RuntimeException ex) {
        cancelAcquire(node);
        throw ex;
    }
}

上面涉及到多个方法,逻辑较为复杂,简单总结一下:当一个线程尝试获得锁时,先执行一次cas指令,如果成功,则获得锁,如果失败,则进入等待队列,直到被唤醒,在等待队列中自旋等待时,不响应中断,但是会被是否有中断记录下来,等拿到锁后再响应中断。

我们再看unlock方法。

public void unlock() {
    sync.release(1);
}

只有一行代码,再看看具体实现:

public final boolean release(int arg) {
    if (tryRelease(arg)) { // 释放成功
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);  // 唤醒头节点的后继节点
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 当前线程不是占有锁的线程,理论上不应该出现这种情况,抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { // c=0说明此次释放之后,已经没有线程占有锁了
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

上面的lock与unlock我们看的都是非公平的实现,下面我们再来看一下公平锁的实现:

final void lock() {
    acquire(1);
}

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (isFirst(current) &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

我们可以看到,公平锁与非公平锁的区别其实只有一句:

isFirst(current)

公平锁,只有当isFirst返回true,并且cas操作成功,才会获得锁。看来公平性的关键就在这里了,我们再来看一下isFirst的实现。

final boolean isFirst(Thread current) {
    Node h, s;
    // head为空说明队列为空,当前线程是第一个节点
    // head不为空,但是current是head的第一个后继,也说明current是实际有效的第一个节点(head中的thread为null)
    return ((h = head) == null ||
            ((s = h.next) != null && s.thread == current) ||
            fullIsFirst(current));
}

// 从tail开始一直往前找,直到节点的thread为nul,如果最终发现的目标为current,说明current为第一个节点
final boolean fullIsFirst(Thread current) {
    Node h, s;
    Thread firstThread = null;
    if (((h = head) != null && (s = h.next) != null &&
         s.prev == head && (firstThread = s.thread) != null))
        return firstThread == current;
    Node t = tail;
    while (t != null && t != head) {
        Thread tt = t.thread;
        if (tt != null)
            firstThread = tt;
        t = t.prev;
    }
    return firstThread == current || firstThread == null;
}

整理一下上面的逻辑:当当前线程处于队列当中第一个实际有效的线程时,isFirst才会返回true,而队列是按照FIFO来排队的,这样就保证了锁的公平性。而非公平的锁,每个线程都在不停的竞争锁,一旦acs操作成功,不管该线程处于队列中哪个位置,都可以获得锁。

2.lockInterruptibly

我们再来看一下lockInterruptibly,它支持线程在等待锁的过程中放弃等待,响应中断,我们看一下具体实现:

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) // 如有中断,立即抛出中断异常
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

// 可中断的锁获取过程
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) //注意这里,和非中断模式的区别,非中断模式只是标记,这里会break
                break;
        }
    } catch (RuntimeException ex) {
        cancelAcquire(node);
        throw ex;
    }
    // 退出锁竞争
    cancelAcquire(node);
    // 抛出中断异常
    throw new InterruptedException();
}

ReentrantLock类代码较多,我们这里重点分析了公平锁、非公平锁、中断与非中断锁的实现逻辑,读者有兴趣也可以阅读更多的代码逻辑哈。在下一篇文章中,我们将分析ReentrantReadWriteLock读写锁,敬请期待!

参考资料

本文已迁移至我的博客:http://ipenge.com/1252.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容