Java-并发.05a.JUC-Atomic(原子操作)& CAS

一些概念(乐观锁、CAS)

▷ 原子操作:

对int变量赋值的操作是原子的吗? - 知乎

▷ CAS:

  • CAS(Compare And Swap):比较并替换。CAS 需要有 3 个操作数:内存地址 V,旧的预期值 A,即将要更新的目标值 B。
  • CAS 指令执行时,当且仅当内存地址 V 的值与预期值 A 相等时,将内存地址 V 的值修改为 B 并返回 True,否则就什么都不做并返回 False。比较+替换是一个原子操作。

▷ 乐观锁 & 悲观锁:

  • 悲观锁: 假定会出现冲突, 加锁前检查所有冲突的可能性,每次在拿数据的时候都会上锁,拿到锁之后才可以修改临界区数据;
  • 乐观锁(Optimistic Locking): 假定不会出现冲突, 先尝试去修改数据的操作, 根据操作的返回值确定是否抢锁成功;
  • Java 中乐观锁 & 悲观锁的实现: CAS & ReentrantLock

Atomic

在 JDK 5之前 Java 语言是靠 synchronized 关键字保证同步的, synchronized 的实现是 monitor 对象, 和 ReentantLock 类似也是一种重量级的锁, 获取锁失败会导致线程的上下文切换.

Volatile 关键字能够在并发条件下,强制将修改后的值刷新到主内存中来保持内存的可见性。通过 CPU 内存屏障禁止编译器指令性重排来保证并发操作的有序性, 但 volatile 也有局限性, volatile++这种依赖当前值的操作并不能保证原子性.

在这种情况下, 不想使用重量级锁, Volatile 自增无法满足原子性, 这时候可以使用 java.util.concurrent.atomic ,支持了大部分 Java 包的原子操作:

  1. 原子更新基本类型:AtomicBoolean, AtomicInteger, AtomicLong.
  2. 原子更新数组:AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray.
  3. 原子更新引用类型:AtomicReference, AtomicStampedReference, AtomicMarkableReference.
  4. 原子更新字段类型:AtomicReferenceFieldUpdater, AtomicIntegerFieldUpdater, AtomicLongFieldUpdater.
  5. 其他:LongAdder
AtomicInteger atomicInteger = new AtomicInteger();
// incrementAndGet提供了几个方法实现原子操作:
int i = atomicInteger.get();
atomicInteger.getAndSet(0); // 当前值设置为0, 并返回之前的值
atomicInteger.compareAndSet(1,3); // 如果当前值等于1, 则更新为3
atomicInteger.decrementAndGet(1); // 自减
atomicInteger.incrementAndGet(1); // 自增

AtomicInteger 的 CAS 实现

java.util.concurrent.atomic 包下的类, 大都是调用 sun.misc.Unsafe 里的方法实现的, sun.misc.Unsafe 主要的 CAS 方法: compareAndSetObject, compareAndSetIntcompareAndSetLong, 它们都是 native 方法, 最终实现在 Hotspot 的 unsafe.cpp, 最终调用了 C++ 的 Atomic::cmpxchg.

public class AtomicInteger extends Number implements java.io.Serializable {

// 值的内存偏移量
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");

// 值
private volatile int value;

// 最终调用的是unsafe.compareAndSetInt, 传参是:
// this:此 AtomicInteger对象
// VALUE: 值在AtomicInteger对象中的偏移量
// expectedValue:期望值
// newValue:新值
public final boolean compareAndSet(int expectedValue, int newValue) {
return U.compareAndSetInt(this, VALUE, expectedValue, newValue);
}
}

unsafe.compareAndSetInt 是 native 函数:

// unsafe.cpp

// UNSAFE_ENTRY 和 UNSAFE_END 都是宏,在预编译期间会被替换成真正的代码
// 下面的 jboolean、jlong 和 jint 等是一些类型定义(typedef)
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);

// 根据偏移量,计算 value 的地址。这里的 offset 就是 AtomaicInteger.value的Offset
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);

// 调用 Atomic 中的函数 cmpxchg,该函数声明于 Atomic.hpp 中
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
//  Atomic.hpp

atomic.cppunsigned Atomic::cmpxchg(unsigned int exchange_value, volatile unsigned int* dest, unsigned int compare_value) {
assert(sizeof(unsigned int) == sizeof(jint), "more work to do");

/*
* 根据操作系统类型调用不同平台下的重载函数,这个在预编译期间编译器会决定调用哪个平台下的重载
* 函数。相关的预编译逻辑如下:
*
* atomic.inline.hpp:
* #include "runtime/atomic.hpp"
*
* // Linux
* #ifdef TARGET_OS_ARCH_linux_x86
* # include "atomic_linux_x86.inline.hpp"
* #endif
*
* // 省略部分代码
*
* // Windows
* #ifdef TARGET_OS_ARCH_windows_x86
* # include "atomic_windows_x86.inline.hpp"
* #endif
*
* // BSD
* #ifdef TARGET_OS_ARCH_bsd_x86
* # include "atomic_bsd_x86.inline.hpp"
* #endif
*
*
*/
return (unsigned int)Atomic::cmpxchg((jint)exchange_value, (volatile jint*)dest, (jint)compare_value);
}

