Java高并发编程详解:深入理解并发核心库
上QQ阅读APP看书,第一时间看更新

2.1 AtomicInteger详解

本节首先对比一下被synchronized关键字和显式锁Lock(将在2.2节详细讲解)进行同步的int类型和AtomicInteger类型在多线程场景下的性能表现,然后再介绍AtomicInteger的内部原理和使用方法。

2.1.1 性能测试对比

任何新工具的出现,都是为了解决某个具体问题而诞生的,否则就没有存在的必要了,原子类型就是一种无锁的、线程安全的、使用基本数据类型和引用类型的很好的解决方案。在学习使用它之前,我们先来对比一下不同同步手段的性能表现。

程序代码:SynchronizedVsLockVsAtomicInteger.java


package com.wangwenjun.concurrent.juc.automic;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.profile.StackProfiler;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.TimeValue;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// 度量批次为10次
@Measurement(iterations = 10)
// 预热批次为10次
@Warmup(iterations = 10)
// 采用平均响应时间作为度量方式
@BenchmarkMode(Mode.AverageTime)
// 时间单位为微秒
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class SynchronizedVsLockVsAtomicInteger
{
    @State(Scope.Group)
    public static class IntMonitor
    {
        private int x;
        private final Lock lock = new ReentrantLock();
        // 使用显式锁Lock进行共享资源同步
        public void lockInc()
        {
            lock.lock();
            try
            {
                x++;
            } finally
            {
                lock.unlock();
            }
        }

        // 使用synchronized关键字进行共享资源同步
        public void synInc()
        {
            synchronized (this)
            {
                x++;
            }
        }
    }

    // 直接采用AtomicInteger
    @State(Scope.Group)
    public static class AtomicIntegerMonitor
    {
        private AtomicInteger x = new AtomicInteger();

        public void inc()
        {
            x.incrementAndGet();
        }
    }

    // 基准测试方法
    @GroupThreads(10)
    @Group("sync")
    @Benchmark
    public void syncInc(IntMonitor monitor)
    {
        monitor.synInc();
    }

    // 基准测试方法
    @GroupThreads(10)
    @Group("lock")
    @Benchmark
    public void lockInc(IntMonitor monitor)
    {
        monitor.lockInc();
    }

    // 基准测试方法
    @GroupThreads(10)
    @Group("atomic")
    @Benchmark
    public void atomicIntegerInc(AtomicIntegerMonitor monitor)
    {
        monitor.inc();
    }

    public static void main(String[] args) throws RunnerException
    {
        Options opts = new OptionsBuilder()
                .include(SynchronizedVsLockVsAtomicInteger.class.getSimpleName())
                .forks(1)
                .timeout(TimeValue.seconds(10))
                .addProfiler(StackProfiler.class)
                .build();
        new Runner(opts).run();
    }
}

运行上面的基准测试方法将很容易对比出哪种解决方案的效率更高。

基准测试结果输出


Benchmark     Mode  Cnt  Score   Error  Units
atomic         avgt   10  0.436 ± 0.034  us/op
lock           avgt   10  0.714 ± 0.026  us/op
sync           avgt   10  0.933 ± 0.035  us/op

AtomicInteger>显式锁Lock>synchronized关键字

从基准测试的结果不难看出,AtomicInteger的表现更优,在该基准测试的配置中,我们增加了StackProfiler,因此很容易窥探出AtomicInteger表现优异的原因。


synchronized关键字的线程堆栈
 68.5%         BLOCKED
 30.4%         RUNNABLE
  1.1%         WAITING

显式锁Lock的线程堆栈
 79.2%         WAITING
 20.8%         RUNNABLE

AtomicInteger的线程堆栈
 91.0%         RUNNABLE
  9.0%         WAITING

AtomicInteger线程的RUNNABLE状态高达91%,并且没有BLOCKED状态,而synchronized关键字则相反,BLOCKED状态高达68.5%,因此AtomicInteger高性能的表现也就不足为奇了。

2.1.2 AtomicInteger的基本用法

与int的引用类型Integer继承Number类一样,AtomicInteger也是Number类的一个子类,除此之外,AtomicInteger还提供了很多原子性的操作方法,本节将为大家逐一介绍。在AtomicInteger的内部有一个被volatile关键字修饰的成员变量value,实际上,AtomicInteger所提供的所有方法主要都是针对该变量value进行的操作。

