Java技术专题-CLH队列原理介绍

前提概要

CHL Node FIFO队列之前,我们先分析这种队列的几个要素。首先要了解的是自旋锁,所谓自旋锁即是某一线程去尝试获取某个锁时,如果该锁已经被其他线程占用的话,此线程将不断循环检查该锁是否被释放,而不是让此线程挂起或睡眠。它属于为了保证共享资源而提出的一种锁机制,与互斥锁类似,保证了公共资源在任意时刻最多只能由一条线程获取使用,不同的是互斥锁在获取锁失败后将进入睡眠或阻塞状态。下面利用代码实现一个简单的自旋锁


public class SpinLock {
    
      private static Unsafe unsafe = null;
      private static final long valueOffset;
      private volatile int value = 0;

      static {
        try {
                unsafe=getUnsafeInstance(); 
                valueOffset = unsafe.objectFieldOffset(SpinLock.class.
                getDeclaredField("value"));
        } catch (Exception ex) {
            throw new Error(ex);
        }
     }

    private static Unsafe getUnsafeInstance() throws     
                SecurityException,NoSuchFieldException, 
                IllegalArgumentException,IllegalAccessException {
          Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
          theUnsafeInstance.setAccessible(true);
          return (Unsafe) theUnsafeInstance.get(Unsafe.class);
    }

    public void lock() {
       for (;;) {
            int newV = value + 1;
            if(newV==1)
                if (unsafe.compareAndSwapInt(this, valueOffset, 0, newV)){
                     return ;
            }
        }
    }

    public void unlock() {
           unsafe.compareAndSwapInt(this, valueOffset, 1, 0);
    }
}

这是一个很简单的自旋锁,主要看加粗加红的两个方法lock和unlock,Unsafe仅仅是为操作提供了硬件级别的原子CAS操作,暂时忽略此类,只要知道它的作用即可,我们将在后面的“原子性如何保证”小节中对此进行更加深入的阐述。

对于lock方法,假如有若干线程竞争,能成功通过CAS操作修改value值为newV的线程即是成功获取锁的线程,将直接通过,而其他的线程则不断在循环检测value值是否又改回0,而将value改为0的操作就是获取锁的线程执行完后对该锁进行释放,通过unlock方法释放锁,释放后若干线程又对该锁竞争。

如此一来,没获取的锁也不会被挂起或阻塞,而是不断循环检查状态。图2-5-9-3可加深自旋锁的理解,五条线程轮询value变量,t1获取成功后将value置为1,此状态时其他线程无法竞争锁,t1使用完锁后将value置为0,剩下的线程继续竞争锁,以此类推。这样就保证了某个区域块的线程安全性。**

image

自旋锁适用于锁占用时间短,即锁保护临界区很小的情景,同时它需要硬件级别操作,也要保证各缓存数据的一致性,另外,无法保证公平性,不保证先到先获得,可能造成线程饥饿在多处理器机器上,每个线程对应的处理器都对同一个变量进行读写,而每次读写操作都将要同步每个处理器缓存,导致系统性能严重下降。

AQS运作进阶

看Craig, Landin, and Hagersten发明的CLH锁如何优化同步带来的花销,其核心思想是:通过一定手段将所有线程对
某一共享变量轮询竞争转化为一个线程队列且队列中的线程各自轮询自己的本地变量。这个转化过程由两个要点,一是构建怎样的队列&如何构建队列,为了保证公平性,构建的将是一个FIFO队列,构建的时候主要通过移动尾部节点tail实现队列的排队,每个想获取锁的线程创建一个新节点并通过CAS原子操作将新节点赋予tail,然后让当前线程轮询前一节点的某个状态位。

如图,如此就成功构建线程排队队列;二是如何释放队列,执行完线程后只需将当前线程对应的节点状态位置为解锁状态,由于下一节点一直在轮询,可获取到锁。

image

CLH锁的核心思想貌似是将众多线程长时间对某资源的竞争,通过有序化这些线程转化为只需对本地变量检测。唯一存在竞争的地方就是在入队列之前对尾节点tail的竞争,但竞争的线程的数量已经少了很多,且比起所有线程直接对某资源竞争的轮询次数也减少了很多,节省了很多CPU缓存同步操作,大大提升系统性能,利用空间换取性能。

下面提供一个简单的CLH锁实现代码,lock与unlock两方法提供加锁解锁操作,每次加锁解锁必须将一个CLHNode对象作为参数传入,lock方法的for循环是通过CAS操作将新节点插入队列,而while循环则是检测前驱节点的锁状态位,一旦前驱节点锁状态位允许则结束检测让线程往下执行。解锁操作先判断当前节点是否为尾节点,如是则直接将尾节点置为空,此时说名仅仅只有一条线程在执行,否则将当前节点的锁状态位置为解锁状态。

