Java Semaphore 小试

Semaphore 被称为信号量,JUC 中非常重要也是很常见的一个类,内部通过 AQS 实现,主要控制线程间同步,应用场景例如限流。
Semaphore 底层基于 AQS 类似 ReentrantLock 实现了公平和公平的机制,AQS 中 state 被设计为许可证的剩余数量。注意 AQS 设计思想包括独占锁和共享锁,Semaphore 正是利用了 AQS 其锁共享机制。

创建 Semaphore

Semaphore 包含两个构造方法,分别用来创建公平和非公平的锁机制,默认为非公平机制,必须要指定的还有“许可证” permits 的数量。

// 公平
Semaphore fiarSp = new Semaphore(10, true);
// 非公平
Semaphore noFiarSp = new Semaphore(10);

state 控制

获取锁

下方选取非公平锁示例

        final int nonfairTryAcquireShared(int acquires) {
            // CAS 自旋
            for (;;) {
                // 当前可用
                int available = getState();
                // 占用后剩余数量
                int remaining = available - acquires;
                // CAS
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

如果剩余数量不足 1 - 0 == -1

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 获取共享锁
        if (tryAcquireShared(arg) < 0)
             // 进入队列
            doAcquireSharedInterruptibly(arg);
    }

进入队列,包装线程节点 Shared Thread Node,使用 LockSupport 挂起线程。

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

释放和唤醒

调用 Semaphore.release 方法。

    public void release() {
        sync.releaseShared(1);
    }

调用子方法释放锁

    public final boolean releaseShared(int arg) {
         // 调用子方法 tryReleaseShared 操作 state
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

CAS 归还许可证

        protected final boolean tryReleaseShared(int releases) {
            // 自旋直到 “归还” (CAS)成功。
            for (;;) {
                // 假设剩余 0 
                int current = getState();
                // next = 0 + 1
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

移除当前 Node 并唤醒队列(链表)中状态符合的线程

     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。