Java-并发.05b.JUC-Lock(锁)

可重入锁: ReentrantLock

ReentrantLock 是”可重入锁”: 一个线程已经持有锁的情况下, 重复对该锁进行 lock() 操作, 能立刻获得锁且不会被阻塞.

ReentrantLock reentrantLock = new ReentrantLock();

reentrantLock.lock();
try{ ... } catch { ... }
finally{
reentrantLock.unlock(); // 在finally里释放锁
}

ReentrantLock 的构造函数 ReentrantLock(boolean fair) 可以返回公平锁(true)和非公平锁(false).

  • 公平锁(Fair):加锁时先检查是否有排队等待的线程,如果队列非空,当前线程先进入队列尾,尝试唤醒队列中第一个节点的线程,即获取锁的顺序同调用 lock 的顺序一致;
  • 非公平锁(Nonfair):加锁时不考虑排队等待问题,当前线程直接尝试获取锁,获取不到则将当前线程包装为节点并插入队尾;

ReentrantLock()默认构造是 非公平锁

锁的实现

AQS 概述

ReentrantLock 的实现基于 AQS(AbstractQueuedSynchronizer),继承了 AQS 的计数器 state、双向同步队列,ReentrantLock 的条件对象 Condition 是 AQS 的内部类。

-> Java-并发.05b.JUC-AQS

Lock.lock() 实现

ReentrantLock.lock()调用栈如下(以 NonFairSync 为例):

ReentrantLock.lock()
NonFairSync.acquire(1)
AbstractQueuedSynchronizer.acquire(1)
NonFairSync.tryAcquire(1) // ReentrantLock实现的,此处以非公平锁为例
AbstractQueuedSynchronizer.addWaiter(Node) // 当前线程入对尾
AbstractQueuedSynchronizer.acquireQueued(Node) // 如果队列中只有当前线程,尝试唤醒

过程大致如下:

AbstractQueuedSynchronizer.acquire() 的分析见 Java-并发.05b.JUC-AQS

  • NonFairSync.tryAcquire():
    • 如果 state = 0 ,尝试对 state CAS(0,1) 操作, (CAS(0,1) 意即为如果 state 等于期望值0则设置为1),CAS 成功, 成功获取到该锁, 并把 exclusiveOwnerThread 置为当前线程的引用, lock() 成功返回;
    • 如果 state != 0,则检查 exclusiveOwnerThread 是否等于线程自己,如果是,则对 state+1,lock() 成功返回;
  • AQS.addWaiter() & AQS.acquireQueued():
    • CAS 不成功, 表明已经有线程持有该锁, 且 exclusiveOwnerThread 不等于当前线程, 创建当前线程的 AQS.Node 对象, 并插入 AQS 的队尾, 并调用 LockSupport.park() 使当前 Thread 进入 Blocked

LockSupport.park() 最终调用了 unsafe.park(Native 方法), 作用是 block 当前线程, 具体参考 LockSupport中的park与unpark原理-CSDN博客
Java 线程的阻塞以及唤醒,都是依靠操作系统来完成的。举例来说,对于符合 posix 接口的操作系统(如 macOS 和绝大部分的 Linux),上述操作是通过 pthread 的互斥锁(mutex)来实现的。此外,这些操作将涉及系统调用,需要从操作系统的用户态切换至内核态,其开销非常之大。

用线程 A/B 抢锁的场景说明 ReentrantLock 工作流程:

  • 线程 A 获取到锁: A 是首个获取锁的线程, CAS 成功, 获取到锁后, 设置 exclusiveOwnerThread 为 A 的线程 ID
  • 线程 B 获取锁: A 已经持有锁, 所以 B 在这里 CAS 失败, 线程 B 的 Node 被放入队尾, 然后 B 线程 park;
  • 线程 A 释放锁: CAS 状态值-1, 然后取队列的首节点( 注意队列的 head 节点不存储信息, 这里取的是 head 后面的节点), 然后 LockSupport.unpark(B) 唤醒线程 B, 此时 B 仍在队列;
  • 线程 B 唤醒执行: B 唤醒后自旋调用 tryAcquire() 再次尝试获取锁, 若成功则把自己从队列删除(AQS.head 设置为 B 节点, 并清除 B 节点信息)

Lock.unlock() 实现

ReentrantLock.unlock() 调用栈如下:

ReentrantLock.unlock()
NonFairSync.release(1)
AbstractQueuedSynchronizer.release(1)
Sync.tryRelease(1) // 具体实现类,释放state
AbstractQueuedSynchronizer.unparkSuccessor // 取出队列第一个节点线程,unpark唤醒
  • 只允许已经持有锁的线程调用 unlock(), 否则 unlock() 会抛出 IllegalMonitorStateException 异常
  • 已经持有锁的线程, 每次调用 unlock() 计数器都会-1, 直到计数器等于 0, 这时候表示锁全部被解开了, 再从 AQS 的队列取出第一个节点进行唤醒;

测锁与超时

ReentrantLock lock = new ReentrantLock();

if(lock.tryLock(1000, TimeUnit.MILLISECOND)) {
// 超时时间内尝试获取锁失败立刻返回
}
if(lock.tryLock()) {
// 尝试获取锁失败立刻返回
}

