Java 多线程并发【7】CAS 和原子类

CAS

CAS ,全称 compare and swap ,比较和交换,用于解决多线程并发情况下,使用锁造成性能损耗的一种机制。CAS操作包含三个操作数——内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。

回顾线程安全的中的内容,线程的并发同步方案分为三种:

  • 互斥同步
  • 非阻塞同步
  • 无同步方案

互斥同步是最安全的保障线程安全的手段,但它最主要的问题就是进行线程阻塞和唤醒会带来性能问题,因此互斥同步也称为阻塞同步。从处理问题的角度上,互斥同步属于悲观的并发策略:总是认为不加同步措施,就会出现问题,无论数据是否发生竞争,都进行加锁、用户态内核态切换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。

而与之对立的是基于冲突检测的乐观并发策略:先进行操作,如果没有其他线程竞争数据,操作就成功了;如果有其他线程进行数据争用,采取其他补救措施(重试),这种乐观的并发策略的多数实现都不需要挂起线程,因此称为非阻塞同步。

Java 中非阻塞同步实现方案是 CAS 操作,JDK 中封装了 sun.misc.Unsafe 类提供了快速的 CAS 操作方法,但并没开放给开发者直接使用,而是通过 J.U.C 中其他的一些 API 来间接使用,常见的就是原子类。

AtomicInteger i = new AtomicInteger();
i.incrementAndGet();

// incrementAndGet 内部调用
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

底层原理

以原子类为切入点,我们深入源码查看它的底层实现:

public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

这里的 unsafe 是一个 jdk.internal.misc.Unsafe 类型:

private static final Unsafe unsafe = Unsafe.getUnsafe();

查看 getUnsafe 方法中的源码:

private static final Unsafe theUnsafe = new Unsafe();
    
public static Unsafe getUnsafe() {
    return theUnsafe;
}

相当于是 Unsafe 默认创建了一个静态变量,供外部通过 getUnsafe() 访问。

AtomicInteger 中的 ++ 操作,调用了 Unsafe 的非静态方法 getAndAddInt(Object, long, int) ,这个方法的实现是:

@IntrinsicCandidate
public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

在这个方法中实现了一个典型的 CAS,通过 do..while 循环,不断去获取值,并且尝试进行修改。

首先简单看看如何获取值的:

/** Volatile version of {@link #getInt(Object, long)}  */
@IntrinsicCandidate
public native int getIntVolatile(Object o, long offset);

看起来这是一个 native 方法,在 src/hotspot/share/opto/library_call.cpp 中找到了相关的内容:

case vmIntrinsics::_getIntVolatile: return inline_unsafe_access(!is_store, T_INT, Volatile, false);

循环的终止条件是 !weakCompareAndSetInt(o, offset, v, v + delta),这个方法的定义是:

@IntrinsicCandidate
public final boolean weakCompareAndSetInt(Object o, long offset, int expected, int x) {
    return compareAndSetInt(o, offset, expected, x);
}

内部实际调用 compareAndSetInt

    @IntrinsicCandidate
    public final native boolean compareAndSetInt(Object o, long offset, int expected, int x);

compareAndSetInt 也是个 native 方法,对应 C 语言代码逻辑是:

case vmIntrinsics::_compareAndSetInt:  return inline_unsafe_load_store(T_INT,    LS_cmp_swap,      Volatile);
bool LibraryCallKit::inline_unsafe_load_store(const BasicType type, const LoadStoreKind kind, const AccessKind access_kind) {
  // ...
  switch (kind) {
    case LS_cmp_exchange: {
      result = access_atomic_cmpxchg_val_at(base, adr, adr_type, alias_idx, oldval, newval, value_type, type, decorators);
      break;
    }
    case LS_cmp_swap_weak:
      decorators |= C2_WEAK_CMPXCHG;
    case LS_cmp_swap: {
      result = access_atomic_cmpxchg_bool_at(base, adr, adr_type, alias_idx, oldval, newval, value_type, type, decorators);
      break;
    }
    case LS_get_set: {
      result = access_atomic_xchg_at(base, adr, adr_type, alias_idx, newval, value_type, type, decorators);
      break;
    }
    case LS_get_add: {
      result = access_atomic_add_at(base, adr, adr_type, alias_idx, newval, value_type, type, decorators);
      break;
    }
    default:
      ShouldNotReachHere();
  }
  assert(type2size[result->bottom_type()->basic_type()] == type2size[rtype], "result type should match");
  set_result(result);
  return true;
}

