Semaphore源码解读

关键字:AQS、自旋、CAS、LockSupport、CLH阻塞队列

1. AQS

Semaphore的相关操作主要由其内部成员变量sync完成,sync有两种,分别是支持公平锁的FairSync和不公平锁的NonfairSync,两种都是基于AQS扩展而来。我们在声明一个信号量对象的时候,sync便在构造函数里被初始化。这里先简单介绍以下AQS,后续会出一篇文章详细解读。

AQS全名为AbstractQueuedSynchronizer,即抽象队列同步器,是并发包作者Doug Lea为了解决在Java 1.5之前synchronized性能问题而开发的并发框架,主要实现有ReentrantLock,ReentrantReadWriteLock, CountDownLatch, Semaphore等,和synchronized对标的便是ReentrantLock。

AQS内维护了一个volatile类型的int 成员变量state,以及一个双向CLH队列,线程尝试修改state属性值,修改成功便表明成功获取锁,否则进入CLH队列并阻塞,直到持有锁的线程释放,并唤醒CLH队列中的线程。

2. 初始化信号量

我们在初始化Semaphore的时候,便指定了state的值,表明可以获取的最大信号量,线程尝试获取信号量即对state减去相应的值,修改成功便表明成功获取信号量,否则进入CLH队列并阻塞,直到持有信号量的线程释放,state加1,并唤醒CLH队列中的全部线程。

sync由构造函数进行初始化


//构造permits个数量的不公平锁
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

//根据fair构造permits个数量的公平&不公平锁
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
    

3. 加锁操作(获取信号量)

Semaphore提供了8中常用的加锁操作,可分为三大类,即获取一定数量的共享锁&是否支持中断&获取不到是否阻塞,以下8中操作便是其两两组合。

//通过sync获取共享锁(可中断)
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

//通过sync获取共享锁(不可中断)
public void acquireUninterruptibly() {
    sync.acquireShared(1);
}

//尝试获取1个共享锁,获取不到则立刻返回false,不进行阻塞    
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}
    
//尝试获取1个共享锁,获取不到则等待timeout时间后返回false  
public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
 
//尝试获取permits个共享锁,获取不到则立刻返回false,不进行阻塞   
public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}

//尝试获取permits个共享锁,获取不到则等待timeout时间后返回false  
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

先来看一下获取可中断锁


public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //线程是否被中断,中断则抛出中断异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 1.首先尝试获取共享锁
    // 2.获取成功则进行相应的业务逻辑,获取失败进入阻塞队列
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

获取信号量锁时又分为公平锁和不公平锁,以下分别是两种锁是如何获取的


static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        //自旋
        for (;;) {
            //阻塞队列中是否已经有节点在等待,如有则直接返回获取失败
            //这个判断就是和不公平锁的区别,不公平锁不管队列中是否有节点等待,上来就抢锁
            if (hasQueuedPredecessors())
                return -1;
            //通过CAS设置state
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        //调用父方法
        /*
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
        */
        return nonfairTryAcquireShared(acquires);
    }
}

获取锁失败请求入队

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    //将当前节点放入阻塞队列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 自旋
        for (;;) {
            //获取当前节点的上一节点
            final Node p = node.predecessor();
            //上一节点如果是队头
            if (p == head) {
                //再次尝试获取args数量的共享锁,r为剩余的共享数量
                int r = tryAcquireShared(arg);
                //获取成功
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            //上一节点如果不是队头,即阻塞队列中已经有节点在等待或者是队头但获取锁失败则执行以下逻辑
            //1.将当前节点的有效前驱节点标示为可唤醒状态
            //2.将当前节点阻塞,等待被唤醒或中断
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
    }
//获取当前节点的上一个节点
final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
        throw new NullPointerException();
    else
        return p;
}


/*
 *
 * pred 上一个节点
 * node 当前节点
 * CANCELLED =  1;SIGNAL    = -1;CONDITION = -2;PROPAGATE = -3;
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //上一个节点已经处于可唤醒状态则直接返回
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
         //节点已失效,将失效节点的前驱节点赋值为当前节点的前驱节点,直到前驱节点不存在已经取消的情况
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        //通过CAS将有效的前驱节点的状态修改为可唤醒状态 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


private static final boolean compareAndSetWaitStatus(Node node,
                                                     int expect,
                                                     int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
                                    expect, update);
}

private final boolean parkAndCheckInterrupt() {
    //线程阻塞在这里
    LockSupport.park(this);
    //线程被唤醒时从这里开始执行
    return Thread.interrupted();
}

现在来对比分析一下尝试获取(tryAcquire)、不可中断、最大尝试时间分别是如何处理的

tryAcquire

//尝试获取,获取不到直接返回了
public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

设置最大获取时间

private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            //注意这里的队头只是一个虚拟节点,真正存放线程的节点为队列的第二个节点,以下提到的队头同样
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                //区别在这里,线程只会park一定时间,过期后再次尝试获取失败便直接返回
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

不可中断锁


private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //和可中断锁区别在这里,可中断这里直接抛出异常了,但是不可中断锁只是设置一个值便又去获取了
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

4. 解锁操作

//调用sync的releaseShared方法进行解锁,每次解锁数量为1
public void release() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
//通过自旋+CAS将共享锁的数量加回去
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        //CLH队列不为空,即有线程在等待获取锁
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                //唤醒头节点的next节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        //刚才被唤醒的线程已将head设置为head的下一节点,所以这里不会相等
        //所以这里一般会多唤醒一次,假如多唤醒的节点获取到锁,重复此逻辑,否则多唤醒的节点会继续阻塞
        if (h == head)                   // loop if head changed
            break;
    }
}

这里再贴一下节点被唤醒时的逻辑

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
            //线程被唤醒后从这里再次执行    
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容