Java并发编程之 atomic

x33g5p2x  于11个月前 转载在 Java  
字(9.7k)|赞(0)|评价(0)|浏览(84)

多线程的变量自增问题

当程序更新一个变量时,如果多线程同时更新这个变量,可能得到期望之外的值,比如变量i=0,A线程更新i+1,B线程也更新i+1,经过两个线程操作之后可能i不等于2,而是等于1。因为A和B线程在更新变量i的时候拿到的i都是0,这就是线程不安全的更新操作:

比如这段代码的运行结果已经很难等于50000了(数字再调大一些就更不可能了):

public class Main {
    int i = 0;

    void inc() {
        for (int j = 0; j < 25000; j++) i++;
    }

    public static void main(String[] args) {
        Main main = new Main();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            threads.add(new Thread(main::inc));
        }
        threads.forEach(Thread::start);
        threads.forEach((t) -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(main.i);
    }
}

通常我们可以使用synchronized来解决这个问题

synchronized解决多线程的变量自增问题

synchronized会锁住整个对象,其余线程想要拿到对象锁的话就只能阻塞:

final Object lock = new Object();

void inc() {
    synchronized (lock) {
        for (int j = 0; j < 25000; j++) i++;
    }
}
void inc() {
    synchronized (this) {
        for (int j = 0; j < 25000; j++) i++;
    }
}
synchronized void inc() {
    for (int j = 0; j < 25000; j++) i++;
}

但是synchronized的缺点就是它的悲观锁机制导致其余线程想要读也只能阻塞(别谈什么时间问题,下面会有演示)

而Java从JDK 1.5开始提供了java.util.concurrent.atomic包,这个包中的原子操作类AtomicXXX提供了一种用法简单、性能高效、线程安全地更新一个变量的方式:

原子类解决多线程的变量自增问题

AtomicInteger i = new AtomicInteger(0);

void inc() {
    for (int j = 0; j < 25000; j++) i.incrementAndGet();
}

原子类对多线程自增操作的原子性是通过CAS保证的

原子类保证原子性的原理

原子类保证原子性是通过CAS实现的

AtomicInteger为例,它的核心就是compareAndSet()方法和getAndAddInt()方法;其中getAndAddInt()方法的核心是Unsafe类的CompareAndSwapInt()本地方法:

public final boolean compareAndSet(int expect, int update) {
    // 期望值 ↑ , ↑ 更新值
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

如果CAS失败了,则继续循环,执行do块中的内容;否则说明CAS成功,返回交换后的值。

原子更新基本类型类

使用原子的方式更新基本类型,java.util.concurrent.atomic包提供了以下3个类:

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong

Unsafe类提供的CAS本地方法只有三个:

public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);

因此对int类型的数据更新则是用compareAndSwapInt()方法,对long类型的数据更新用compareAndSwapLong()方法,对bool类型的数据进行更新则是先将bool类型的数据转化为int类型的数据,然后再使用compareAndSwapInt()实现:

public final boolean compareAndSet(boolean expect, boolean update) {
    int e = expect ? 1 : 0;
    int u = update ? 1 : 0;
    return unsafe.compareAndSwapInt(this, valueOffset, e, u);
}

原子更新数组

使用原子的方式更新数组,java.util.concurrent.atomic包提供了以下3个类:

  • AtomicIntegerArray
  • AtomicLongArray
  • AtomicReferenceArray

AtomicIntegerArray为例,它的方法与AtomicInteger大致相同,只是多了个参数为数组下标比如:

addAndGet(int i, int delta)

就是将数组下标为i的元素加上delta并返回:

AtomicIntegerArray integerArray = new AtomicIntegerArray(new int[]{1,2});
System.out.println(integerArray.addAndGet(0, 1)); // 2
System.out.println(integerArray); // [2, 2]

数组通过构造方法传递进去,然后AtomicIntegerArray会将当前数组复制一份,所以当AtomicIntegerArray对内部的数组元素进行修改时,不会影响传入的数组。

原子更新引用类型

使用原子的方式更新引用类型,java.util.concurrent.atomic包提供了以下3个类:

  • AtomicReference:原子更新引用类型