C++ 的 Atomic::cmpxchg 在不同的平台有不同的实现, 以 Windows 为例:

// atomic_windows_x86.inline.hpp
#define LOCK_IF_MP(mp) __asm cmp mp, 0 \
__asm je L0 \
__asm _emit 0xF0 \
__asm L0:
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { // alternative for InterlockedCompareExchange
int mp = os::is_MP();
__asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}

上面的代码由 LOCK_IF_MP 预编译标识符和 cmpxchg 函数组成。为了看到更清楚一些,我们将 cmpxchg 函数中的 LOCK_IF_MP 替换为实际内容。如下:

inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {  
// 判断是否是多核 CPU
int mp = os::is_MP();
__asm { // 将参数值放入寄存器中
mov edx, dest // 注意: dest 是指针类型,这里是把内存地址存入 edx 寄存器中
mov ecx, exchange_value
mov eax, compare_value
// LOCK_IF_MP
cmp mp, 0 /*
* 如果 mp = 0,表明是线程运行在单核 CPU 环境下。此时 je 会跳转到 L0 标记处,
* 也就是越过 _emit 0xF0 指令,直接执行 cmpxchg 指令。也就是不在下面的 cmpxchg 指令
* 前加 lock 前缀。
*/
je L0 /*
* 0xF0 是 lock 前缀的机器码,这里没有使用 lock,而是直接使用了机器码的形式。至于这样做的
* 原因可以参考知乎的一个回答:
* https://www.zhihu.com/question/50878124/answer/123099923
*/
_emit 0xF0L0: /*
* 比较并交换。简单解释一下下面这条指令,熟悉汇编的朋友可以略过下面的解释:
* cmpxchg: 即“比较并交换”指令
* dword: 全称是 double word,在 x86/x64 体系中,一个
* word = 2 byte,dword = 4 byte = 32 bit
* ptr: 全称是 pointer,与前面的 dword 连起来使用,表明访问的内存单元是一个双字单元
* [edx]: [...] 表示一个内存单元,edx 是寄存器,dest 指针值存放在 edx 中。
* 那么 [edx] 表示内存地址为 dest 的内存单元
*
* 这一条指令的意思就是,将 eax 寄存器中的值(compare_value)与 [edx] 双字内存单元中的值
* 进行对比,如果相同,则将 ecx 寄存器中的值(exchange_value)存入 [edx] 内存单元中。
*/
cmpxchg dword ptr [edx], ecx
}
}

CAS 的实现离不开处理器的支持。如上面源代码所示,程序会根据当前处理器的类型来决定是否为 cmpxchg 指令添加 lock 前缀。如果程序是在多处理器上运行,就为 cmpxchg 指令加上 lock 前缀(lock cmpxchg)。反之,如果程序是在单处理器上运行,就省略 lock 前缀(单处理器自身会维护单处理器内的顺序一致性,不需要 lock 前缀提供的内存屏障效果)。

intel 的手册对 lock 前缀的说明如下:

  • 确保对内存的读/改/写操作原子执行。在 Pentium 及 Pentium 之前的处理器中,带有 lock 前缀的指令在执行期间会锁住总线,使得其他处理器暂时无法通过总线访问内存。很显然,这会带来昂贵的开销。从 Pentium 4,Intel Xeon 及 P6 处理器开始,intel 在原有总线锁的基础上做了一个很有意义的优化:如果要访问的内存区域(area of memory)在 lock 前缀指令执行期间已经在处理器内部的缓存中被锁定(即包含该内存区域的缓存行当前处于独占或以修改状态),并且该内存区域被完全包含在单个 缓存行(cache line) 中,那么处理器将直接执行该指令。由于在指令执行期间该缓存行会一直被锁定,其它处理器无法读 / 写该指令要访问的内存区域,因此能保证指令执行的原子性。这个操作过程叫做 缓存锁定(cache locking),缓存锁定将大大降低 lock 前缀指令的执行开销,但是当多处理器之间的竞争程度很高或者指令访问的内存地址未对齐时,仍然会锁住总线。

  • 禁止该指令与之前和之后的读和写指令重排序。

  • 把写缓冲区中的所有数据刷新到内存中。

缓存行(cache line)和 MESI 协议参考: [[../21.Operating-System/01.CPU_Cache]]

上面的第 2 点和第 3 点所具有的内存屏障效果,JUC 的 Atomic.CAS 操作 足以同时实现 volatile 读和 volatile 写的内存语义。

@ref: https://www.cnblogs.com/huansky/p/15746624.html

AtomicReference

AtomicReference:原子性更新引用

实现:

  • AtomicReference<T> 有一个 T 类型的 value 成员,使用这个成员来保存引用
  • 在 static 代码块中,计算 value 在内存中的 offset
  • compareAndSet 即对 value 进行CAS
