Java-并发.05d.JUC-Sync(同步器)

CountDownLatch、CyclicBarrier、Semaphore 在 java1.5 被引入,它们也都是基于 AQS 实现的,

  • ReentrantLock 是基于 Exclusive(独占),只有一个线程可以执行;
  • CountDownLatch、CyclicBarrier、Semaphore 基于 Share(共享),多个线程可同时执行;

计数器 CountDownLatch

CountDownLatch.await 能够使一个线程等待, 直到计数器归于 0 后再继续执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。

CountDownLatch 提供了类似计数器的同步手段, 构造器和主要方法:

// 构造初值=count 的计数器
public CountDownLatch(int count) { };

//将count值减1
public void countDown() { };

//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public void await() throws InterruptedException { };

//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };

Example:

public class Test {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(2);

new Thread(() -> {
Thread.sleep(3000);
latch.countDown();
}).start();

new Thread(() -> {
Thread.sleep(6000);
latch.countDown();
}).start();

latch.await(); // 在这里阻塞直到latch执行过两次countDown()
}
}

CountDownLatch 也是基于 AQS 实现,但是对于 AQS.state 计数器的使用有区别:

  • 在 CountDownLatch 中,同步状态 State 表示 CountDownLatch 的计数器的初始值,当 State==0 时,表示无锁状态,且一旦 State 变为 0,就永远处于无锁状态了,此时所有线程在 await 上等待的线程都可以继续执行。
  • 而在 ReentrantLock 中,State==0 时,虽然也表示无锁状态,但是只有一个线程可以重置 State 的值。这就是共享锁的含义。

信号量 Semaphore

Semaphore 翻译成字面意思为 “信号量”,Semaphore 可以控同时访问的任务个数,通过 acquire(int) 获取n个许可,如果没有就等待; release(int) 释放n个许可。

  • 如果线程 acquire 不到指定资源数(资源=0 或 acquire 的大于剩余资源),线程阻塞
  • release 释放许可,并唤醒队列中一个节点(线程)

Example:

class Pool {
private static final int MAX_AVAILABLE = 100; // 可同时访问资源的最大线程数
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
protected Object[] items = new Object[MAX_AVAILABLE]; //共享资源
protected boolean[] used = new boolean[MAX_AVAILABLE];
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
private synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}
private synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}

➤ 构造器和主要方法:

//参数permits表示许可数目,即同时可以允许多少线程进行访问
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

//多了一个fair表示是否是公平的,即等待时间越久的越先获取许可
public Semaphore(int permits, boolean fair) {
sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}

//获取一个许可
public void acquire() throws InterruptedException { }

//获取permits个许可
public void acquire(int permits) throws InterruptedException { }

//释放一个许可
public void release() { }

//释放permits个许可
public void release(int permits) { }

➤ acquire 代码:

//Semaphore方法
public void acquire() throws InterruptedException {
//传递参数为1,说明要获取1个信号量资源
sync.acquireSharedInterruptibly(1);
}

//AQS的方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//(1)如果线程被中断,则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//(2)否则调用Sync子类方法尝试获取,分为公平和非公平策略
if (tryAcquireShared(arg) < 0)
//(3)如果获取失败则放入阻塞队列.然后再次尝试,如果使用则调用park方法挂起当前线程
doAcquireSharedInterruptibly(arg);
}

tryAcquireShared: 由 Sync 子类实现

  • 公平策略:若队列非空,先入队
  • 非公平策略:先尝试 CAS state

tryAcquireShared (公平策略)代码:

