旧的线程安全的集合: 任何集合类都可以通过使用同步包装器变成线程安全的:
List<E> synchArrayList = Collections.synchronizedList(new ArrayList<E>()); |
java.util.concurrent
包提供了线程安全的集合, 继承关系如下:
阻塞队列 |
Queue & Deque
Name | 是否阻塞 | 是否有界 | 队列长度 | 内部实现 |
---|---|---|---|---|
ArrayBlockingQueue | 阻塞 | 有界 | 构造器指定 | 循环数组,FIFO |
LinkedBlockingQueue | 阻塞 | 有界 | 构造器指定, 默认Int.Max | 链表,FIFO |
LinkedBlockingDeque | 阻塞 | 有界 | 构造器指定, 默认Int.Max | 双向链表,FIFO |
SynchronousQueue | 阻塞 | 有界 | 1 | |
PriorityBlockingQueue | 阻塞 | 无界 | 构造器指定, 默认11, 无限扩容 | 二叉堆 |
DelayQueue | 阻塞 | 无界 | 初始empty, 无限扩容 | |
ConcurrentLinkedQueue | 非阻塞 | 无界 | 初始empty, 无限扩容 | 单向链表 |
ConcurrentLinkedDeque | 非阻塞 | 无界 | 初始empty, 无限扩容 | 双向链表 |
线程安全的队列可以分为 阻塞队列 , 非阻塞队列, 按照是否可无限扩容分为 有界队列 , 无界队列 :
阻塞队列:
- 当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。
- 阻塞队列一般是用锁(例如
BlockingQueue
)来实现,阻塞队列继承自接口BlockingQueue
, 常用的有:ArrayBlockingQueue
,LinkedBlockingQueue
,PriorityBlockingQueue
,LinkedBlockingDeque
;
非阻塞队列是指:
- 非阻塞队列一般是用
CAS
实现的”Lock-Free”方法, - 非阻塞队列主要有:
ConcurrentLinkedQueue
,ConcurrentLinkedDeque
;
有界/无界: 无界队列可以无限扩容,一般链表实现的队列属于无界
阻塞队列
阻塞队列一般使用condition实现消费者和生产者的”通讯”。
比如当生产者往满的队列里添加元素时会阻塞住,当消费者消费了队列中的元素后,会通过condition通知生产者当前队列可用。
BlockingQueue接口方法有put/take:
阻塞方法:
- put(E o):将元素添加到此队列尾,如果队列满将一直阻塞,可以响应中断。
- take():检索并移除此队列的头部,如果队列为空则一直阻塞,可以响应中断。
不阻塞且抛异常的方法:
- add(E o):将元素添加到此队列中,如果队列已满不会阻塞,直接抛出 IllegalStateException
- remove(): 移除队列头部的元素,如果队列为空不会阻塞,直接抛出 IllegalStateException
不阻塞且带返回值的方法:
- offer(E o): 将元素添加到队列,不阻塞,成功返回true,失败返回false;
- offer(E o, long timeout, TimeUnit unit): 带等待时间的offer方法,如果队列已满,将等待指定的时间;
- poll(long timeout, TimeUnit unit): 返回队列的头部并移除,如果队列为空,则等待指定等待的时间。如果取不到返回null;
其他方法:
drainTo(Collection<? super E> c)
: 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。drainTo(Collection<? super E> c,int maxElements)
: 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中remainingCapacity()
: 返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的元素数量;如果没有内部限制,则返回 Integer.MAX_VALUE。
ArrayBlockingQueue
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序
构造器
ArrayBlockingQueue(int)
都要指定数组初始大小,并且大小不再扩展。- 默认情况下 ArrayBlockingQueue 不保证访问者公平的访问队列,所谓“公平访问队列”是指:当队列可用时,可以按照阻塞的先后顺序访问队列。我们可以使用以下代码创建一个“公平的”阻塞队列:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
- 队列满时,调用特定的插入方法会阻塞;
队列空时,调用特定的删除方法会阻塞
ArrayBlockingQueue 内部实现:
- 一个 ReentrantLock,阻塞方法,无论读写都是用这个 lock;
- 两个 Condition(notFull、notEmpty) 管理队列满或空时的阻塞状态;
在「生产者」+「消费者」情景下,因为读写都是共用同一个锁对象,由此也意味着两者无法真正并行运行,ArrayBlockingQueue 的吞吐量不如 LinkedBlockingQueue
@ref: Java多线程进阶(三二)—— J.U.C之collections框架:ArrayBlockingQueue - 透彻理解Java并发编程 - SegmentFault 思否
LinkedBlockingQueue
- LinkedBlockingQueue 是链表实现的“有界”的阻塞队列。构造函数可以指定最大长度,如果不指定则最大长度默认为
Integer.MAX_VALUE
- 插入方法
put(E e)
、offer(e, time, unit)
,如果队列满了,会阻塞调用者线程; - 获取方法
take()
、poll(time, unit)
,如果队列为空,会阻塞调用者线程; - 内部基于链表实现,
- 两个指针:head 和 last 指向链表头尾;
- 由两个锁(takeLock 与 putLock),出队和入队时加锁
- 两个 Condition(notFull、notEmpty) ,管理队列满或空时的阻塞状态。
由于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
入队代码:
/** |
以入队为例,新元素入队后,
- 如果链表还没满,需要唤醒 notFull 条件上的“入队线程”;
- 如果新元素入队前,链表中元素数量为 0,说明可能有读线程阻塞在 get 方法(也即 notEmpty 条件);
@ref: Java多线程进阶(三三)—— J.U.C之collections框架:LinkedBlockingQueue - 透彻理解Java并发编程 - SegmentFault 思否
LinkedBlockingDeque
- LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列。构造函数可以指定最大长度,如果不指定,队列的默认和最大长度为
Integer.MAX_VALUE
- 相比其他的阻塞单向队列,LinkedBlockingDeque多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法
SynchronousQueue
SynchronousQueue 特性,在某次添加元素后必须等待其他线程取走后才能继续添加,可以认为 SynchronousQueue 是一个缓存值为1的阻塞队列(虽然是属于无界的);
但是 isEmpty()方法永远返回是true,remainingCapacity() 方法永远返回是0,remove()和removeAll() 方法永远返回是false,iterator()方法永远返回空,peek()方法永远返回null。
SynchronousQueue 没有使用 lock,而是使用了 CAS(一种名为“Dual stack and Dual queue”的无锁算法实现。)
SynchronousQueue 有两种不同的模式:公平模式 or 非公平模式(默认),
- 如果采用公平模式:这种模式下 transferer 被初始化队列,如果队列为空,先发起 get 的线程可以先从阻塞中被通知;
- 如果是非公平模式:这种模式下 transferer 被初始化栈,如果队列为空,先发起 get 的线程后通知;
@ref: Java多线程进阶(三五)—— J.U.C之collections框架:SynchronousQueue - 透彻理解Java并发编程 - SegmentFault 思否
PriorityBlockingQueue
PriorityBlockingQueue 是一种无界阻塞队列,在构造的时候可以指定队列的初始容量。具有如下特点:
PriorityBlockingQueue 与之前介绍的阻塞队列最大的不同之处就是:它是一种优先级队列,也就是说元素并不是以 FIFO 的方式出/入队,而是以按照权重大小的顺序出队,所以队列中的元素必须是可以比较的,元素必须实现
Comparable
接口;PriorityBlockingQueue 是真正的无界队列(仅受内存大小限制),它不像 ArrayBlockingQueue 那样构造时必须指定最大容量,也不像 LinkedBlockingQueue 默认最大容量为
Integer.MAX_VALUE
,虽然 PriorityBlockingQueue 也支持构造函数指定大小,但因为自动扩容所以元素数量不会受限制;由于 PriorityBlockingQueue 无界队列,所以插入元素永远不会阻塞线程;但是当队列为空时,取出操作(take)会阻塞线程;
内部实现:
- PriorityBlockingQueue 底层是一种基于数组实现的堆结构,排序等功能的实现与 PriorityQueue 类似;
- 一个 ReentrantLock 锁对象,一个 notEmpty 条件对象
@ref Java多线程进阶(三四)—— J.U.C之collections框架:PriorityBlockingQueue - SegmentFault 思否
DelayQueue
DelayQueue
是 JDK1.5时,随着J.U.C 包一起引入的一种阻塞队列,它实现了 BlockingQueue 接口,底层基于已有的PriorityBlockingQueue实现:
- 队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。 Delayed 接口继承了 Comparable 接口,必须实现 compareTo 来指定元素的顺序。
- 由于DelayQueue内部委托了PriorityBlockingQueue对象来实现所有方法,所以能以堆的结构维护元素顺序,这样剩余时间最小的元素就在堆顶,每次出队其实就是删除剩余时间≤0的最小元素。
我们可以将 DelayQueue 运用在以下应用场景:
- 定时任务调度。使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,从比如 TimerQueue 就是使用 DelayQueue 实现的。
实现:
- DelayQueue 的主要成员:
- ReentrantLock lock: 保证线程安全
- Thread leader:最早调用 get 并阻塞的线程
- Condition available:条件对象,get 并阻塞的线程在此等待
- 因为是最小堆,所以堆顶是剩余时间最小的元素,每次 take 时:
- 如果堆顶时间未到,调用 get 的线程阻塞在 available;
- 如果堆顶元素时间到了,则取出堆顶;
实际的 put / take 更复杂,因为涉及到 Leader-Follower 机制:
- leader 是最早调用 take 并阻塞的线程;loader 阻塞在 available(条件对象),调用的是带超时时间的
awaitNanos(delay)
- 其他后续来 take 的线程也在 available 阻塞,用的是无限阻塞;
- loader 醒来后,先检测队头的节点是否到期,如果是则取走队头,并唤醒其他在 available 上的第一个线程,线程醒来(是在 take 函数的
await()
),
/** |
@ref: Java多线程进阶(三六)—— J.U.C之collections框架:DelayQueue - 透彻理解Java并发编程 - SegmentFault 思否
非阻塞队列
ConcurrentLinkedQueue
- ConcurrentLinkedQueue是一个基于链接节点的无边界的线程安全队列,它采用FIFO原则对元素进行排序。采用“wait-free”算法(即CAS)来实现的。
- ConcurrentLinkedQueue的结构是单向链表和head/tail两个指针,因为入队时需要修改队尾元素的next指针,以及修改tail指向新入队的元素两个CAS动作无法原子,所以需要的特殊的算法,见:Java 理论与实践: 非阻塞算法简介
ConcurrentLinkedDeque
- ConcurrentLinkedDeque是一种基于双向链表的无界链表。
- 与大多数集合类型不同,其size方法不是一个常量操作。因为链表的异步性质,确定当前元素的数量需要遍历所有的元素,所以如果在遍历期间有其他线程修改了这个集合,size方法就可能会报告不准确的结果。
- 批量的操作:包括添加、删除或检查多个元素,比如addAll()、removeIf()或者removeIf() 或forEach()方法,这个类型并不保证以原子方式执行。由此可见如果想保证原子访问,不得使用批量操作的方法。
List
CopyOnWriteArrayList
ArrayList 底层使用数组,实现了 List 接口,并且提供了 get(i) set(i)这种随机访问的方法,
线程安全的版本是 Vector,通过 Synchronized 整个方法实现了线程安全,但是性能太差,
CopyOnWriteArrayList 是一种线程安全的 ArrayList,更适合读多写少的场景
实现:
- 主要成员:Object 数组、ReentrantLock 锁
- get:不加锁
- add/remove:这些会更改数组的方法,都是用 ReentrantLock 加锁,使用
Arrays.copyOf
将旧内容拷贝入新数组,然后替换掉旧数组
public boolean add(E e) { |
写操作因为要 copy 整个旧数组,代价还是较高,适用于写非常少的场景
@ref:
Set
ConcurrentSkipListSet
ConcurrentSkipListSet的实现非常简单,其内部引用了一个ConcurrentSkipListMap对象,所有API方法都是调用了ConcurrentSkipListMap。
ConcurrentSkipListSet和TreeSet,它们虽然都是有序的集。但是:
第一,它们的线程安全机制不同,TreeSet是非线程安全的,而ConcurrentSkipListSet是线程安全的;
第二,ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的,而TreeSet是通过TreeMap实现的;
Map
ConcurrentHashMap(JDK 1.8)
在 JDK1.7之前,ConcurrentHashMap 是通过分段锁机制来实现的,所以其最大并发度受 Segment 的个数限制。因此,在 JDK1.8中,ConcurrentHashMap 的实现原理摒弃了这种设计,而是选择了与 HashMap 类似的数组+链表+红黑树的方式实现,而加锁则采用 CAS 和 synchronized 实现。
1.8 中 ConcurrentHashMap 使用了 CAS + Synchronized 两种方式,put 流程如下:
- 计算 index,方式同 HashMap: index = hash & (n -1 )
- 如果 table[i] = null, CAS 插入这个位置
- 如果 table[i] 是链表 or 红黑树,则 synchronized 锁住 table[i],也即此处的 Node 对象
- 如果 table[i].hash = MOVED, 即“ForwardingNode 节点”,说明此时 HashMap 正在扩容,则调用 transfer 协助迁移
/** |
➤ 扩容 tryPresize() 实现:
- 通过 CAS 保证只能由一个线程进行桶数组的扩容;
- 对链表进行红黑树转换的时候(触发阈值 8 ), 如果桶数组小于 64,则不进行红黑树转换,而是进行扩容,把数组长度扩大到原来的两倍;
- 然后把旧数组中的所有元素,迁移到新数组中去;
➤ 迁移 transfer() 的实现:
- 如果 table[i]处节点的类型是 ForwardingNode,则说明这个节点已经迁移完成了;
- transfer 可以多个线程并发执行;
- 因为桶数组和计算 index(参考 hashMap )的特性,扩容前和扩容后的位置只有 2 种可能,在原位置 or 原位置+oldCap,这种处理方式非常利于扩容时多个线程同时进行的数据迁移操作,因为旧 table 的各个桶中的结点迁移不会互相影响,所以就可以用分治的方式,将整个 table 数组划分为很多部分,每一部分包含一定区间的桶,每个数据迁移线程处理各自区间中的结点;
此处的 forwarding 节点的作用,与 copying GC 时用到的对象 forwarding ptr 作用非常类似
➤ 计算元素个数 sum() 的实现:
- ConcurrentHashMap 的键值对计数逻辑与 LongAdder 的实现类似:一个 long 型的 base,另外是 cell[] 数组;
- 如果 CAS base 成功,直接在 base 上累加,如果 CAS 失败了,也即发生冲突,线程会根据自己的 hash,找到 cell[i],然后对该 cell 进行 CAS+1;
- 计算 sum 时,和 LongAdder 一样,也是一个瞬时值
final long sumCount() { |
@ref::
- Java多线程进阶(二三)—— J.U.C之collections框架:ConcurrentHashMap(1) 原理 - 透彻理解Java并发编程 - SegmentFault 思否
- Java多线程进阶(二四)—— J.U.C之collections框架:ConcurrentHashMap(2) 扩容 - 透彻理解Java并发编程 - SegmentFault 思否
ConcurrentHashMap(JDK 1.7)
- 数据分段存储,每个段有一个写锁(分段锁),当一个线程占用某个段的锁时,其他段也可以正常访问,有效分散了阻塞的概率,而且没有读锁;
- 没有读锁是因为put/remove动作是个原子动作(比如put是一个对数组元素/Entry 指针的赋值操作),读操作不会看到一个更新动作的中间状态;
- 每次扩容为原来容量的2倍,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容;
- 在获取size操作的时候,不是直接把所有segment的count相加就可以可到整个ConcurrentHashMap大小,也不是在统计size的时候把所有的segment的put、remove、clean方法全部锁住,这种方法太低效。
在累加count操作过程中,之前累加过的count发生变化的几率非常小,所有ConcurrentHashMap的做法是先尝试2(RETRIES_BEFORE_LOCK)次通过不锁住Segment的方式统计各个Segment大小,如果统计的过程中,容器的count发生了变化,再采用加锁的方式来统计所有的Segment的大小。 - putIfAbsent(k,v):当k已经存在时返回已存在的v。
➤ 内部实现:
concurrencyLevel
: 并行级别、也是Segment 数,默认是 16,也就是说 ConcurrentHashMap 有 16 个 Segments,所以理论上,最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的。
- 实例化ConcurrentHashMap时带参数时,会根据参数调整table的大小,假设参数为100,最终会调整成256,确保table的大小总是2的幂次方, 为什么?
- put操作:
计算桶位置: i = (table.size-1) & hash
如果 table[i] == null : 用自旋+CAS 改变 table[i] 的值
如果 f = table[i] != null : Synchronized(f)锁住f节点 - size的实现: 类似 LongAdder
ConcurrentSkipListMap
- JDK6新增的并发优化的SortedMap,以SkipList实现。SkipList是红黑树的一种简化替代方案,是个流行的有序集合算法。Concurrent包选用它是因为它支持基于CAS的无锁算法,而红黑树则没有好的无锁算法。
- ConcurrentSkipListMap 的key是有序的;
- 与ConcurrentHashMap相比,ConcurrentSkipListMap 支持更高的并发。ConcurrentSkipListMap 的存取时间是log(n),和线程数几乎无关。也就是说在数据量一定的情况下,并发的线程越多,ConcurrentSkipListMap越能体现出优势。
- 它的size()比较特殊,需要遍历所有元素;
Deprecated: Vector & HashTable
Vector和HashTable已经被弃用,取而代之的是ArrayList和HashMap,如果要使用线程安全的容器,可以用Collections转换:
List<E> syncList = Collections.synchronzedList(new ArrayList<E>()); |