static {  
try {
valueOffset = unsafe.objectFieldOffset
(AtomicReference.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

public final boolean compareAndSet(V expect, V update) {
return unsafe.compareAndSwapObject(this, valueOffset, expect, update);
}

Atomic FieldUpdater

AtomicReferenceFieldUpdater, AtomicIntegerFieldUpdater, and AtomicLongFieldUpdater 原子性更新引用中的 field

使用 AtomicIntegerFieldUpdater 对类的 int 类型的 field 进行原子性更新:

class Test {
volatile int count;

public int getCount() {
return count;
}
}

public static void main(String[] args) throws InterruptedException {

// 创建 FieldUpdater 对象, 并与Test.class 的filed 关联起来
AtomicIntegerFieldUpdater<Test> countFieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(Test.class, "count");

Test test = new Test();

ExecutorService es = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
es.execute(() -> {
for (int c = 0; c < 100; c++) {
countFieldUpdater.incrementAndGet(test);
}
});
}
es.shutdown();
es.awaitTermination(10, TimeUnit.MINUTES);
System.out.println("count: " + test.getCount());
}

使用 AtomicReferenceFieldUpdater<T, V> 对类的引用类型 field 进行原子性更新: :

  • T - The type of the object holding the updatable field
  • V - The type of the field

@ref: Java - Atomic Field Updaters: AtomicReferenceFieldUpdater, AtomicIntegerFieldUpdater, and AtomicLongFieldUpdater

AtomicStampedReference 解决 ABA 问题

AtomicReference 的通用工作模式:

AtomicReference<Object> ref = new AtomicReference<>(new Object());
Object oldVal = ref.get();

// 对旧对象做一些更新,得到新对象
Object newVal = someFunctionOfOld(oldVal);

// 如果期间没有其它线程改变了缓存值,则更新
boolean success = ref.compareAndSet(oldVal , newVal);

但是上面的模式会有 ABA 问题:

  • 线程 1 获取 Atomic 的 oldVal = A,但是因为某些原因让出了 CPU
  • 线程 2 获取到 oldVal= A,并成功 CAS(A,B)
  • 线程 2 继续 CAS(B,A)并且成功,Atomic 的最终值是A
  • 线程 1 恢复,CAS(A,B)也成功了

Java 中的 AtomicStampedReference<E> 通过版本号避免了 ABA 问题,它通过包装 [E,Integer] 的 Pair 来对对象标记版本戳 stamp,从而避免 ABA 问题:

AtomicStampedReference<Object>  ref = new AtomicStampedReference<>(new Object(),0);  // 创建AtomicStampedReference对象,持有Obj引用,版本为0

int[] stamp = new int[1];
Object oldVal = ref.get(stamp); // get 方法,一次调用获取 ref 和 stamp
int oldStamp=stamp[0]; // stamp[0] 保存的是上次的版本号

ref.compareAndSet(oldVal, new Object(), oldStamp, oldStamp + 1)

@ref:: Java多线程进阶(十四)—— J.U.C之atomic框架:AtomicReference - 透彻理解Java并发编程 - SegmentFault 思否

LongAdder

已经有 AtomicLong.addAndGet(l) 的情况下, 为什么还要设计 LongAdder?

AtomicLong 是使用 “自旋+CAS”实现, 面对大量并发的情况下有性能瓶颈:

public final long getAndAddLong(Object o, long offset, long delta) {
   long v;
   do {
       v = getLongVolatile(o, offset);
  } while (!compareAndSwapLong(o, offset, v, v + delta));
   return v;
}

LongAdder 的使用:

  • compareAndSet
  • addAndGet
  • decrementAndGet

除了 CAS,LongAdder 还提供了很多 add 场景的操作,LongAdder 更适用于 add 并发多的场景。

LongAdder 的实现:

  • 内部是 base + cell 数组,LongAdder 初始化时 cell = null
  • 如果是写操作,尝试 CAS base,
    • 如果 CAS base 成功,则只更新base
    • 如果 CAS 失败说明有竞争,这时候开始初始化 cell 数组,并通过线程 hashCode 确认 cell 数组的位置(这里是用了 THREAD_PROBE.get(Thread.currentThread()), 是不是线程 hashCode 需要再确认,通过这个 hashCode 去确认数组位置,和 HashMap 的方法类似:index = (cell.length - 1) & hash
    • 以后线程进行写入操作时,如果 cell 非空,会尝试 CAS 属于自己的 cell
    • cell 也会扩容(double cell 数组)需要注意 cell 数组大小超过 CPU 核数后则不再扩容
  • 读操作(sum 方法):
    • base + ∑cell[] ,简单的累加操作,如果当前有线程正在改写 cell,这里得到的仍是一个近似值

@ref Java多线程进阶(十七)—— J.U.C之atomic框架:LongAdder - 透彻理解Java并发编程 - SegmentFault 思否