protected int tryAcquireShared(int acquires) {
for (;;) {
//查询是否当前线程节点的前驱节点也在等待获取该资源,有的话直接返回
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

上面提到 ReentrantLock 对 state 的操作是基于独占模式,Semaphore 是基于共享模式,二者区别:

  • 独占模式的 acquire:
    • 如果 state = 0 则尝试 CAS(state, 0, 1)
    • 如果 state > 0 则需要测试 exclusiveThread 是否等于当前线程,是则 state+=1
  • 共享模式的 acquire:
    • CAS(state, available, available-1),available 是当前的 state 值,表示“available”的许可数量

➤ release:

public void release() {
//(1)arg=1
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
//(2)尝试释放资源
if (tryReleaseShared(arg)) {
//(3)资源释放成功则调用park方法唤醒AQS队列里面最先挂起的线程
doReleaseShared();
return true;
}
return false;
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取当前信号量值
int current = getState();
//将当前信号量值增加releases,这里为增加1
int next = current + releases;
//移除处理
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//使用CAS保证更新信号量值的原子性
if (compareAndSetState(current, next))
return true;
}
}

循环栅栏 CyclicBarrier

CyclicBarrier 是一个辅助同步器类,在 JDK1.5时随着J.U.C 一起引入。

  • CyclicBarrier 功能类似 CountDownLatch 区别是:
    • CountDownLatch 计数器变为 0 之前,所有调用 await 的线程都需等待,计数器变为 0 后这些线程可以继续执行,但一旦计数器变为 0 就不可重置了;
    • CyclicBarrier 构造时同样需要指定计数器 N,工作线程调用 CyclicBarrier.await 后会阻塞,直到 N 个工作线程都调用了 await,这些阻塞的线程才可以继续执行;
  • 两种 await 的方式: CyclicBarrier.await(), CyclicBarrier.await(timeout,TimeUnit) ,后一种可以设置等待的超时;
  • CyclicBarrier.await 可以响应 InterruptedException,和 BrokenBarrierException
    • 如果正在 await 的线程 1 被打断,那么线程 1 的 await 抛出 InterruptedException,其他正在 await 的线程抛出 BrokenBarrierException
    • 如果线程 1 使用带超时的 await,并且超时了,线程 1 的 await 抛出 TimeoutException,其他正在 await 的线程抛出 BrokenBarrierException
    • 如果有线程调用了 CyclicBarrier.reset,其他正在 await 的线程抛出 BrokenBarrierException

调用了 CyclicBarrier.await 的线程退出等待状态的条件有:1 产生上面几种异常,2 或者达到了 await 的数量

如图,ABCD 四个线程都到达 Barrier 后,才可以同时“穿过”栅栏
![[../_images/Java-并发.05c.JUC-Sync-2023-05-21-1.png]]

Example:

public class CyclicBarrierTest  {

public static void main(String[] args) throws Exception{

ExecutorService service = Executors.newFixedThreadPool(3) ;

// CyclicBarrier 可以指定一个最后执行的任务
CyclicBarrier cyclicBarrier = new CyclicBarrier(3,()->{
System.out.println("全部线程都调用了await,则执行这里");
}) ;

for(int i = 0 ; i < 3 ; i++) {
final int number = i ;
service.execute(()->{
try {
System.out.println("线程 Num" + number + " start");
cyclicBarrier.await(); // 线程在此wait,直到达成条件
} catch (Exception e) {
// 这里应该分别处理 InterruptedException & BrokenBarrierException
// 如果使用了带超时的 await,这里还需要捕获 TimeoutException
e.printStackTrace();
}
});
}
service.shutdown();
}
}

使用 CyclicBarrier 时,对异常的处理一定要小心,比如线程在到达栅栏前就抛出异常,其它已经到达栅栏的线程会一直等待(因为没有还没有满足总数),最终导致程序无法继续向下执行。

线程可以通过以下几种机制避免上述情况:

  • 1 使用带超时的 await
  • 2 捕获到异常,尝试再次 await 流程(重试)
  • 3 重试失败调用 reset

CyclicBarrier reset 相关的源码:

//将屏障重置为其初始状态。
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//唤醒所有等待的线程继续执行,并设置屏障中断状态为true
breakBarrier(); // break the current generation
//唤醒所有等待的线程继续执行,并设置屏障中断状态为false
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

private void breakBarrier() {
generation.broken = true;//表示当代因为线程被中断,已经发成损坏了
count = parties;//重置count值
trip.signalAll();//调用Condition的signalAll方法,唤醒所有await的线程
}

private void nextGeneration() {
// signal completion of last generation
trip.signalAll();//调用Condition的signalAll方法,唤醒所有await的线程

count = parties;//重置count值
//生成新的Generation,表示上一代的所有线程已经唤醒,进行更新换代
generation = new Generation();
}
  • reset 方法会调用 breakBarrier(该方法意为栅栏损坏),该方法设置 broken 标志、重置 count、唤醒所有 await 线程;

  • await 方法中也会调用 breakBarrier:当收到 InterruptedException,或者超时,都会调用 breakBarrier;

参考:

Phaser

PhaserJDK1.7开始引入的一个同步工具类,适用于一些需要分阶段的任务的处理。它的功能与 CyclicBarrierCountDownLatch有些类似,类似于一个多阶段的栅栏,比较三者:

同步器 作用
CountDownLatch 倒数计数器,初始时设定计数器值,线程可以在计数器上等待,当计数器值归 0 后,所有等待的线程继续执行
CyclicBarrier 循环栅栏,初始时设定参与线程数,当线程到达栅栏后,会等待其它线程的到达,当到达栅栏的总数满足指定数后,所有等待的线程继续执行
Phaser 多阶段栅栏,可以在初始时设定参与线程数,也可以中途注册/注销参与者,当到达的参与者数量满足栅栏设定的数量后,会进行阶段升级(advance)

Phaser 中几个重要的概念:

(1)phase(阶段)

CyclicBarrier中,只有一个栅栏,线程在到达栅栏后会等待其它线程的到达。

Phaser 也有栅栏,在 Phaser 中,栅栏的名称叫做phase(阶段),在任意时间点,Phaser 只处于某一个phase(阶段),初始阶段为0,最大达到 Integerr.MAX_VALUE,然后再次归零。当所有parties参与者都到达后,phase值会递增。

如果看过之前关于 CyclicBarrier 的文章,就会知道,Phaser 中的 phase(阶段)这个概念其实和CyclicBarrier中的Generation很相似,只不过Generation没有计数。

(2)parties(参与者)

parties(参与者) 其实就是 CyclicBarrier中的参与线程的概念。

CyclicBarrier 中的参与者在初始构造指定后就不能变更,而 Phaser 既可以在初始构造时指定参与者的数量,也可以中途通过 registerbulkRegisterarriveAndDeregister 等方法注册/注销参与者。

(3)arrive(到达) / advance(进阶)

Phaser 注册完 parties(参与者) 之后,参与者的初始状态是 unarrived 的,当参与者 到达(arrive) 当前阶段(phase)后,状态就会变成 arrived。当阶段的到达参与者数满足条件后(注册的数量等于到达的数量),阶段就会发生 进阶(advance) ——也就是 phase 值+1。

(4)Termination(终止)

代表当前Phaser对象达到终止状态,有点类似于CyclicBarrier中的栅栏被破坏的概念。

(5)Tiering(分层)

Phaser 支持分层(Tiering) —— 一种树形结构,通过构造函数可以指定当前待构造的 Phaser 对象的父结点。之所以引入Tiering,是因为当一个 Phaser 有大量 参与者(parties) 的时候,内部的同步操作会使性能急剧下降,而分层可以降低竞争,从而减小因同步导致的额外开销。

在一个分层 Phasers 的树结构中,注册和撤销子 Phaser 或父 Phaser 是自动被管理的。当一个 Phaser 的参与者(parties) 数量变成0时,如果有该 Phaser 有父结点,就会将它从父结点中溢移除。

示例

(1)通过 Phaser 控制多个线程的执行时机:有时候我们希望所有线程到达指定点后再同时开始执行,我们可以利用CyclicBarrierCountDownLatch来实现,这里给出使用 Phaser 的版本。

public class PhaserTest1 {
public static void main(String[] args) {
Phaser phaser = new Phaser();
for (int i = 0; i < 10; i++) {
phaser.register(); // 注册各个参与者线程
new Thread(new Task(phaser), "Thread-" + i).start();
}
}
}

class Task implements Runnable {
private final Phaser phaser;

Task(Phaser phaser) {
this.phaser = phaser;
}

@Override
public void run() {
int i = phaser.arriveAndAwaitAdvance(); // 等待其它参与者线程到达
// do something
System.out.println(Thread.currentThread().getName() + ": 执行完任务,当前phase =" + i + "");
}
}

(2) 通过 Phaser 实现开关。在以前讲CountDownLatch时,我们给出过以CountDownLatch实现开关的示例,也就是说,我们希望一些外部条件得到满足后,然后打开开关,线程才能继续执行,我们看下如何用Phaser来实现此功能。

public class PhaserTest2 {

public static void main(String[] args) throws IOException {
Phaser phaser = new Phaser(1); // 注册主线程,当外部条件满足时,由主线程打开开关
for (int i = 0; i < 10; i++) {
phaser.register(); // 注册各个参与者线程
new Thread(new Task2(phaser), "Thread-" + i).start();
}

// 外部条件:等待用户输入命令
System.out.println("Press ENTER to continue");
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
reader.readLine();

// 打开开关
phaser.arriveAndDeregister();
System.out.println("主线程打开了开关");
}
}

class Task2 implements Runnable {
private final Phaser phaser;

Task2(Phaser phaser) {
this.phaser = phaser;
}

@Override
public void run() {
int i = phaser.arriveAndAwaitAdvance(); // 等待其它参与者线程到达

// do something
System.out.println(Thread.currentThread().getName() + ": 执行完任务,当前phase =" + i + "");
}
}

(3) 通过Phaser控制任务的执行轮数

public class PhaserTest3 {
public static void main(String[] args) throws IOException {

int repeats = 3; // 指定任务最多执行的次数

Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
return phase + 1 >= repeats || registeredParties == 0;
}
};

for (int i = 0; i < 10; i++) {
phaser.register(); // 注册各个参与者线程
new Thread(new Task3(phaser), "Thread-" + i).start();
}
}
}