1. AtomicInteger的创建

public AtomicInteger():创建AtomicInteger的初始值为0。

public AtomicInteger(int initialValue):创建AtomicInteger并且指定初始值,无参的AtomicInteger对象创建等价于AtomicInteger(0)。

2. AtomicInteger的Incremental操作

x++或者x=x+1这样的操作是非原子性的,要想使其具备原子性的特性,我们可以借助AtomicInteger中提供的原子性Incremental的操作方法。

int getAndIncrement():返回当前int类型的value值,然后对value进行自增运算(在2.1.3节中我们将学习到该方法的内部原理),该操作方法能够确保对value的原子性增量操作。


public static void main(String[] args)
{
    final AtomicInteger ai = new AtomicInteger(5);
    // 返回AtomicInteger的int值,然后自增(在多线程的情况下,下面的断言未必正确)
    assert ai.getAndIncrement() == 5;
    // 获取自增后的结果(在多线程的情况下,下面的断言未必正确)
    assert ai.get() == 6;
}

int incrementAndGet():直接返回自增后的结果,该操作方法能够确保对value的原子性增量操作。


public static void main(String[] args)
{
    // 定义AtomicInteger,初值为5
    final AtomicInteger ai = new AtomicInteger(5);
    // 返回value自增后的结果
    assert ai.incrementAndGet() == 6;
    assert ai.get() == 6;
}

3. AtomicInteger的Decremental操作

x--或者x=x-1这样的自减操作同样也是非原子性的,要想使其具备原子性的特性,我们可以借助AtomicInteger中提供的原子性Decremental的操作方法。

int getAndDecrement():返回当前int类型的value值,然后对value进行自减运算(在2.1.3节中我们将学习到该方法的内部原理),该操作方法能够确保对value的原子性减量操作。


AtomicInteger ai = new AtomicInteger(5);
assert ai.getAndDecrement() == 5;
assert ai.get() == 4;

int decrementAndGet():直接返回自减后的结果,该操作方法能够确保对value的原子性减量操作。


AtomicInteger ai = new AtomicInteger(5);
assert ai.decrementAndGet() == 4;
assert ai.get() == 4;

4. 原子性地更新value值

boolean compareAndSet(int expect, int update):原子性地更新AtomicInteger的值,其中expect代表当前的AtomicInteger数值,update则是需要设置的新值,该方法会返回一个boolean的结果:当expect和AtomicInteger的当前值不相等时,修改会失败,返回值为false;若修改成功则会返回true。


// 定义一个AtomicInteger类型的对象ai并且指定初值为10
AtomicInteger ai = new AtomicInteger(10);
// 调用compareAndSet方法,expect的值为100,修改肯定会失败
assert !ai.compareAndSet(100, 12);
// 修改并未成功,因此新值不等于12
assert ai.get() != 12;
// 执行了compareAndSet更新方法之后,ai的返回值依然为10,因为修改失败
assert ai.get() == 10;

// 调用compareAndSet方法,expect的值为10,修改成功(多线程情况下并不能担保百分之百成功,// 关于这一点,在2.1.3节中会为大家讲解)
assert ai.compareAndSet(10, 12);
// 断言成功
assert ai.get() == 12;

boolean weakCompareAndSet(int expect, int update):目前版本JDK中的该方法与compareAndSet完全一样,源码如下所示。


// compareAndSet方法源码
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// weakCompareAndSet方法源码
public final boolean weakCompareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

通过源码我们不难发现两个方法的实现完全一样,那么为什么要有这两个方法呢?其实在JDK 1.6版本以前双方的实现是存在差异的,compareAndSet方法的底层主要是针对Intel x86架构下的CPU指令CAS:cmpxchg(sparc-TSO,ia64的CPU架构也支持),但是ARM CPU架构下的类似指令为LL/SC:ldrex/strex(ARM架构下的CPU主要应用于当下的移动互联网设备,比如在智能手机终端设备中,高通骁龙、华为麒麟等系列都是基于ARM架构和指令集下的CPU产品),或许在运行Android的JVM设备上这两个方法底层存在着差异。

