CountDownLatch、CyclicBarrier、Semaphore 在 java1.5 被引入,它们也都是基于 AQS 实现的,
- ReentrantLock 是基于 Exclusive(独占),只有一个线程可以执行;
- CountDownLatch、CyclicBarrier、Semaphore 基于 Share(共享),多个线程可同时执行;
计数器 CountDownLatch
CountDownLatch.await 能够使一个线程等待, 直到计数器归于 0 后再继续执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行。
CountDownLatch 提供了类似计数器的同步手段, 构造器和主要方法:
// 构造初值=count 的计数器 |
Example:
public class Test { |
CountDownLatch 也是基于 AQS 实现,但是对于 AQS.state 计数器的使用有区别:
- 在 CountDownLatch 中,同步状态 State 表示 CountDownLatch 的计数器的初始值,当
State==0
时,表示无锁状态,且一旦 State 变为 0,就永远处于无锁状态了,此时所有线程在 await 上等待的线程都可以继续执行。 - 而在 ReentrantLock 中,
State==0
时,虽然也表示无锁状态,但是只有一个线程可以重置 State 的值。这就是共享锁的含义。
信号量 Semaphore
Semaphore 翻译成字面意思为 “信号量”,Semaphore 可以控同时访问的任务个数,通过 acquire(int) 获取n个许可,如果没有就等待; release(int) 释放n个许可。
- 如果线程 acquire 不到指定资源数(资源=0 或 acquire 的大于剩余资源),线程阻塞
- release 释放许可,并唤醒队列中一个节点(线程)
Example:
class Pool { |
➤ 构造器和主要方法:
//参数permits表示许可数目,即同时可以允许多少线程进行访问 |
➤ acquire 代码:
//Semaphore方法 |
tryAcquireShared: 由 Sync 子类实现
- 公平策略:若队列非空,先入队
- 非公平策略:先尝试 CAS state
tryAcquireShared (公平策略)代码:
protected int tryAcquireShared(int acquires) { |
上面提到 ReentrantLock 对 state 的操作是基于独占模式,Semaphore 是基于共享模式,二者区别:
- 独占模式的 acquire:
- 如果 state = 0 则尝试 CAS(state, 0, 1)
- 如果 state > 0 则需要测试 exclusiveThread 是否等于当前线程,是则 state+=1
- 共享模式的 acquire:
- CAS(state, available, available-1),available 是当前的 state 值,表示“available”的许可数量
➤ release:
public void release() { |
循环栅栏 CyclicBarrier
CyclicBarrier 是一个辅助同步器类,在 JDK1.5时随着J.U.C 一起引入。
- CyclicBarrier 功能类似 CountDownLatch 区别是:
- CountDownLatch 计数器变为 0 之前,所有调用 await 的线程都需等待,计数器变为 0 后这些线程可以继续执行,但一旦计数器变为 0 就不可重置了;
- CyclicBarrier 构造时同样需要指定计数器 N,工作线程调用 CyclicBarrier.await 后会阻塞,直到 N 个工作线程都调用了 await,这些阻塞的线程才可以继续执行;
- 两种 await 的方式: CyclicBarrier.await(), CyclicBarrier.await(timeout,TimeUnit) ,后一种可以设置等待的超时;
- CyclicBarrier.await 可以响应 InterruptedException,和 BrokenBarrierException
- 如果正在 await 的线程 1 被打断,那么线程 1 的 await 抛出 InterruptedException,其他正在 await 的线程抛出 BrokenBarrierException
- 如果线程 1 使用带超时的 await,并且超时了,线程 1 的 await 抛出 TimeoutException,其他正在 await 的线程抛出 BrokenBarrierException
- 如果有线程调用了 CyclicBarrier.reset,其他正在 await 的线程抛出 BrokenBarrierException
调用了 CyclicBarrier.await 的线程退出等待状态的条件有:1 产生上面几种异常,2 或者达到了 await 的数量
如图,ABCD 四个线程都到达 Barrier 后,才可以同时“穿过”栅栏
![[../_images/Java-并发.05c.JUC-Sync-2023-05-21-1.png]]
Example:
public class CyclicBarrierTest { |
使用 CyclicBarrier
时,对异常的处理一定要小心,比如线程在到达栅栏前就抛出异常,其它已经到达栅栏的线程会一直等待(因为没有还没有满足总数),最终导致程序无法继续向下执行。
线程可以通过以下几种机制避免上述情况:
- 1 使用带超时的 await
- 2 捕获到异常,尝试再次 await 流程(重试)
- 3 重试失败调用 reset
CyclicBarrier reset 相关的源码:
//将屏障重置为其初始状态。 |
reset 方法会调用
breakBarrier
(该方法意为栅栏损坏),该方法设置 broken 标志、重置 count、唤醒所有 await 线程;await 方法中也会调用
breakBarrier
:当收到 InterruptedException,或者超时,都会调用 breakBarrier;
参考:
- Java多线程10 同步工具类CyclicBarrier - 简书
- Java多线程进阶(十九)—— J.U.C之synchronizer框架:CyclicBarrier - 透彻理解Java并发编程 - SegmentFault 思否
Phaser
Phaser
是JDK1.7开始引入的一个同步工具类,适用于一些需要分阶段的任务的处理。它的功能与 CyclicBarrier和CountDownLatch有些类似,类似于一个多阶段的栅栏,比较三者:
同步器 | 作用 |
---|---|
CountDownLatch | 倒数计数器,初始时设定计数器值,线程可以在计数器上等待,当计数器值归 0 后,所有等待的线程继续执行 |
CyclicBarrier | 循环栅栏,初始时设定参与线程数,当线程到达栅栏后,会等待其它线程的到达,当到达栅栏的总数满足指定数后,所有等待的线程继续执行 |
Phaser | 多阶段栅栏,可以在初始时设定参与线程数,也可以中途注册/注销参与者,当到达的参与者数量满足栅栏设定的数量后,会进行阶段升级(advance) |
Phaser 中几个重要的概念:
(1)phase(阶段)
在CyclicBarrier中,只有一个栅栏,线程在到达栅栏后会等待其它线程的到达。
Phaser 也有栅栏,在 Phaser 中,栅栏的名称叫做phase(阶段),在任意时间点,Phaser 只处于某一个phase(阶段),初始阶段为0,最大达到 Integerr.MAX_VALUE
,然后再次归零。当所有parties参与者都到达后,phase值会递增。
如果看过之前关于 CyclicBarrier 的文章,就会知道,Phaser 中的 phase(阶段)这个概念其实和CyclicBarrier中的Generation很相似,只不过Generation没有计数。
(2)parties(参与者)
parties(参与者) 其实就是 CyclicBarrier中的参与线程的概念。
CyclicBarrier 中的参与者在初始构造指定后就不能变更,而 Phaser 既可以在初始构造时指定参与者的数量,也可以中途通过 register
、bulkRegister
、arriveAndDeregister
等方法注册/注销参与者。
(3)arrive(到达) / advance(进阶)
Phaser 注册完 parties(参与者) 之后,参与者的初始状态是 unarrived 的,当参与者 到达(arrive) 当前阶段(phase)后,状态就会变成 arrived。当阶段的到达参与者数满足条件后(注册的数量等于到达的数量),阶段就会发生 进阶(advance) ——也就是 phase 值+1。
(4)Termination(终止)
代表当前Phaser对象达到终止状态,有点类似于CyclicBarrier中的栅栏被破坏的概念。
(5)Tiering(分层)
Phaser 支持分层(Tiering) —— 一种树形结构,通过构造函数可以指定当前待构造的 Phaser 对象的父结点。之所以引入Tiering,是因为当一个 Phaser 有大量 参与者(parties) 的时候,内部的同步操作会使性能急剧下降,而分层可以降低竞争,从而减小因同步导致的额外开销。
在一个分层 Phasers 的树结构中,注册和撤销子 Phaser 或父 Phaser 是自动被管理的。当一个 Phaser 的参与者(parties) 数量变成0时,如果有该 Phaser 有父结点,就会将它从父结点中溢移除。
示例
(1)通过 Phaser 控制多个线程的执行时机:有时候我们希望所有线程到达指定点后再同时开始执行,我们可以利用CyclicBarrier或CountDownLatch来实现,这里给出使用 Phaser 的版本。
public class PhaserTest1 { |
(2) 通过 Phaser 实现开关。在以前讲CountDownLatch时,我们给出过以CountDownLatch实现开关的示例,也就是说,我们希望一些外部条件得到满足后,然后打开开关,线程才能继续执行,我们看下如何用Phaser来实现此功能。
public class PhaserTest2 { |
(3) 通过Phaser控制任务的执行轮数
public class PhaserTest3 { |
(4) Phaser 支持分层功能,我们先来考虑下如何用利用 Phaser 的分层来实现高并发时的优化,在示例三中,我们其实创建了10个任务,然后10个线程共用一个 Phaser 对象,如下图:
如果任务数继续增大,那么同步产生的开销会非常大,利用 Phaser 分层的功能,我们可以限定每个 Phaser 对象的最大使用线程(任务数),如下图:
可以看到,上述 Phasers 其实构成了一颗多叉树,如果任务数继续增多,还可以将 Phaser 的叶子结点继续分裂,然后将分裂出的子结点供工作线程使用。
public class PhaserTest4 { |
@ref::