忽略一些无用的细节,在这个方法最后根据 kind 去执行了不同的逻辑,而在`compareAndSetInt 方法中,我们传入的 kind 是 LS_cmp_swap :

    case LS_cmp_swap: {
      result = access_atomic_cmpxchg_bool_at(base, adr, adr_type, alias_idx, oldval, newval, value_type, type, decorators);
      break;
    }

在这个分支中执行的是 access_atomic_cmpxchg_bool_at方法,方法名中的 cmpxchg 的含义是汇编指令,所以这个方法应该是 C 语言操作汇编指令的实现:

Node* GraphKit::access_atomic_cmpxchg_bool_at(Node* obj,
                                              Node* adr,
                                              const TypePtr* adr_type,
                                              int alias_idx,
                                              Node* expected_val,
                                              Node* new_val,
                                              const Type* value_type,
                                              BasicType bt,
                                              DecoratorSet decorators)
 
{
  C2AccessValuePtr addr(adr, adr_type)//
  C2AtomicParseAccess access(this, decorators | C2_READ_ACCESS | C2_WRITE_ACCESS, bt, obj, addr, alias_idx);
  if (access.is_raw()) {
    return _barrier_set->BarrierSetC2::atomic_cmpxchg_bool_at(access, expected_val, new_val, value_type);
  } else {
    return _barrier_set->atomic_cmpxchg_bool_at(access, expected_val, new_val, value_type);
  }
}

通过内存屏障来保证了操作的原子性和防止重排。

而这个终止条件的含义就是尝试更新值是否成功,如果失败就会一直去执行 do 代码块中的尝试获取值的操作。

而通过这个我们也可以看出,在 Java 中,JDK 为我们提供好了关于 CAS 能力的封装,就是原子类。

原子类

原子类底层封装了 CAS 的能力,常见的原子类基本类型包括:

AtomicBoolean
AtomicInteger
AtomicLong

而用于操作数组的包括:

AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray

用于引用类型包括:

AtomicReference<V> // 原子更新引用类型。
AtomicMarkableReferce<E> // 原子更新带有标记位的引用类型。
AtomicStampedReference<V> // 原子更新引用类型, 内部使用Pair来存储元素值及其版本号。
...

用于更新字段的类有四种:

AtomicIntegerFieldUpdater: 原子更新整型的字段的更新器。
AtomicLongFieldUpdater: 原子更新长整型字段的更新器。
AtomicStampedFieldUpdater: 原子更新带有版本号的引用类型。
AtomicReferenceFieldUpdater: 原子更新引用类型字段的更新器。

原子更新字段类都是抽象类,每次使用的时候必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。

基本类型的原子类实现

以 AtomicInteger 为例,它提供了一些常用的 API:

public final int get():获取当前的值
public final int getAndSet(int newValue):获取当前的值,并设置新的值
public final int getAndIncrement():获取当前的值,并自增
public final int getAndDecrement():获取当前的值,并自减
public final int getAndAdd(int delta):获取当前的值,并加上预期的值
void lazySet(int newValue): 最终会设置成newValue,使用lazySet设置值后,可能导致其他线程在之后的一小段时间内还是可以读到旧的值。

相比普通的 int 类型,AtomicInteger 保障了操作的原子性。比如最常见的自增操作:

i++  // // 等效于 i = i + 1 ,三步原子操作:读取 i, 进行 +1 操作,赋值给 i 。

i 为普通的 int 类型时,i++ 是三步操作,并不能保证线程安全:

var temp = i
temp = temp + 1
i = temp

而使用原子类 AtomicInteger,它提供了自增方法 getAndIncrement(),其内部逻辑:

public final int getAndIncrement() {
    return U.getAndAddInt(this, VALUE, 1);
}

getAndAddInt 方法:

    @IntrinsicCandidate
    public final int getAndAddInt(Object o, long offset, int delta) {
        int v;
        do {
            v = getIntVolatile(o, offset);
        } while (!weakCompareAndSetInt(o, offset, v, v + delta));
        return v;
    }

C语言层面也是和 compareAndSetInt 一样:

case vmIntrinsics::_getAndAddInt:    return inline_unsafe_load_store(T_INT,    LS_get_add,       Volatile);

它的 kind 是 LS_get_add ,在 inline_unsafe_load_store 方法中执行的分支是:

    case LS_get_add: {
      result = access_atomic_add_at(base, adr, adr_type, alias_idx, newval, value_type, type, decorators);
      break;
    }

也是在 Java 层面实现了 CAS ,底层实现了原子性操作。

CAS 的缺点

尽管 CAS 看起来是个很优美的方案,但这种操作并不能覆盖互斥同步的所有使用场景,并且 CAS 从语义上来说并不是完美的,它存在一个逻辑漏洞:

ABA 问题

如果一个变量 V 初次读取的时候是 A 值,并且在准备赋值的时候检查到它仍然为 A 值,那我们就能说它的值没有被其他线程改变过了吗?如果在这段期间它的值曾经被改成了 B ,后来又被改回为 A ,那 CAS 操作就会误认为它从来没有被改变过。

J.U.C 包为了解决这个 ABA 问题,提供了一个带有标记的原子引用类 AtomicStampedReference ,它可以通过控制变量值的版本来保证 CAS 的正确性,但大部分 ABA 问题并不会影响并发程序的正确性,所以也就没有什么实质作用,如果需要解决 ABA 问题,还是得用互斥同步中的方案。

AtomicStampedReference 解决 ABA 问题

AtomicStampedReference 主要维护包含一个对象引用以及一个可以自动更新的整数 “stamp” 的 pair 对象来解决 ABA 问题。

解决思路就是对每次更新的值打个标记,记录当我们第二次访问到相同的值的时候,对比标记是否更新过,就能发现值是否更改过。

public class AtomicStampedReference<V{
    private static class Pair<T{
        final T reference;  //维护对象引用
        final int stamp;  //用于标志版本
        
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }
    
    private volatile Pair<V> pair;
    // ...
    
    /**
      * expectedReference :更新之前的原始值
      * newReference : 将要更新的新值
      * expectedStamp : 期待更新的标志版本
      * newStamp : 将要更新的标志版本
      */

    public boolean compareAndSet(V   expectedReference,
                             V   newReference,
                             int expectedStamp,
                             int newStamp)
 
{
        // 获取当前的(元素值,版本号)对
        Pair<V> current = pair;
        return
            // 引用没变
            expectedReference == current.reference &&
            // 版本号没变
            expectedStamp == current.stamp &&
            // 新引用等于旧引用
            ((newReference == current.reference &&
            // 新版本号等于旧版本号
            newStamp == current.stamp) ||
            // 构造新的 Pair 对象并 CAS 更新
            casPair(current, Pair.of(newReference, newStamp)));
    }

    private boolean casPair(Pair<V> cmp, Pair<V> val) {
        // 调用 Unsafe 的 compareAndSwapObject() 方法 CAS 更新 pair 的引用为新引用
        return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
    }
}


原文始发于微信公众号(八千里路山与海):Java 多线程并发【7】CAS 和原子类

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由半码博客整理,本文链接:https://www.bmabk.com/index.php/post/85117.html

(0)
小半的头像小半

相关推荐

发表回复

登录后才能评论
半码博客——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!