(四) 源码分析

ReentrantLock + Condition

// condition 使用
public static void main(String[] args) throws InterruptedException {
  ReentrantLock lock = new ReentrantLock();
  Condition condition = lock.newCondition();
  lock.lock();
  condition.await();
  condition.signal();
  lock.unlock();
}

翻阅源码过程中发现,CAS使用的技巧。

在操作Node节点,线程状态(waitStatus)时无论是否已加锁成功 都会使用CAS。思考原因!!

new ReentrantLock()

public ReentrantLock() {
  // 默认使用非公平锁
  sync = new NonfairSync();
}

// 实现类
static final class NonfairSync extends Sync {
  private static final long serialVersionUID = 7316153563782823691L;

  /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
  // lock方法
  final void lock() {
    if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
    else
      acquire(1);
  }
    
  // 尝试获取锁的方法 调用父类Sync的实现
  protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
  }
}

lock.newCondition()

// ReentrantLock
public Condition newCondition() {
  return sync.newCondition();
}

// Sync
final ConditionObject newCondition() {
  return new ConditionObject();
}
// AbstractQueuedSynchronizer
// AQS中定义了 ConditionObject
public class ConditionObject implements Condition, java.io.Serializable {
  private static final long serialVersionUID = 1173984872572414699L;
  /** First node of condition queue. */
  private transient Node firstWaiter;
  /** Last node of condition queue. */
  private transient Node lastWaiter;

  /**
         * Creates a new {@code ConditionObject} instance.
         */
  public ConditionObject() { }
  ...
}

lock.lock()

// ReentrantLock
public void lock() {
  sync.lock();
}