User user0 = new User("小明", 18);
AtomicReference<User> userAtomicReference = new AtomicReference<>(user0);
System.out.println(userAtomicReference.get()); // User{name='小明', age=18}
System.out.println(userAtomicReference.compareAndSet(user0, new User("小红", 20))); // true
System.out.println(userAtomicReference.get()); // User{name='小红', age=20}
  • AtomicReferenceFieldUpdater:原子更新引用类型里的字段
User user0 = new User("小明", 18);
AtomicReferenceFieldUpdater<User, String> updater = AtomicReferenceFieldUpdater.newUpdater(User.class, String.class, "name");
System.out.println(user0); // User{name='小明', age=18}
System.out.println(updater.compareAndSet(user0, "小明", "小红")); // true
System.out.println(user0); // User{name='小红', age=18}
  • AtomicMarkableReference::原子更新带有标记位的引用类型。可以原子更新一个布尔类型的标记位和引用类型。构造方法是AtomicMarkableReference(V initialRef,boolean initialMark)
AtomicMarkableReference<User> userMarkable=new AtomicMarkableReference<>(user0,true);
System.out.println(userMarkable.getReference()); // User{name='小明', age=18}
System.out.println(userMarkable.compareAndSet(user0, new User("小红", 20), true, false)); // true
System.out.println(userMarkable.getReference()); // User{name='小红', age=20}
System.out.println(userMarkable.compareAndSet(user0, new User("小白", 30), true, false)); // false
System.out.println(userMarkable.getReference()); // User{name='小红', age=20}

原子更新字段类

要想原子地更新字段类需要两步。第一步,因为原子更新字段类都是抽象类,每次使用的时候必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。第二步,更新类的字段(属性)必须使用public volatile修饰符。
使用原子的方式更新引用类型,java.util.concurrent.atomic包提供了以下3个类:

  • AtomicIntegerFieldUpdater:原子更新整型的字段的更新器
User user0 = new User("小明", 18);
AtomicIntegerFieldUpdater<User> updater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
System.out.println(user0); // User{name='小明', age=18}
System.out.println(updater.compareAndSet(user0, 18, 20)); // true
System.out.println(user0); // User{name='小明', age=20}
  • AtomicLongFieldUpdater:原子更新长整型的字段的更新器
  • AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用CAS进行原子更新时可能出现的ABA问题。

ABA问题及AtomicStampedReference解决方案

ABA问题

public class Main {
    static AtomicReference<String> string = new AtomicReference<>("A");

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + ": A -> B :" + string.compareAndSet("A", "B"));
        });
        t1.start();
        t1.join();

        Thread t2 = new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + ": B -> A :" + string.compareAndSet("B", "A"));
        });
        t2.start();
        t2.join();
        System.out.println(Thread.currentThread().getName() + ": A -> C :" + string.compareAndSet("A", "C"));
    }
}

AtomicStampedReference解决ABA问题

public class Main {
    static AtomicStampedReference<String> string = new AtomicStampedReference<>("A", 0);

    public static void main(String[] args) throws InterruptedException {
        int s = string.getStamp();
        Thread t1 = new Thread(() -> {
            int stamp = string.getStamp();
            System.out.println(Thread.currentThread().getName() + ": A -> B :" + string.compareAndSet("A", "B", stamp, stamp + 1) + " -- stamp: " + stamp + " -> " + (stamp + 1));
        });
        t1.start();
        t1.join();

        Thread t2 = new Thread(() -> {
            int stamp = string.getStamp();
            System.out.println(Thread.currentThread().getName() + ": B -> A :" + string.compareAndSet("B", "A", stamp, stamp + 1) + " -- stamp: " + stamp + " -> " + (stamp + 1));
        });
        t2.start();
        t2.join();
        System.out.println(Thread.currentThread().getName() + ": A -> C :" + string.compareAndSet("A", "C", s, s + 1) + " -- stamp: " + s + " -> " + (s + 1) + " x ");
    }
}

LongAdder

LongAdder继承自Striped64

LongAddrAtomic都能保证long类型数据的原子性,但是LongAddr的性能要优于AtomicLong

