Semaphore源码分析

1.概述

这篇文章主要用来介绍Semaphore源码。应该也是最后一篇了。还有一个chm感觉有些地方还有点问题,就先不发了。等哪天看懂了再说!Semaphore其实在平时还是比较常用的,可以用到限流的等作用。也就是信号量级的意思。下面我们来看一下实现!

2.实现结构和原理

底层不用说,也是通过AQS进行构建了,类似于我们之前看到过的CountDownLatch。只不过对于不同的场景给出了不同的实现而已。

因此这里肯定有一个sync的内部类!这里还提供了公平和非公平两种实现,因此存在一个非公平的内部类和公平的内部类。同样默认为非公平的,毕竟性能好,除非特殊需求,采用公平实现!

我们平时使用的数据库连接池,对象池,等有数量的缓存池也是通过此思想实现的!

3.具体方法实现原理

构造方法

public Semaphore(int permits, boolean fair) {

    sync = fair ? new FairSync(permits) : new NonfairSync(permits);

}

acquire方法的实现

public void acquire(int permits) throws InterruptedException {

    if (permits < 0) throw new IllegalArgumentException();

    sync.acquireSharedInterruptibly(permits);

}

我们来看一下非公平的实现:

final int nonfairTryAcquireShared(int acquires) {

    //无限循环其实就是自旋获取。

    for (;;) {

        //获取剩余可用

        int available = getState();

        //获取减去需要的剩余

        int remaining = available - acquires;

        //<0直接return。代表需要阻塞。获取成功,返回当前剩余数>0,不需要阻塞

        if (remaining < 0 ||

            compareAndSetState(available, remaining))

            return remaining;

    }

}

公平实现:

protected int tryAcquireShared(int acquires) {

    for (;;) {

        //因为是公平实现,所以如果队列中有线程再等待,我们直接返回-1

        if (hasQueuedPredecessors())

            return -1;

        int available = getState();

        int remaining = available - acquires;

        if (remaining < 0 ||

            compareAndSetState(available, remaining))

            return remaining;

    }

}

这里说一下,返回负数,就是需要阻塞的意思。

public final void acquireSharedInterruptibly(int arg)

        throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    if (tryAcquireShared(arg) < 0)

        doAcquireSharedInterruptibly(arg);

}

因为之前AQS没有分析acquireSharedInterruptibly这个方法,这里我们简单的看一下

private void doAcquireSharedInterruptibly(int arg)

    throws InterruptedException {

        //将当前结点插入到队列

    final Node node = addWaiter(Node.SHARED);

    boolean failed = true;

    try {

        for (;;) {

            //获取前一个结点,如果是head,重新尝试获取

            final Node p = node.predecessor();

            if (p == head) {

                int r = tryAcquireShared(arg);

                //获取成功

                if (r >= 0) {

                    setHeadAndPropagate(node, r);

                    p.next = null; // help GC

                    failed = false;

                    return;

                }

            }

            if (shouldParkAfterFailedAcquire(p, node) &&

                parkAndCheckInterrupt())

                throw new InterruptedException();

        }

    } finally {

        if (failed)

            cancelAcquire(node);

    }

}

我们再来看一下释放资源的方法:

public final boolean releaseShared(int arg) {

    if (tryReleaseShared(arg)) {

        doReleaseShared();

        return true;

    }

    return false;

}

//这里就是将状态更新而已

protected final boolean tryReleaseShared(int releases) {

    for (;;) {

        int current = getState();

        int next = current + releases;

        if (next < current) // 这里表示溢出了。也就是超过了int最大值

            throw new Error("Maximum permit count exceeded");

        if (compareAndSetState(current, next))

            return true;

    }

}

真正释放的逻辑再下面方法中

private void doReleaseShared() {

    for (;;) {

        Node h = head;

        if (h != null && h != tail) {

            int ws = h.waitStatus;

            if (ws == Node.SIGNAL) {

                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

                    continue;            // loop to recheck cases

                unparkSuccessor(h);

            }

            else if (ws == 0 &&

                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

                continue;                // loop on failed CAS

        }

        if (h == head)                   // loop if head changed

            break;

    }

}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容