// NonfairSync
final void lock() {
  // CAS 尝试修改 threadState 
  // unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  // 1 表示加锁状态 0 表示无锁状态  大于1 表示锁的重入次数 注意此时设置值使用了CAS操作 
  if (compareAndSetState(0, 1)){
    // 加锁成功 设置独占线程值(将当前线程设置为锁的独占线程)
    setExclusiveOwnerThread(Thread.currentThread());
  }
  else{
    // CAS加锁失败 重试
    acquire(1);
  }
}
// AQS arg = 1
public final void acquire(int arg) {
  // 尝试获取锁 正常情况会失败(这里对于抢占锁来讲只是 又走了一下lock方法的CAS操作)
  // 抢一次不行 立马再抢一下,存个侥幸心理
  if (!tryAcquire(arg) &&
      // 抢不到只好 先添加阻塞队列
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

// NonfairSync acquires = 1
protected final boolean tryAcquire(int acquires) {
  return nonfairTryAcquire(acquires);
}

// Sync acquires = 1
final boolean nonfairTryAcquire(int acquires) {
  final Thread current = Thread.currentThread();
  // 获取线程状态 由于未加锁成功 默认是 0 
  int c = getState();
  if (c == 0) {
    // cas 再次尝试加锁
    if (compareAndSetState(0, acquires)) {
      // 成功设置当前线程独占
      setExclusiveOwnerThread(current);
      return true;
    }
  }
  // 状态不为0  表示其他线程持有锁 判断持有锁的线程是否是当前线程
  else if (current == getExclusiveOwnerThread()) {
    // 如果是 设置重入次数
    int nextc = c + acquires;
    if (nextc < 0) // overflow  超出最大值变负数
      throw new Error("Maximum lock count exceeded");
    // 设置当前状态 unsafe 
    // 思考:AQS 中的state 是共享变量 为什么这里读取和设置不需要 CAS
    setState(nextc);
    return true;
  }
  return false;
}
// AQS 
// 此时mode = null
private Node addWaiter(Node mode) {
  // 新建Node节点 节点的nextWaiter 为null  
  // netWaiter Condition 等待队列下个节点
  Node node = new Node(Thread.currentThread(), mode);
  // Try the fast path of enq; backup to full enq on failure
  // 先快速尝试入队,失败再执行完整的入队逻辑
  Node pred = tail;
  // tail 为阻塞队列尾结点,如果尾结点不为空表示队列不为null
  if (pred != null) {
    node.prev = pred;
    // 这里使用尾插法 思考 为什么要使用尾插法 而不是头插法
    // 尝试将当前节点修改为尾节点
    if (compareAndSetTail(pred, node)) {
      // 修改成将 之前尾节点的 next节点指向当前节点
      pred.next = node;
      return node;
    }
  }
  // 如果尾结点为null 或者 CAS 尾结点操作失败(其他线程并发 导致尾结点变更) 
  // 调用enq执行真正的入队操作
  enq(node);
  return node;
}

// AQS
private Node enq(final Node node) {
  // 自旋直到入队成功
  for (;;) {
    Node t = tail;
    // 再次判断尾结点是否为null 此段代码允许并发 有可能已有新的变化
    if (t == null) { // Must initialize
      // 头尾节点为null 表示队列为null 需要初始化
      if (compareAndSetHead(new Node()))
        tail = head;
    } else {
      node.prev = t;
      // 将node节点设置为 尾结点
      if (compareAndSetTail(t, node)) {
        t.next = node;
        return t; // 思考此处返回 t 的原因是什么
      }
    }
  }
}
// AQS node 为当前节点 arg = 1 
final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    // 自旋 等待 被唤醒后 找头节点并尝试获取锁,失败后继续等待
    for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
        // 成功后设置当前节点为 头节点 将原头结点置位Null 
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      // 自旋 现将节点状态 设置为 -1,然后park阻塞线程
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

// AQS 
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  int ws = pred.waitStatus;
  // 等待状态 int 类型初始化未赋值 所以首次 应该为 0 
  if (ws == Node.SIGNAL)
    /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
    return true;
  if (ws > 0) {
    /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
    do {
      node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
  } else {
    /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  }
  return false;
}

// AQS
private final boolean parkAndCheckInterrupt() {
  // 这个位置阻塞 
  LockSupport.park(this);
  return Thread.interrupted();
}

// LockSupport
public static void park(Object blocker) {
  Thread t = Thread.currentThread();
  setBlocker(t, blocker);
  UNSAFE.park(false, 0L);
  setBlocker(t, null);
}

Lock.unlock()

// ReentrantLock
public void unlock() {
  // 同样调用AQS 释放锁 每次state -1 与重入次数匹配
  sync.release(1);
}
// AQS
public final boolean release(int arg) {
  // 释放锁 tryRelease 由具体的子类 来实现如何释放锁 与acquire类似
  if (tryRelease(arg)) {
    // 释放成功后 获取头结点
    Node h = head;
    // 头结点不为null 表示阻塞队列不为空 需要执行唤醒
    // waitStatus != 0 表示阻塞线程 非初始状态
    // 已知 waitStatus 有5种  1 取消状态  0 初始状态(int默认为0) -1 阻塞等待被唤醒状态 -2 condition条件  -3 传播?暂时不懂
    // 个人理解此处为什么不使用 h.waitStatus == -1
    if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
    return true;
  }
  return false;
}
// ReentrantLock->Sync
protected final boolean tryRelease(int releases) {
  // 获取当前 锁的状态 
  int c = getState() - releases;
  // 如果当前线程非持有锁的线程 直接抛出异常
  if (Thread.currentThread() != getExclusiveOwnerThread())
    throw new IllegalMonitorStateException();
  boolean free = false;
  // c == 0 表示完全释放锁,否则表示减少重入次数
  if (c == 0) {
    free = true;
    // 设置当前锁的独占线程为Null 允许其他线程抢占锁
    setExclusiveOwnerThread(null);
  }
  // 设置锁的状态值 思考:为什么此处不使用CAS操作,而lock方法acquire时需要使用CAS
  setState(c);
  return free;
}
// AQS 唤醒 Node:head节点
private void unparkSuccessor(Node node) {
  /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
  // 正常情况 此处为 -1 阻塞等待唤醒状态
  int ws = node.waitStatus;
  if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0); // 此处为什么又使用CAS

  /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
  // 获取头节点的下个节点 
  Node s = node.next;
  if (s == null || s.waitStatus > 0) {
    // 如果next节点为null 从尾部向前遍历,找到最前的 匹配节点 作为本次唤醒节点
    s = null;
    for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
        s = t;
  }
  if (s != null)
    LockSupport.unpark(s.thread); // 为什么不唤醒头节点? 
    // 头结点在创建的时候是一个 new Node() 操作 可以理解为是一个Null节点,阻塞队列添加元素的时候是尾插法 也就是说第一个 等待线程会作为 tail 节点存在,所以找到头结点后要next下一个节点 作为需要唤醒的元素 
}
// AQS 加锁阻塞在这个位置,唤醒后继续从这里开始
private final boolean parkAndCheckInterrupt() {
  LockSupport.park(this);
  // Thread.interrupted() 会返回线程中断状态,如果是中断触发的唤醒操作 会返回true 
  // unpark唤醒操作返回false
  return Thread.interrupted();
}

// AQS 
final boolean acquireQueued(final Node node, int arg) {
  boolean failed = true;
  try {
    boolean interrupted = false;
    for (;;) {
      // 重新开始自旋操作 此时获取的pred节点为head节点 (唤醒的是head节点的next节点)
      final Node p = node.predecessor();
      // 重新尝试抢占锁
      if (p == head && tryAcquire(arg)) {
        // 抢占成功 将当前节点设置为Head节点  思考 此处为什么不使用CAS
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
        // 如果是中断唤醒 将interrupted变量置为 true 后续返回true
        interrupted = true;
    }
  } finally {
    if (failed)
      cancelAcquire(node);
  }
}

//AQS 设置头节点
private void setHead(Node node) {
  head = node;
  node.thread = null;
  node.prev = null;
}

condition.await();

// AQS 
public final void await() throws InterruptedException {
  // 判断是否被中断
  if (Thread.interrupted())
    throw new InterruptedException();
  // 添加到等待队列
  Node node = addConditionWaiter();
  // 释放获得的锁 saveState 为锁state值
  int savedState = fullyRelease(node);
  int interruptMode = 0;
  // 判断是否满足阻塞条件
  while (!isOnSyncQueue(node)) {
    // 阻塞等待唤醒
    LockSupport.park(this);
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      break;
  }
  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    interruptMode = REINTERRUPT;
  if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
  if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);
}

// AQS -> ConditionObject
private Node addConditionWaiter() {
  Node t = lastWaiter;
  // If lastWaiter is cancelled, clean out.
  if (t != null && t.waitStatus != Node.CONDITION) {
    // 清除非 Condition状态节点
    unlinkCancelledWaiters();
    t = lastWaiter;
  }
  // 使用当前线程,初始化Node节点 并将头节点、尾结点指向他
  Node node = new Node(Thread.currentThread(), Node.CONDITION);
  if (t == null)
    firstWaiter = node; // 此处为什么不先设置尾结点?
  else
    t.nextWaiter = node;
  lastWaiter = node;
  return node;
}

// AQS 
final int fullyRelease(Node node) {
  boolean failed = true;
  try {
    // 获取当前锁的状态 有可能是重入n次
    int savedState = getState();
    // 释放锁 与lock.unlock()逻辑一致
    if (release(savedState)) {
      failed = false;
      return savedState;
    } else {
      throw new IllegalMonitorStateException();
    }
  } finally {
    if (failed)
      node.waitStatus = Node.CANCELLED;
  }
}

Condition.signal()

// AQS -> ConditionObject (Condition接口)
public final void signal() {
  if (!isHeldExclusively())
    throw new IllegalMonitorStateException();
  // 从头开始准备唤醒
  Node first = firstWaiter;
  if (first != null)
    doSignal(first);
}

// AQS
private void doSignal(Node first) {
  do {
    if ( (firstWaiter = first.nextWaiter) == null)
      lastWaiter = null;
    first.nextWaiter = null;
  } while (!transferForSignal(first) &&
           (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
  /*
         * If cannot change waitStatus, the node has been cancelled.
         */
  // 尝试修改当前Node节点 状态 修改失败 尝试唤醒下一个节点
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;

  /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
  // 状态修改成功 先重新加入阻塞队列
  // 注意这里p为node的前一个节点
  Node p = enq(node);
  int ws = p.waitStatus;
  // 为什么要修改p的状态 唤醒node节点
  if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread); // 唤醒
  return true;
}

问题

1. 很多地方用了CAS操作,比如修改锁的状态值AQS的state、Thread的waitStatus。在修改state时会考虑如果加锁成功状态直接修改无需CAS,但是在修改waitStatus时,及时已经获取到锁依然会使用CAS。
2. Condition等待队列为什么在signal时修改pred节点状态。
3. 为什么使用尾插法而不是头插发

ThreadPoolExecutor

new ThreadPoolExecutor()

public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize,
                          long keepAliveTime, // 存活时间
                          TimeUnit unit, // 存活时间单位
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory, // 线程工厂 自定义线程属性、名称
                          RejectedExecutionHandler handler // 拒绝策略
                         ) {
  if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
    throw new IllegalArgumentException();
  if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
  this.acc = System.getSecurityManager() == null ?
    null :
  AccessController.getContext();
  this.corePoolSize = corePoolSize;
  this.maximumPoolSize = maximumPoolSize;
  this.workQueue = workQueue;
  this.keepAliveTime = unit.toNanos(keepAliveTime);
  this.threadFactory = threadFactory;
  this.handler = handler;
}

executor()

 /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
 
  int c = ctl.get();
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  }
  else if (!addWorker(command, false))
    reject(command);
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容