并发(四)

四、Atomic

image.png
  • AtomicInteger
    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static AtomicInteger count = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count.get());
    }

    private static void add() {
        count.incrementAndGet();
        // count.getAndIncrement();
    }

AtomicInteger的 incrementAndGet()里用了unsafe的类的getAndAddInt(),getAndAddInt()底层实现于CAS(CompareAndSwap):拿当前对象保存的值和底层的值进行对比,一样才进行对应操作,如果不一样,它会不停的取底层的值,直到一样,才进行对应的操作



image.png
  • LongAdder
   // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static LongAdder count = new LongAdder();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        count.increment();
    }

为什么有了AtomicLong还要在jdk8新增LongAddr

  • AtomicLong是CAS原理,是在死循环下不断尝试修改目标值,直到修改成功,在竞争不激烈的情况下,它修改成功的概率很高,反之,修改失败概率就会很高。在大量修改失败的时候,这些原子操作,就会多次的循环尝试,因此性能会收到一些影响。
  • LongAdder类与AtomicLong类的区别在于高并发时前者将对单一变量的CAS操作分散为对数组cells中多个元素的CAS操作,取值时进行求和;而在并发较低时仅对base变量进行CAS操作,与AtomicLong类原理相同

并发条件下 优先选用LongAdder

  • AtomicReference
    private static AtomicReference<Integer> count = new AtomicReference<>(0);

    public static void main(String[] args) {
        count.compareAndSet(0, 2); // 2
        count.compareAndSet(0, 1); // no
        count.compareAndSet(1, 3); // no
        count.compareAndSet(2, 4); // 4
        count.compareAndSet(3, 5); // no
        log.info("count:{}", count.get());
    }
  • AtomicIntegerFieldUpdater
public class AtomicExample5 {

    private static AtomicIntegerFieldUpdater<AtomicExample5> updater =
            AtomicIntegerFieldUpdater.newUpdater(AtomicExample5.class, "count");

    @Getter
    public volatile int count = 100;  //并发更新字段必须用volatile修饰

    public static void main(String[] args) {

        AtomicExample5 example5 = new AtomicExample5();

        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 1, {}", example5.getCount());
        }

        if (updater.compareAndSet(example5, 100, 120)) {
            log.info("update success 2, {}", example5.getCount());
        } else {
            log.info("update failed, {}", example5.getCount());
        }
    }
}
image.png
  • AtomicBoolean
    private static AtomicBoolean isHappened = new AtomicBoolean(false);

    // 请求总数
    public static int clientTotal = 5000;

    // 同时并发执行的线程数
    public static int threadTotal = 200;

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    test();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("isHappened:{}", isHappened.get());
    }

    private static void test() {
        if (isHappened.compareAndSet(false, true)) {
            log.info("execute");   //之会执行一次
        }
    }
image.png

三、锁

能保证同一时间,只有一个线程对共享变量操作的,除了Atomic包还有锁
java提供的锁,主要分两种:

  • synchorniezdjvm层面提供的锁,在作用对象的作用范围内,同一时刻只有一个线程操作
  • Lockjdk提供的代码层面的锁。jdk提供一个Lock的接口类,主要是依赖特殊的cpu指令。实现类里面比较有代表性的就是 ReentrantLock

1、synchorniezd


public class SynchronizedExample1 {

    // 修饰一个代码块
    public void test1(int j) {
        synchronized (this) {
            for (int i = 0; i < 10; i++) {
                log.info("test1 {} - {}", j, i);
            }
        }
    }

    // 修饰一个方法
    public synchronized void test2(int j) {
        for (int i = 0; i < 10; i++) {
            log.info("test2 {} - {}", j, i);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample1 example1 = new SynchronizedExample1();
        SynchronizedExample1 example2 = new SynchronizedExample1();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            example1.test2(1);
        });
        executorService.execute(() -> {
            example1.test2(2);
        });
    }
}
输出结果:
test2 1 - 0
test2 1 - 1
test2 1 - 2
test2 1 - 3
test2 1 - 4
test2 1 - 5
test2 1 - 6
test2 1 - 7
test2 1 - 8
test2 1 - 9
test2 2 - 0
test2 2 - 1
test2 2 - 2
test2 2 - 3
test2 2 - 4
test2 2 - 5
test2 2 - 6
test2 2 - 7
test2 2 - 8
test2 2 - 9
public class SynchronizedExample1 {

    // 修饰一个代码块
    public void test1(int j) {
        synchronized (this) {
            for (int i = 0; i < 10; i++) {
                log.info("test1 {} - {}", j, i);
            }
        }
    }

    // 修饰一个方法
    public synchronized void test2(int j) {
        for (int i = 0; i < 10; i++) {
            log.info("test2 {} - {}", j, i);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample1 example1 = new SynchronizedExample1();
        SynchronizedExample1 example2 = new SynchronizedExample1();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            example1.test1(1);
        });
        executorService.execute(() -> {
            example2.test1(2);
        });
    }
}

执行结果:
test1 1 - 0
test1 1 - 1
test1 1 - 2
test1 2 - 0
test1 2 - 1
test1 1 - 3
test1 2 - 2
test1 2 - 3
test1 1 - 4
test1 1 - 5
test1 1 - 6
test1 2 - 4
test1 1 - 7
test1 2 - 5
test1 1 - 8
test1 2 - 6
test1 1 - 9
test1 2 - 7
test1 2 - 8
test1 2 - 9

用两个不同的对象调用synchronized修饰的方法,互不影响,交叉输出,而不是example1.test2(1)执行完又example2.test2(2)执行

public class SynchronizedExample2 {

    // 修饰一个类
    public static void test1(int j) {
        synchronized (SynchronizedExample2.class) {
            for (int i = 0; i < 10; i++) {
                log.info("test1 {} - {}", j, i);
            }
        }
    }

    // 修饰一个静态方法
    public static synchronized void test2(int j) {
        for (int i = 0; i < 10; i++) {
            log.info("test2 {} - {}", j, i);
        }
    }

    public static void main(String[] args) {
        SynchronizedExample2 example1 = new SynchronizedExample2();
        SynchronizedExample2 example2 = new SynchronizedExample2();
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            example1.test1(1);
        });
        executorService.execute(() -> {
            example2.test1(2);
        });
    }
}

输出结果:
test1 1 - 0
test1 1 - 1
test1 1 - 2
test1 1 - 3
test1 1 - 4
test1 1 - 5
test1 1 - 6
test1 1 - 7
test1 1 - 8
test1 1 - 9
test1 2 - 0
test1 2 - 1
test1 2 - 2
test1 2 - 3
test1 2 - 4
test1 2 - 5
test1 2 - 6
test1 2 - 7
test1 2 - 8
test1 2 - 9
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容