class Task3 implements Runnable {
private final Phaser phaser;

Task3(Phaser phaser) {
this.phaser = phaser;
}

@Override
public void run() {
while (!phaser.isTerminated()) { //只要Phaser没有终止, 各个线程的任务就会一直执行
int i = phaser.arriveAndAwaitAdvance(); // 等待其它参与者线程到达
// do something
System.out.println(Thread.currentThread().getName() + ": 执行完任务");
}
}
}

(4) Phaser 支持分层功能,我们先来考虑下如何用利用 Phaser 的分层来实现高并发时的优化,在示例三中,我们其实创建了10个任务,然后10个线程共用一个 Phaser 对象,如下图:

../_images/Java-并发.05c.JUC-Sync-2023-05-21-2.png

如果任务数继续增大,那么同步产生的开销会非常大,利用 Phaser 分层的功能,我们可以限定每个 Phaser 对象的最大使用线程(任务数),如下图:

../_images/Java-并发.05c.JUC-Sync-2023-05-21-3.png

可以看到,上述 Phasers 其实构成了一颗多叉树,如果任务数继续增多,还可以将 Phaser 的叶子结点继续分裂,然后将分裂出的子结点供工作线程使用。

public class PhaserTest4 {
private static final int TASKS_PER_PHASER = 4; // 每个Phaser对象对应的工作线程(任务)数

public static void main(String[] args) throws IOException {

int repeats = 3; // 指定任务最多执行的次数
Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
return phase + 1 >= repeats || registeredParties == 0;
}
};

Tasker[] taskers = new Tasker[10];
build(taskers, 0, taskers.length, phaser); // 根据任务数,为每个任务分配Phaser对象

for (int i = 0; i < taskers.length; i++) { // 执行任务
Thread thread = new Thread(taskers[i]);
thread.start();
}
}

private static void build(Tasker[] taskers, int lo, int hi, Phaser phaser) {
if (hi - lo > TASKS_PER_PHASER) {
for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
int j = Math.min(i + TASKS_PER_PHASER, hi);
build(taskers, i, j, new Phaser(phaser));
}
} else {
for (int i = lo; i < hi; ++i)
taskers[i] = new Tasker(i, phaser);
}

}
}

class Task4 implements Runnable {
private final Phaser phaser;

Task4(Phaser phaser) {
this.phaser = phaser;
this.phaser.register();
}

@Override
public void run() {
while (!phaser.isTerminated()) { //只要Phaser没有终止, 各个线程的任务就会一直执行
int i = phaser.arriveAndAwaitAdvance(); // 等待其它参与者线程到达
// do something
System.out.println(Thread.currentThread().getName() + ": 执行完任务");
}
}
}

@ref::