int getAndAdd(int delta):原子性地更新AtomicInteger 的value值,更新后的value为value和delta之和,方法的返回值为value的前一个值,该方法实际上是基于自旋+CAS算法实现的(Compare And Swap)原子性操作。


// 定义一个AtomicInteger类型的对象ai并且指定初始值为10
AtomicInteger ai = new AtomicInteger(10);
// 调用getAndAdd方法,返回value的前一个值为10
assert ai.getAndAdd(2) == 10;
// 调用get方法返回AtomicInteger的value值,当前返回值为12
assert ai.get() == 12;

int addAndGet(int delta):该方法与getAndAdd(int delta)一样,也是原子性地更新AtomicInteger的value值,更新后的结果value为value和delta之和,但是该方法会立即返回更新后的value值。


// 定义一个AtomicInteger类型的对象ai并且指定初始值为10
AtomicInteger ai = new AtomicInteger(10);
// 调用addAndGet方法,返回当前value的值
assert ai.addAndGet(2)==12;
// 调用get方法返回AtomicInteger的value值,当前返回值为12
assert ai.get() == 12;

5. AtomicInteger与函数式接口

自JDK1.8增加了函数式接口之后,AtomicInteger也提供了对函数式接口的支持。

int getAndUpdate(IntUnaryOperator updateFunction):原子性地更新AtomicInteger的值,方法入参为IntUnaryOperator接口,返回值为value更新之前的值。


@FunctionalInterface
public interface IntUnaryOperator {
    // 入参为被操作数,对应于AtomicInteger的当前value值
    int applyAsInt(int operand);
}

IntUnaryOperator为函数式接口,有且仅有一个接口方法(非静态,非default),接口方法的返回值即AtomicInteger被更新后的value的最新值。


// 定义一个AtomicInteger类型的对象ai并且指定初始值为10
AtomicInteger ai = new AtomicInteger(10);
// 调用getAndUpdate方法并且传入lambda表达式,返回结果为value的前一个值
assert ai.getAndUpdate(x -> x + 2) == 10;
// 调用get方法返回AtomicInteger的value值,当前返回值为12
assert ai.get() == 12;

int updateAndGet(IntUnaryOperator updateFunction):原子性地更新AtomicInteger的值,方法入参为IntUnaryOperator接口,该方法会立即返回更新后的value值。


// 定义一个AtomicInteger类型的对象ai并且指定初始值为10
AtomicInteger ai = new AtomicInteger(10);
// 调用updateAndGet方法并且传入lambda表达式,返回结果为value更新后的值
assert ai.updateAndGet(x -> x + 2) == 12;
// 调用get方法返回AtomicInteger的value值,当前返回值为12
assert ai.get() == 12;

int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction):原子性地更新AtomicInteger的值,方法入参为IntBinaryOperator接口和delta值x,返回值为value更新之前的值。


@FunctionalInterface
public interface IntBinaryOperator {
    // 该接口在getAndAccumulate方法中,left为AtomicInteger value的当前值,    // right为delta值,返回值将被用于更新AtomicInteger的value值
    int applyAsInt(int left, int right);
}

IntBinaryOperator为函数式接口,有且仅有一个接口方法(非静态,非default),接口方法的返回值即AtomicInteger被更新后的value的最新值。


// 定义一个AtomicInteger类型的对象ai并且指定初值为10
AtomicInteger ai = new AtomicInteger(10);
int result = ai.getAndAccumulate(5, new IntBinaryOperator()
{
    @Override
    public int applyAsInt(int left, int right)
    {
        assert left == 10;
        assert right == 5;
        return left + right;
    }
});
assert result == 10;
assert ai.get() == 15;

上面的代码片段可以用lambda表达式简化,简写后的代码如下。


// 定义一个AtomicInteger类型的对象ai并且指定初值为10
AtomicInteger ai = new AtomicInteger(10);
int result = ai.getAndAccumulate(5, Integer::sum);
assert result == 10;
assert ai.get() == 15;

int accumulateAndGet(int x, IntBinaryOperator accumulatorFunction):该方法与getAndAccumulate类似,只不过会立即返回AtomicInteger的更新值。


// 定义一个AtomicInteger类型的对象ai并且指定初值为10
AtomicInteger ai = new AtomicInteger(10);
int result = ai.accumulateAndGet(5, Integer::sum);
assert result == 15;
assert ai.get() == 15;

