Java并发之semaphore源码解读

semaphore中文翻译为信号标,有些大哥习惯叫它信号量。其本质我的理解就是一个许可发放器。
正值疫情期间公园限流就是一个很好的场景来解释这件事。为了落实疫情防控的有关工作,公园管理员A(大哥大)做了100个小纸牌,给了管理员B,用于发放给游客进入公园使用,游客进公园需要持该小纸牌进入并游玩,并要求游客出门口时候要交回这个小纸,很快100个就发往完了,这时候来了第101位游客,结果发现没有纸牌可用,因此就给他搬了个马扎坐在了阴凉处歇会喝喝茶。有点像限流算法里面令牌桶的赶脚,这不是重点。
下面看下semaphore的具体实现吧

###首先看到的是 sync类这是并发包里面很多类都有的各自实现,
##李大爷通过模板模式将公用部分封装到AbstractQueuedSynchronizer 
##其他的竞态抢夺策略则灵活的交给各个并发类进行实现
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;
        ##初始化令牌数量,对应案例中的100个小纸牌被初始化出来
        
        Sync(int permits) {
            setState(permits);
        }
       ##读取目前剩余量
        final int getPermits() {
            return getState();
        }
       #非公平的竞态获取方法
        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                #死循环进行调用尝试通过cas原子操作刷新剩余小纸牌数量
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    ##如果剩余数量小于零就直接返回负数传递给AQS父类进行排队,
                    ##如果cas成功了则返回剩余数量
                    return remaining;
            }
        }
        #游园结束归还小纸牌
        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
               #通过死循环的形式进行归还
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                     #超出最大数量这个一般不会发生比如超了int最大范围了
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
        #减少目前令牌数量,管理员B尝试扣下来一些小纸牌,
        #这个过要原子性完成的
        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }
        #清空目前许可,类似于暴力作废所有小纸牌,把小纸牌清零了,不再往外发放
        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

接下来是两个竞争方式的代码实现

 /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;
       #初始化许可数
        NonfairSync(int permits) {
            super(permits);
        }
        #调用不公平方法实现竞态资源获取
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

这里啰嗦两句公平跟不公平其实就是在竞态资源时候的一些区别,不公平我实现逻辑就是上来就抢,不会检查队列是否有排队的,抢不到就再去排队,而公平的实现逻辑则是上来先检查一下目前同步器队列中有没有排队的node节点,如果已经存在了则老老实实去排队

 /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                #检查是否有排队的如果有则返回负值,告诉AQS也就是当前类的爷爷类,去安排当前线程去排队吧
                if (hasQueuedPredecessors())
                    return -1;
               #如果没有排队的线程尝试修改目前许可数量修改成功则继续执行当前线程
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

这里对公平锁与非公平锁的
tryAcquireShared((int acquires)进行一个简单说明,我们在阅读并发里面相关代码时候会经常看到这个名称的方法,这个方法设计的灵魂是李大爷在AQS类里面做了抽象,因为线程的排队跟调度不希望被子类去实现代码也不够优雅,因此李大爷在AQS里面定义了方法进行占位,当然类似的还有tryAcquire(int arg) 他们的职责其实都是要给AQS一个反馈,目的在于告诉AQS我是让他去尝试排队呢,还是直接放行呢,放行的话就需要子类在执行时候能够正常获得竞争状态,也就是往往能够按照预期通过CAS操作修改state变量的值(这里之所以是预期是因为不同的锁表现出不同的形式)。比如重入锁要求只有是0才能尝试加锁,如果是大于0的其实是被其他线程占有了那么就返回负值,如果是本文信号量则要求state得大于等于零且修改成功了返回大于等于零就能让当前线程获得执行权力。还有可重入读写锁,通过划分state的范围,每次通过掩码计算。

剩下的一些方法都是简单的调用了

#尝试获得许可可中断的,每次申请都会检查当前线程
是否被标记为Interrupted 如果被标记了则立马抛出异常进行反馈
 public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
线程被标记后不会立马抛出异常,而是将标记记录下来当线程被唤醒时候,将interrupt状态填充回去并返回。
public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

这里面适当展开一点,可能有不少小伙伴在读到李大爷的AQS里面一行代码会懵逼

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)①
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

懵逼产生的根源再有第一个方法比较好理解就是检查当前是不是竞争失败要停掉当前线程,这里面有个线程叫停的api LockSupport.park(),这个大哥其实是能够将线程给喊停的。

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
       #线程跑到这就等于会休息了
       #this是一个blocker,为了方便在jvm检测时候看清楚锁所在位置用的。
        return Thread.interrupted();

    }

我之前懵逼在这里,就是为何调用这个api,
这个api的意思其实是要解决这个问题,线程在挂起的时候有人调用了该//线程的interrupt方法,也就是给线程打了个标记我,调用interrupted方法其实是会重置标记为false,但是返回值为true。如果标记为false,在调用收回返回false,如果返回true李大爷在外面声明的interrupted变量就会被修改为true,当doAcquireShared方法执行循环时候看上面doAcquireShared代码的①那么就会为true。



 static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

调用了selfInterrupt(),李大爷这么设计其实是为了将线程park时候被修改的状态进行传递,可以理解要死死外面去,我就管排队跟唤醒,其他的操作我给你原模原样传递出去。
再具体的AQS里面的实现细节后面慢慢撕,书归正传
分析后面几个方法的作用

尝试获得许可一次性的,如果获得了许可返回true否则返回false
public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
}
一次尝试申请permits个许可,申请成功则返回true否则返回false
public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }
指定时间内尝试获取许可传入时间数量,跟时间计量单位
 public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }
释放指定数量的许可
 public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }
查询可用的许可数量
public int availablePermits() {
        return sync.getPermits();
    }

清空目前所有许可数
public int drainPermits() {
        return sync.drainPermits();
    }

减少指定数量的许可数
 protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }
判断当前是否为公平模式
public boolean isFair() {
        return sync instanceof FairSync;
    }
返回当前是否有排队的线程,通过aqs head!=tail实现,不一定准确
public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
返回目前排队队列长度,每次获取是通过aqs迭代累加node链表,因此也不一定准确,
public final int getQueueLength() {
        return sync.getQueueLength();
    }
返回目前排队线程集合
protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351