读写锁

  • 如果一个数据结构只有很少线程修改其值, 但是有很多线程读取, 这种数据结构非常适合用读写锁 ReentrantReadWriteLock
  • writeLock 一旦被持有, 排除其他的写锁和读锁;
  • readLock 一旦被持有, 排斥写锁, 但不排斥其他的读锁;
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
Lock wl = rwl.writeLock(); // 获取写锁
Lock rl = rwl.readLock(); // 获取读锁

wl.lock();
// 写操作, 这里排斥其他的读写锁
wl.unlock();

rl.lock();
// 读操作, 其他读锁仍可以进入, 写锁则不能
rl.unlock();

RW Lock 其他特点:

  • 支持 Fair 和 NoFair 两种模式
  • 支持 Condition: 只有写锁支持 newCondition(), 读锁不支持这个方法, 为什么这样设计? // 当持有写锁时, 读锁是可以任意访问的, 即使拿到了读锁的 Condition 也没有意义, 因为读线程之前不需要协调
  • 可以降级: 写锁可以降级为读锁, 当线程先拿到读锁, 接着拿到写锁, 此时写锁是被降级为读锁的

@ref: JUC锁: ReentrantReadWriteLock详解 | Java 全栈知识体系

条件对象: Condition

一个锁可以创建多个 ConditionObject 对象,ReentrantLock.newCondition() 返回一个 ConditionObject, 它是 AbstractQueuedSynchronizer 的一个内部类;

使用示例:

ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();

reentrantLock.lock();
try {
while(!resource_is_available) {
condition.await(); //1 失去锁 & waiting状态 & 进入condition的等待集
}

// here acquired lock, doSomething...

condition.signalAll(); //2 把condition等待集里的所有线程移除
} catch() {...}
finally {
reentrantLock.unlock(); //3 永远在finally里释放锁
}

条件对象的实现

区分 AQS 的队列和 Condition 的队列:
同步队列”也即 AQS 的队列,通过 Node.prev 属性和 Node.next 属性来维护的队列;
条件队列”是 ConditionObject 的队列,通过 Node.nextWaiter 属性来维护队列(也看出条件队列是单向队列);
另外,有些书将 prev 属性和 next 属性维护的队列称为“同步队列”,将 nextWaiter 维护的队列称为“等待队列”;
根据源码的注释,其实两个队列都可以称为“等待队列”,因此特以“同步队列”和“条件队列”来区分

await 的实现解析:

public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1. 将当前线程包装成Node,尾插入到「等待队列」中
Node node = addConditionWaiter();

// 2. 释放当前线程所占用的lock,并唤醒「同步队列」中的下一个节点
int savedState = fullyRelease(node);

int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 3. 调用park,当前线程在此 wait 并让出CPU
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
} /* 如果线程恢复运行,退出这个 while 循环的条件可能是:
情况1是线程被interrupt,走到break;
情况2是线程被放入了「同步队列」,即其他线程调用signal or signalAll
*/

// 4. 从上面的while中醒来,不断自旋尝试获取到lock
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 5. 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

signal 的实现解析:

public final void signal() {
//1. 先检测当前线程是否已经获取lock,持有lock的线程才可以signal
if (!isHeldExclusively())
throw new IllegalMonitorStateException();

//2. 获取等待队列中第一个节点(头节点)
Node first = firstWaiter;
if (first != null)
//3. 将刚获取的节点,状态变更为CONDITION,并插入到「同步队列」
doSignal(first);
}

signalAll 的实现解析:

private void doSignalAll(Node first) {
// 清空 condition 的 lastWaiter & firstWaiter
lastWaiter = firstWaiter = null;
do {
// 从头遍历「等待队列」所有节点
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first); // 节点放入「同步队列」
first = next;
} while (first != null);
}

线程调用完 signal()/signalAll() 后,一般都会再调用 unlock(),释放 state 计数器,并从「同步队列」唤醒一个线程;

@ref: 深入理解Java并发线程协作类Condition | Java程序员进阶之路

比较 ReentrantLock 和 synchronized

  1. ReentrantLock 可以”可中断的”获取锁 void lockInterruptibly() throws InterruptedException
  2. ReentrantLock 可以尝试非阻塞地获取锁 boolean tryLock()
  3. ReentrantLock 可以超时获取锁,通过 tryLock(timeout, unit)
  4. ReentrantLock 可以实现公平锁,通过 new ReentrantLock(true) 实现
  5. ReentrantLock 对象可以同时绑定多个 Condition 对象,只需要多次调用 newCondition() 方法即可。而在 synchronized 中只能使用一个对象的 wait(), notify(), notifyAll()
  6. Condition 对应的方法是 await(), signal(), signalAll(), Object 对应的方法 wait(), notify(), notifyAll()
  7. ReentrantLock 的实现是 AQS, synchronized 实现模型是 Monitor

注: ReentrantLock.lockInterruptibly() : 当调用 lockInterruptibly 时如果线程有中断标志, 则抛出 InterruptedException, 如果调用 InterruptedException 没有拿到锁, 线程进入 Blocked 状态, 是可以被 Interrupt 的
@ref Java多线程进阶(二)—— J.U.C之locks框架:接口 - 透彻理解Java并发编程 - SegmentFault 思否