public class Main {
    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            test(
                    AtomicLong::new,
                    AtomicLong::getAndIncrement,
                    "AtomicLong"
            );
        }

        for (int i = 0; i < 10; i++) {
            test(
                    LongAdder::new,
                    LongAdder::increment,
                    "LongAdder"
            );
        }
    }

    public static <T> void test(Supplier<T> supplier, Consumer<T> consumer, String name) {
        T adder = supplier.get();
        List<Thread> list = new ArrayList<>();

        for (int i = 0; i < 5; i++) {
            list.add(new Thread(() -> {
                for (int j = 0; j < 500000; j++) consumer.accept(adder);
            }));
        }

        long start = System.nanoTime();

        list.forEach(Thread::start);
        list.forEach((t) -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        long end = System.nanoTime();
        System.out.println(name + ":" + adder + ":用时 " + (end - start) / 1000000);
    }
}

LongAdderAtomicLong性能高的原因

AtomicLong使用CAS,在并发量很大时,会有大量线程在CAS自旋等待,因此耗时;而LongAdder采用的累加单元(Cell)的方式:在竞争时,设置多个累加单元,Thread-0累加Cell[0],Thread-1累加Cell[1]…(类似于分段锁的原理)最后将结果汇总。这样他们在累加时操作的是不同的Cell变量,减少了CAS的重试次数(DoubleAdder也是一样的道理)。

LongAdder的累加单元数量不超过CPU核数,因此CPU核数越多的电脑性能提升越明显。

synchronized和原子类的效率

synchronized的效率并非永远都要低于原子类,但是他们的性能都要远远低于Adder:

public class Main {
    Long count0 = 0L;
    AtomicLong count1 = new AtomicLong(0);
    LongAdder count2 = new LongAdder();

    synchronized void synchronizedInc() {
        for (int i = 0; i < 10000; i++) {
            count0++;
        }
    }

    void atomicInc() {
        for (int i = 0; i < 10000; i++) {
            count1.incrementAndGet();
        }
    }

    void adderInc() {
        for (int i = 0; i < 10000; i++) {
            count2.add(1);
        }
    }

    public static void main(String[] args) {
        long start0 = System.currentTimeMillis();
        Main t0 = new Main();

        List<Thread> threads0 = new ArrayList<>();
        for (int i = 0; i < 10; i++) threads0.add(new Thread(t0::synchronizedInc));
        threads0.forEach(Thread::start);
        threads0.forEach((o) -> {
            try {
                o.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(t0.count0 + "-->synchronized耗时-->" + (System.currentTimeMillis() - start0));
        System.out.println("-----------------------------");
        long start1 = System.currentTimeMillis();
        Main t1 = new Main();
        List<Thread> threads1 = new ArrayList<>();
        for (int i = 0; i < 10; i++) threads1.add(new Thread(t1::atomicInc));
        threads1.forEach(Thread::start);
        threads1.forEach((o) -> {
            try {
                o.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(t1.count1 + "-->AtomicLong耗时-->" + (System.currentTimeMillis() - start1));
        System.out.println("-----------------------------");
        long start2 = System.currentTimeMillis();
        Main t2 = new Main();
        List<Thread> threads2 = new ArrayList<>();
        for (int i = 0; i < 10; i++) threads2.add(new Thread(t2::adderInc));
        threads2.forEach(Thread::start);
        threads2.forEach((o) -> {
            try {
                o.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(t1.count1 + "-->LongAdder耗时-->" + (System.currentTimeMillis() - start2));
    }
}

Adder的效率肯定是最好的,那我们就看看synchronized和原子类:

当自增次数较少时,原子类的效率是优于synchronized的:

当自增次数达到一定次数时,synchronized和原子类的性能就相近了:

超过这个阈值后,synchronized的性能就会优于原子类,且值越大优势越明显:

因为并发量大时,原子类CAS出错概率会增大导致不断自旋等待,这是非常消耗时间的;而synchronized虽然导致线程阻塞等待,但是由于自增操作非常快,耗时很短,所以每个线程都不会阻塞过长的时间,此时synchronized的性能就会优于原子类。

相关文章