public class CLHLock {

private static Unsafe unsafe = null;

private static final long valueOffset;

private volatile CLHNode tail;

public class CLHNode {

private boolean isLocked = true;

}

static {

try {

unsafe = getUnsafeInstance();

valueOffset = unsafe.objectFieldOffset(CLHLock.class.getDeclaredField("tail"));

} catch (Exception ex) {

    throw new Error(ex);

}

}

public void lock(CLHNode currentThreadNode) {

CLHNode preNode = null;

for (;;) {

    preNode = tail;

    if (unsafe.compareAndSwapObject(this, valueOffset, tail,currentThreadNode))

        break;

    }

    if (preNode != null)

      while (preNode.isLocked) {

      }

}

public void unlock(CLHNode currentThreadNode) {

  if (!unsafe.compareAndSwapObject(this, valueOffset, currentThreadNode,null))

       currentThreadNode.isLocked = false;

  }

}

private static Unsafe getUnsafeInstance() throws 
        
            SecurityException,NoSuchFieldException, 
IllegalArgumentException,IllegalAccessException {

    Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");

    theUnsafeInstance.setAccessible(true);

    return (Unsafe) theUnsafeInstance.get(Unsafe.class);

}

}

AQS运作CLH简介

在CLH锁核心思想的影响下,Java并发包的基础框架AQS以CLH锁作为基础而设计,其中主要是考虑到CLH锁更容易实现取消与超时功能。

  • 比起原来的CLH锁已经做了很大的改造,主要从两方面进行了改造:节点的结构与节点等待机制在结构上引入了头结点和尾节点,他们分别指向队列的头和尾,尝试获取锁、入队列、释放锁等实现都与头尾节点相关,并且每个节点都引入前驱节点和后后续节点的引用;

  • 在等待机制上由原来的自旋改成阻塞唤醒。通过前驱后续节点的引用一节节连接起来形成一个链表队列,对于头尾节点的更新必须是原子的。下面详细看看入队、检测挂起、释放出队、超时、取消等操作。

image

CLH队列

入队,整块逻辑其实是用一个无限循环进行CAS操作,即用自旋方式竞争直到成功。将尾节点tail的旧值赋予新节点node的前驱节点,并尝试CAS操作将新节点node赋予尾节点tail,原先的尾节点的后续节点指向新建节点node。完成上面步骤就建立起一条链表队列。

for (;;) {

Node t = tail;

node.prev = t;

if (compareAndSetTail(t, node)) {

  t.next = node;

  return node;

}

}

检测挂起,上面我们说到节点等待机制已经被AQS作者由自旋机制改造成阻塞机制一个新建的节点完成入队操作后,如果是自旋则直接进入循环检测前驱节点是否为头结点即可,但现在被改为阻塞机制,当前线程将首先检测是否为头结点且尝试获取锁,如果当前节点为头结点并成功获取锁则直接返回,当前线程不进入阻塞,否则将当前线程阻塞。代码简化如下:

for (;;) {

if (node.prev == head)

    if(尝试获取锁成功){ 尝试CMPCXHG 硬件方式修改状态成功

         head=node;

         node.next=null;

         return;

    }

    阻塞线程

}

释放出队,出队的主要工作是负责唤醒等待队列中后续节点,让所有等待节点环环相接,每条线程有序地往下执行。代码简化如下:

Node s = node.next;

唤醒节点s包含的线程

超时,在支持超时的模式下需要LockSupport类的parkNanos方法支持,线程在阻塞一段时间后会自动唤醒,每次循环将累加消耗时间,当总消耗时间大于等于自定义的超时时间时就直接分返。代码简化如下:

for (;;) {

尝试获取锁

if (nanosTimeout <= 总消耗时间)

  return;

  LockSupport.parkNanos(this, nanosTimeout);

}

取消,队列中等待锁的队列可能因为中断或超时而涉及到取消操作,这种情况下被取消的节点不再进行锁竞争。此过程主要完成的工作是将取消的节点移除,先将节点的。先将节点node状态设置成取消,再将前驱节点pred的后续节点指向node的后续节点,这里由于涉及到竞争,必须通过CAS进行操作,CAS操作就算失败也不必理会,因为已经改了节点的状态,在尝试获取锁操作中会循环对节点的状态判断。

node.waitStatus = Node.CANCELLED;

Node pred = node.prev;

Node predNext = pred.next;

Node next = node.next;

compareAndSetNext(pred, predNext, next);

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容