Java多线程编程五 锁

并发包的锁

LockSupport 工具类

JDK 中的 rt.jar 包里面的 LockSupport 是一个工具类,它的主要作用是挂起和唤醒线程。

LockSupport 类与每个使用它的线程都会关联一个许可证,在默认情况下调用LockSupport类的方法的线程是不持有许可证的。LockSupport是使用Unsafe类实现的,其主要方法:

  1. static void park();

    如果调用park方法的线程已经拿到了与Support关联的许可证,则调用LockSupport.park() 时会马上返回,否则会被阻塞挂起,默认情况下线程不持有许可证。

    如果其他线程调用了此线程的interrupt方法,设置了中断标志或线程被虚假唤醒,则阻塞线程也会返回,park最好使用循环判断执行。

  2. static void unpark(Thread thread);

    如果thread未获得LockSupport与Thread类的关联许可,则让thread持有;

    若thread之前被park挂起,则thread被唤醒;

    若thread之前没有调用park,则调用unpark后再调用park方法,其会立即返回。

  3. static void parkNanos(long nanos);

    与park的不同之处,如果线程没有拿到LockSupport的许可支持,将会在nanos时间后自动返回。

    使用 jsatck pid命令可以查看线程堆栈信息

  4. static void park(Object blocker);

    Thread 类中有一个volatile Object parkBlocker 变量,用来存放blocker变量;

  5. static void parkNanos(Object blocker,long nanos);

    相比4 多了超时时间。

  6. static void parkUtil(Object blocker,long deadline);
    这个时间时一个时间点,

AQS 抽象同步队列

它维护了一个volatile int state(代表共享资源)和一个FIFO(双向队列)线程等待队列(多线程争用资源被阻塞时会进入此队列)

AQS的实现依赖内部的同步队列(FIFO双向队列),如果当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node,将其加入同步队列的尾部,同时阻塞当前线程,当同步状态释放时,唤醒队列的头节点。

AQS队列存储被阻塞队列,其可拥有多个条件变量 ConditionObject 内部类对象,内部类对象有 signal();signalAll();await();方法分别与 synchronized 同步资源对应,相应条件变量阻塞的线程被存到该条件变量维护的条件队列,该队列是单向链表,一个AQS 可拥有多个condition。

原理图示与类图

aqs1.png
aqs2.png

AQS重要属性与模板方法

AQS将大部分的同步逻辑均已经实现好,继承的自定义同步器只需要实现state的获取(acquire)和释放(release)的逻辑代码就可以。AQS需要子类复写的方法均没有声明为abstract,目的是避免子类需要强制性覆写多个方法,因为一般自定义同步器要么是独占方法,要么是共享方法,只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。

private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;//状态int变量

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
    static final int PROPAGATE = -3;

      
    
    volatile int waitStatus;

  
    volatile Node prev;

   
    volatile Node next;

   
    volatile Thread thread;

   
    Node nextWaiter;
}
//模板方法

//独占方式。尝试获取资源,成功则返回true,失败则返回false。
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
//独占方式。尝试释放资源,成功则返回true,失败则返回false。
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
//共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
    throw new UnsupportedOperationException();
}

独占模式的独占与释放

//1.获取资源
public final void acquire(int arg) {
    //获取资源成功即结束,失败则封装节点为EXCLUSIVE添加线程到等待队列,然后中断当先线程
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
//2.(接1)获取资源需要具体子类去实现
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
//3.(接1)封装节点为EXCLUSIVE后,添加线程到等待队列尾部
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    //简单测试添加到队列
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //结尾节点为null或添加失败,使用CAS自旋添加节点
    enq(node);
    return node;
}
//4.(接3)节点入队 头节点为null则设置为头节点
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

//5.(接1)中断当前线程
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
//1.释放资源,并释放等待队列阻塞的一个线程
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//2.释放资源具体实现
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
//3.释放一个合适的被阻塞挂起的线程
private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

独占锁 ReentrantLock

retranlock.png

Sysc类直接继承自AQS,它的子类NonfairSync 和 FairSysn 分别实现的获取锁的非公平与公平策略。


void lock();获取锁

void interruptibly();对中断进行响应

boolean tryLock();尝试获取锁,不会引起当前线程阻塞

boolean tryLock(long timeout, TimeUnit unit);与tryLock不同,设置了超时时间,时间超时未获取锁则返回false;

void unlock();释放锁

具体实现

//非公平锁的实现
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
//公平锁的实现
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        //当前节点有先驱节点则获取失败
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

读写锁 ReentrantReadWriteLock

rwlock.png

通过state高16位表示读状态,低16位表示写状态。写锁可重入,是独占锁。读锁为共享锁

读锁实现

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
protected final int tryAcquireShared(int unused) {
    /*
    * Walkthrough:
    * 1. If write lock held by another thread, fail.
    * 2. Otherwise, this thread is eligible for
    *    lock wrt state, so ask if it should block
    *    because of queue policy. If not, try
    *    to grant by CASing state and updating count.
    *    Note that step does not check for reentrant
    *    acquires, which is postponed to full version
    *    to avoid having to check hold count in
    *    the more typical non-reentrant case.
    * 3. If step 2 fails either because thread
    *    apparently not eligible有资格 or CAS fails or count
    *    saturated, chain to version with full retry loop.
    */
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //高十六位
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null ||
                rh.tid != LockSupport.getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

写锁实现

protected final boolean tryAcquire(int acquires) {
    /*
    * Walkthrough:
    * 1. If read count nonzero or write count nonzero
    *    and owner is a different thread, fail.
    * 2. If count would saturate, fail. (This can only
    *    happen if count is already nonzero.)
    * 3. Otherwise, this thread is eligible for lock if
    *    it is either a reentrant acquire or
    *    queue policy allows it. If so, update state
    *    and set owner.
    */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        //低十六位
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

StampedLock(JDK8)

内部有不可重入锁,可相互转换

  1. 写锁 writeLock 独占锁
  2. 悲观读锁 readLock 共享锁
  3. 乐观读锁 tryOptimisticRead 利用版本号,在本地存快照,不使用锁
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容