快速结论

  • Java的CAS是通过汇编命令cmpxchg进行的包装与定制实现atomic memory operation (AMO) i
  • CAS由于是进行主动比较,因此compareAndSet一定会放在一个自旋中,它适用于冲突(自旋次数)比较少的场景。在某些激烈的场景下可能性能还不如互斥锁。
  • CPU汇编层面没有实现CAS的公平调用,需要上层业务自行实现

CAS的简介

如果没有听过CAS的话,下面是简要入门,比如下面可以达到最终总是100,而不会小于100

AtomicInteger a = new AtomicInteger(0)
def runnable = {
    println "a=${a.incrementAndGet()}, ${Thread.currentThread().getName()}"
}
100.times {
    new Thread(runnable).start()
}
Thread.currentThread().join()

CAS入门文章

配置GDB断点(选读)

本文将通过GDB调试JamVM实现对CAS的源码分析,首先要搭建GDB调试环境,然后才能进行测试

测试代码如下

import java.util.concurrent.atomic.AtomicInteger;

class A{
    public static void main(String[] args){
        AtomicInteger a = new AtomicInteger(1);
        System.out.println("a = " + a.incrementAndGet());
    }
}

接着使用Javac进行编译

javac A.java

最后使用JamVM与GDB进行断点测试

jamvm -cp . A

断点打在natives.ccompareAndSwapInt方法中,就可以分析调用栈了

CAS在JVM上的流程分析

CAS在Java层上的实现

CAS在Java中最后的实现都是sun.misc.Unsafe类,而它几乎全部是Native方法,几乎没有什么可以分析,值得注意的是,在不同JVM中实现的过程不一定一样,但是最后肯定都有一个循环与CAS,下文是OracleJDK的实现

// 1. 开发者调用JDK
AtomicInteger.incrementAndGet();
// 2. JDK内部自旋并不断尝试(getAndAddInt)
int var5;
do {
  var5 = getIntVolatile(this, offset);
} while(!compareAndSwapInt(this, offset, var5, var5 + 1));
return var5;

其中offset是通过Unsafe方法获取var5相对于AtomicInteger这个class的native内存地址偏移

上面的 getIntVolatilecompareAndSwapInt 均是native方法,其中thisAtomicInteger这个结构体

  • this与offset: 表示内存中的数值
  • var5: 是旧的数值
  • var5+1: 是新的数值

在多线程下,如果没有竞争,1~2次就可以完成循环;在有一定竞争情况下,也就是memory被反复改,它通过不断自旋实现重试

CAS在native层的实现

下面是迷你虚拟机JamVM的unsafe.compareAndSwapInt的C语言实现,最终还是调用到了汇编,通过CPU硬件实现

首先分析objectFieldOffset,它本质是获取Class的Slot中的“value”对应的偏移量

// 在Java中,获取结构体的偏移位置
// Field f = AtomicInteger.class.getDeclaredField("value")
// valueOffset = unsafe.objectFieldOffset(f);
//JamVM natives.c
uintptr_t *objectFieldOffset(Class *class, MethodBlock *mb, uintptr_t *ostack) {
    FieldBlock *fb = fbFromReflectObject((Object*)ostack[1]);

    *(long long*)ostack = (long long)(uintptr_t)
                          &(INST_DATA((Object*)NULL, int, fb->u.offset));
    return ostack + 2;
}

在本文的 AtomicInteger 中,valueOffset为12常量,为什么要做这一步呢,因此CAS基本上都是native操作,需要直接操作内存,后续将通过valueOffset获取到 value

接着分析compareAndSwapInt

// 在Java中,调用如下
// compareAndSwapInt(this, offset, var5, var5 + 1)
//JamVM natives.c
uintptr_t *compareAndSwapInt(Class *class, MethodBlock *mb, uintptr_t *ostack) {
    long long offset = *((long long *)&ostack[2]);
    unsigned int *addr = (unsigned int*)((char *)ostack[1] + offset);
    unsigned int expect = ostack[4];
    unsigned int update = ostack[5];
    int result;
      //调用平台CPU特定的汇编代码实现
    result = COMPARE_AND_SWAP_32(addr, expect, update);
    *ostack++ = result;
    return ostack;
}

// x86-64 的汇编实现宏
#define COMPARE_AND_SWAP_32(addr, old_val, new_val)        \
({                                                         \
    char result;                                           \
    __asm__ __volatile__ ("                                \
        lock;                                              \
        cmpxchgl %4, %1;                                   \
        sete %0"                                           \
    : "=q" (result), "=m" (*addr)/*out*/                   \
    : "m" (*addr), "a" (old_val), "r" (new_val) /*in*/     \
    : "memory");     /*Clobbers, reload from memory*/      \
    result;                                                \
})

通过使用CLion对源码进行文本搜索compareAndSwapInt方法,可以快速找到CAS的实现类

`

这里的汇编格式阅读起来就比较费劲了,你可以选择学一下它的格式(如果要看JVM代码,早晚都得学),并通过这里这里的文档了解q,m,r是什么意思

如果你不想看的话,直接上HopperDisassembler将二进制的进行反编译,注意这里正好操作是反过来的

                     compareAndSwapInt:
0000000000030e80         mov        rcx, qword [ds:rdx+0x10]
0000000000030e84         mov        rax, qword [ds:rdx+0x20]
0000000000030e88         add        rcx, qword [ds:rdx+0x8]
0000000000030e8c         mov        rsi, qword [ds:rdx+0x28]
/* COMPARE_AND_SWAP_32 start */
0000000000030e90         lock cmpxchg dword [ds:rcx], esi
0000000000030e94         sete       al
0000000000030e97         movsx      rax, al
/* COMPARE_AND_SWAP_32 end */
0000000000030e9b         mov        qword [ds:rdx], rax
0000000000030e9e         lea        rax, qword [ds:rdx+0x8]
0000000000030ea2         ret

你可能需要知道各个寄存器的相关约定

最终COMPARE_AND_SWAP_32的伪代码如下,如果兴趣非常强,可以看网上开源的Verilog实现

//下文的 <- 指赋值的意思,Intel文档也是这样写的
--INPUT:
rcx <- memory <- addr
al <- old_val
esi <- new_val
--OUTPUT:
rax <- result
rcx <-memory <- addr
--PROCEDURE:
//cmpxchg dword [ds:rcx], esi
IF al == rcx
THEN
  ZF <- 1;
  rcx <- esi;
ELSE
  ZF <- 0;
  al <- rcx;
FI;
//sete       al
IF ZF == 1
THEN
  al <- 0;
ELSE
  al <- 1;
FI;
//movsx      rax, al
rax <- al
return rax

用C语言简化是这样的

if(old_val == *addr){
  *addr = new_val;
  return true;
} else{
  old_val = *addr;
  return false;
}

详细汇编分析可以参考这篇类似的文章Is x86 CMPXCHG atomic?

注意上面CPU里的Lock是用来锁多核下的Bus实现内存屏障,而不是高级语言中锁代码段的,只是碰巧名称类似而已

附录

CMPXCHG 的一些介绍

通过查询Intel文档,可以得知

OpcodeInstructionDescription
0F B0/rCMPXCHG r/m8,r8Compare AL(accumulator, 累加器) with r/m8. If equal, ZF is set and r8 is loaded into r/m8. Else, clear ZF and load r/m8 into AL.

调用方法

CMPXCHG DEST, SRC

伪代码

IF accumulator == DEST
THEN
  ZF <- 1;
  DEST <- SRC;
ELSE
  ZF <- 0;
  accumulator <- DEST;
FI;

CAS中ABA的问题

使用AtomicStampedReference