6. 其他方法

void set(int newValue):为AtomicInteger的value设置一个新值,通过对前面内容的学习,我们知道在AtomicInteger中有一个被volatile关键字修饰的value成员属性,因此调用set方法为value设置新值后其他线程就会立即看见。

void lazySet(int newValue):set方法修改被volatile关键字修饰的value值会被强制刷新到主内存中,从而立即被其他线程看到,这一切都应该归功于volatile关键字底层的内存屏障。内存屏障虽然足够轻量,但是毕竟还是会带来性能上的开销,比如,在单线程中对AtomicInteger的value进行修改时没有必要保留内存屏障,而value又是被volatile关键字修饰的,这似乎是无法调和的矛盾。幸好追求性能极致的JVM开发者们早就考虑到了这一点,lazySet方法的作用正在于此。

程序代码:LazySetVsSet.java


package com.wangwenjun.concurrent.juc.automic;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 当对性能有异议的时候,JMH这把瑞士军刀总能帮我们找到答案,在该类中,我们写
 * 了两个基准测试方法用于对比set方法和lazyset方法的性能表现
 */
@Measurement(iterations = 10)
@Warmup(iterations = 10)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
public class LazySetVsSet
{
    private AtomicInteger ai;

    @Setup(Level.Iteration)
    public void setUp()
    {
        this.ai = new AtomicInteger(0);
    }

    @Benchmark
    public void testSet()
    {
        this.ai.set(10);
    }

    @Benchmark
    public void testLazySet()
    {
        this.ai.lazySet(10);
    }

    public static void main(String[] args) throws RunnerException
    {
        Options opts = new OptionsBuilder()
                .include(LazySetVsSet.class.getSimpleName())
                .forks(1)
                .build();
        new Runner(opts).run();
    }
}

运行上面的基准测试代码,我们很容易就能得到合理的判断,运行结果如下。


Benchmark                    Mode  Cnt  Score    Error  Units
LazySetVsSet.testLazySet     avgt   10  0.003 ±  0.001  us/op
LazySetVsSet.testSet         avgt   10  0.028 ±  0.006  us/op

int get():返回AtomicInteger的value当前值。

2.1.3 AtomicInteger内幕

经过了详细的AtomicInteger的使用方法的学习,本节就来看看AtomicInteger类的内部原理,以更加深入地了解AtomicInteger的内幕。


// Unsafe是由C++实现的,其内部存在着大量的汇编 CPU指令等代码,JDK实现的
// Lock Free几乎完全依赖于该类
private static final Unsafe unsafe = Unsafe.getUnsafe();
// valueOffset将用于存放value的内存地址偏移量
private static final long valueOffset;
static {
    try {
        // 获取value的内存地址偏移量
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (Exception ex) { throw new Error(ex); }
}
// 我们不止一次地说过,在AtomicInteger的内部有一个volatile修饰的int类型成员属性value private volatile int value;

1. compareAndSwapInt源码分析——CAS算法

CAS包含3个操作数:内存值V、旧的预期值A、要修改的新值B。当且仅当预期值A与内存值V相等时,将内存值V修改为B,否则什么都不需要做。

compareAndSwapInt方法是一个native方法,提供了CAS(Compare And Swap)算法的实现,AtomicInteger类中的原子性方法几乎都借助于该方法实现。


...
public final boolean weakCompareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
...
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
...
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
...
// Unsafe 内部方法getAndAddInt源码
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
    return var5;
}

进入Unsafe源码中我们会看到compareAndSwapInt源码。


/**
*由于该方法无法正常反编译,因此笔者在此将方法的入参名进行了一下修改,也许与大家看到的
*的源码存在一些出入
* object:该入参是地址偏移量所在的宿主对象
* valueOffSet:该入参是object对象某属性的地址偏移量,是由Unsafe对象获得的
* expectValue:该值是我们期望value当前的值,如果expectValue与实际的当前
*               值不相等,那么对value的修改将会失败,方法的返回值也会变为false
* newValue:新值
*/
public final native boolean compareAndSwapInt(Object object, long valueOffSet,     int expectValue, int newValue);

