Java-并发.05b.JUC-AQS()

AQS 简介

ReentrantLock 中有一个 Sync 类型的成员, 根据调用不同的构造方法, sync 被初始化为 NonFairSync (非公平锁, 默认) 或者 FairSync (公平锁), 这两种 Sync 都继承自 AbstractQueuedSynchronizer, 简称 AQS.

AQS 是 java.util.concurrent 的核心, CountDownLatch, Semaphore, ReentrantLock 等都有一个内部类是 AQS 类的子类.

图-ReentrantLock-AQS UML:
ReentrantLock AQS

AbstractQueuedSynchronizer 有几个重要的成员变量:

  • 1 计数器 private volatile int state ;
  • 2 等待线程的队列:双向队列,由 head 和 tail 两个 Node 类型的引用,表示头尾;
  • 3 从 AbstractOwnableSynchronizer 继承来的 exclusiveOwnerThread (Thread 类型);

计数器 statevolatile修饰的, 作用是记录资源:

  • 如果是 ReentrantLock,state 作用是记录锁被重入的次数, 初值是 0, 重入一次+1, 释放一次-1, 计数器为 0 表示没有线程持有该锁, 是 free 的;
  • 如果是 Semaphore,state记录的是可用资源的数量,acquire(int) 可以尝试获取指定个数的资源;

尝试 CAS 修改计数器失败的线程, 会被放入队列尾部;

exclusiveOwnerThread 用来记录当前该锁被哪个线程占用(但不是 volatile 的, 此处有疑问)

➤ AbstractQueuedSynchronizer 抽象类提供的主要的属性和方法:

public abstract class AbstractQueuedSynchronizer {
private transient volatile Node head; // 双向队列头
private transient volatile Node tail; // 双向队列尾
private volatile int state; // 重入计数器

// Lock.lock() 调用了 sync.acquire(1) 方法, 最终调用到 AQS.acquire(1):
public final void acquire(int arg) {
// tryAcquire 由具体类实现,获取state
// FairSync 和 NonFairSync 分别实现了 tryAcquire
if (!tryAcquire(arg) &&
// addWaiter: 当前线程放入等待队列
// acquireQueued: 如果队列中只有刚放入的线程,则尝试唤醒
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt:中断当前线程,让出CPU
selfInterrupt();
}

// Lock.unlock() 调用了 sync.release(1) 方法, 最终调用到 AQS.acquire(1):
public final boolean release(int arg) {
// tryRelease 由具体类实现,释放state
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 从队列中取出第一个可释放的节点, unpark之
unparkSuccessor(h);
return true;
}
return false;
}
}

static final class Node {
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
// 值为0,表示当前节点在sync队列中,等待着获取锁

/** waitStatus value to indicate thread has cancelled. */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking. */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition. */
static final int CONDITION = -2;
/** waitStatus value to indicate the next acquireShared should * unconditionally propagate. */
static final int PROPAGATE = -3;

volatile int waitStatus;
volatile Thread thread;

volatile Node prev;
volatile Node next;
}

acquire

AQS.acquire(int)解析

  public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 参数 node 即刚添加到队尾的节点
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor(); // 返回node.pre
if (p == head && tryAcquire(arg)) {
// pre是head,即队列中只有 node 一个
// tryAcquire: 再尝试 CAS
// 进了这个if 说明抢到锁了,返回false
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node)) // 根据pre.state判断当前线程能否park
interrupted |= parkAndCheckInterrupt(); // park 当前线程
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
  • tryAcquire(int):参数是 int 类型,表示获取多少个资源,tryAcquire 由具体的类实现, ReentrantLock 的 NonfairSync 和 FairSync 都分别实现了 tryAcquire:
    • ReentrantLock.FairSync.tryAcquire:公平方式,如果队列里还有其他线程,返回 false
    • ReentrantLock.NonfairSync.tryAcquire:非公平方式,直接尝试对 state 进行CAS,返回CAS的结果 // 解析 -> Java-并发.05b.JUC-Lock
  • addWaiter: 向队列尾添加(当前线程的)Node
  • acquireQueued:向队列添加完 Node 后,接下来 acquireQueued() 的参数即刚添加的 Node
    • 先判断队列是否只有一个 Node(因头节点不存储信息,这里只需判断 Node 的 pre 是否是 head 引用),如果是队列唯一的线程,那么再尝试一下CAS,如果CAS成功即算抢锁成功,如果CAS不成功
    • shouldParkAfterFailedAcquire 检查一下当前线程是否可以 park,只有当该节点的前驱结点的状态为 SIGNAL 时,才可以对该结点所封装的线程进行 park 操作。否则,将不能进行 park 操作
    • parkAndCheckInterrupt:调用 LockSupport.park(this) 停止当前的线程,让出 CPU

release

AQS.release()解析

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
  • tryRelease(int):参数是 int 类型,表示释放多少个资源,tryRelease 也需要具体类实现:
  • unparkSuccessor(h):唤醒 head 节点后面的节点,唤醒使用的是调 LockSupport.unpark()

condition

➤ 除了上述两个方法,AQS 还有一个重要的内部类 ConditionObject,这个内部类用于实现 ReentrantLock 等的 Condition 机制,一个 Lock 可以创建多个 ConditionObject 对象。Condition 工作机制参考 ReentrantLock Java-并发.05b.JUC-Lock