通过对compareAndSwapInt方法的简单分析,我们不禁会产生一个疑问,既然可以通过AtomicInteger获得当前值,那么为什么还会出现expectValue和AtomicInteger当前值不相等的情况呢?比如下面的代码片段。


AtomicInteger ai = new AtomicInteger(2);
ai.compareAndSet(ai.get(),10);

原因是相对于synchronized关键字、显式锁Lock,AtomicInteger所提供的方法不具备排他性,当A线程通过get()方法获取了AtomicInteger value的当前值后,B线程对value的修改已经顺利完成;A线程试图再次修改的时候就会出现expectValue与value的当前值不相等的情况,因此会出现修改失败,这种方式也被称为乐观锁。对数据进行修改的时候,首先需要进行比较。

由于compareAndSwapInt是本地方法,因此我们必须打开JDK的源码才能看到相关的C++源码,打开openjdk-jdk8u/hotspot/src/share/vm/prims/unsafe.cpp文件我们会找到相关的C++代码。


UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe,              jobject obj, jlong offset, jint e, jint x))
    UnsafeWrapper("Unsafe_CompareAndSwapInt");
    oop p = JNIHandles::resolve(obj);
    // 根据地址偏移量获取内存地址
    jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
    // 调用Atomic的成员方法
    return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

在C++代码中,我们不难发现Unsafe_CompareAndSwapInt方法依赖于Atomic::cmpxchg方法,该方法实际上会调用不同CPU架构下的汇编代码(汇编代码主要用于执行相关的CPU指令)。下面打开基于x86架构的Atomic::cmpxchg源码文件openjdk-jdk8u/hotspot/src/os_cpu/bsd_x86/vm/atomic_bsd_x86.inline.hpp。


inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}

cmpxchg是C++的一个内联函数,在其内部主要执行相关的汇编指令cmpxchgl,对汇编指令感兴趣的读者可以参阅Intel的CPU指令手册,其中就有对该指令的详细说明,地址如下:

http://heather.cs.ucdavis.edu/~matloff/50/PLN/lock.pdf

2. 自旋方法addAndGet源码分析

由于compareAndSwapInt方法的乐观锁特性,会存在对value修改失败的情况,但是有些时候对value的更新必须要成功,比如调用incrementAndGet、addAndGet等方法,本节就来分析一下addAndGet方法的实现。


public final int addAndGet(int delta) {
    // 调用Unsafe的getAndAddInt方法
    return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}

// Unsafe类中的getAndAddInt方法
public final int getAndAddInt(Object object, long valueOffset, int delta) {
    int currentValue;
    do {
        // ①
        currentValue= this.getIntVolatile(object, valueOffset);
        // ②
    } while(!this.compareAndSwapInt(object, valueOffset, currentValue,                                     currentValue+ delta));
    return currentValue;
}

▪ 在getAndAddInt方法中有一个直到型do..while循环控制语句,首先在注释①处获取当前被volatile关键字修饰的value值(通过内存偏移量的方式读取内存)。

▪ 在注释②处执行compareAndSwapInt方法,如果执行成功则直接返回,如果执行失败则再次执行下一轮的compareAndSwapInt方法。

通过上面源码的分析,incrementAndGet的执行结果有可能是11也有可能是比11更大的值。


AtomicInteger ai = new AtomicInteger(10);
//这句断言在多线程的情况下未必会成功
assert ai.incrementAndGet() == 11;

自旋方法addAndGet的执行步骤如图2-1所示。

图2-1 自旋方法addAndGet

2.1.4 AtomicInteger总结

本节学习了AtomicInteger的使用方法,并且为大家揭露了AtomicInteger的内部实现原理,本节中所涉及的断言代码assertion是基于JDK的断言语句的,要想使断言语句生效,需要在JVM参数中增加-ea(enable assertion)参数。

本节对于AtomicInteger的讲解非常细致甚至有些啰唆,其主要目的是想让读者对原子类型的原理有一个比较深入的理解。由于后文中的原子类型原理几乎与此一致,因此后续将不会再占用大量的篇幅进行细致的讲解。

另外,所有原子类型其内部都依赖于Unsafe类,2.8节将为大家介绍如何获取Unsafe实例,如何进行Java与C++的混合编程,以及如何使用Unsafe实现一些不可思议的功能。