JUC并发编程-5.AbstractQueuedSynchronizer - AQS

1.AQS的底层数据结构

AQS.jpg
  • 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
  • 利用了一个int类型表示状态
  • 使用方法是继承
  • 子类通过继承并通过实现他的方法管理其状态{ acquire 和 release }的方法操纵状态
  • 可以同时实现排他锁和共享锁模式(独占、共享)

2.AQS - CountDownLatch

闭锁


CountDownLatch.png
  • 源码
private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        // Sync 重写 AbstractQueuedSynchronizer的tryAcquireShared
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        // Sync 重写 AbstractQueuedSynchronizer的tryReleaseShared
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

public void countDown() {
        sync.releaseShared(1);
    }

private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
package com.ctgu.juc_project.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
package com.ctgu.juc_project.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class CountDownLatchExample2 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await(10, TimeUnit.MILLISECONDS);
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(10);
        log.info("{}", threadNum);
    }
}

3.AQS - Semaphore

package com.ctgu.juc_project.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
public class SemaphoreExample1 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(); // 获取一个许可
                    test(threadNum);
                    semaphore.release(); // 释放一个许可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
package com.ctgu.juc_project.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
public class SemaphoreExample2 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(4);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(1); // 获取多个许可
                    test(threadNum);
                    semaphore.release(1); // 释放多个许可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
package com.ctgu.juc_project.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

@Slf4j
public class SemaphoreExample3 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire()) { // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
package com.ctgu.juc_project.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

@Slf4j
public class SemaphoreExample4 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
                        test(threadNum);
                        semaphore.release(); // 释放一个许可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

4.AQS - CyclicBarrier

原理就像游戏匹配机制,要五个人开始游戏,得五个线程先同时就绪,再一起运行。

package com.ctgu.juc_project.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}
package com.ctgu.juc_project.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class CyclicBarrierExample2 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        try {
            barrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.warn("BarrierException", e);
        }
        log.info("{} continue", threadNum);
    }
}
package com.ctgu.juc_project.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CyclicBarrierExample3 {

    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        log.info("callback is running");
    });

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

5.ReentrantLock与Condition

  • 传统的synchronized 锁
package com.ctgu.communication;

import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) {
//        Object o = new Object();
//        new ThreadForNum1(o).start();
//        new ThreadForNum2(o).start();

        List<Apple> basket = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            new Framer("farmer" + i, basket).start();
        }
        new Child("child", basket).start();
//        new Framer().start();
//        new Framer().start();
//        new Framer().start();
//        new Child().start();
    }
}
package com.ctgu.communication;

public class Apple {

    private int id;

    public Apple(int id) {
        this.id = id;
    }

    @Override
    public String toString() {
        return "Apple{" +
                "id=" + id +
                '}';
    }
}
package com.ctgu.communication;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class Child extends Thread {

    private List<Apple> basket;

    public Child(String name, List<Apple> basket) {
        super(name);
        this.basket = basket;
    }

    @Override
    public void run() {
        while(true){
            synchronized (basket){
                while (basket.size() == 0) {
                    try {
                        basket.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Apple apple = basket.remove(0);
                System.out.println(Thread.currentThread().getName()
                        + ",拿走了一个苹果:" + apple
                        + ",目前框里有:" + basket.size());
                basket.notify();
            }
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.ctgu.communication;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class Framer extends Thread {

    private List<Apple> basket;

    public Framer(String name, List<Apple> basket) {
        super(name);
        this.basket = basket;
    }

    @Override
    public void run() {
        int id = 0;
        while (true) {
            synchronized (basket) {
                while (basket.size() == 10) {
                    try {
                        basket.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                Apple apple = new Apple(id++);
                basket.add(apple);
                System.out.println(Thread.currentThread().getName()
                        + ",放入了一个苹果:" + apple
                        + ",目前框里有:" + basket.size());
                basket.notify();
            }
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • java代码层面的AQS状态锁
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Main {

    public static void main(String[] args) {
        PlateLock plateLock = new PlateLock(10);
        Thread consumer1 = new Thread(new ConsumerLock(plateLock), "ConsumerThread-1");
        Thread consumer2 = new Thread(new ConsumerLock(plateLock), "ConsumerThread-2");
        Thread producer1 = new Thread(new ProducerLock(plateLock), "ProducerThread-1");
        Thread producer2 = new Thread(new ProducerLock(plateLock), "ProducerThread-2");
        consumer1.start();
//        consumer2.start();
//        producer1.start();
        producer2.start();
    }

}

class PlateLock {

    private LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    private int capacity;
    private Lock lock = new ReentrantLock();
    private Condition empty = lock.newCondition();
    private Condition Full = lock.newCondition();

    public PlateLock(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("容量不能小于0");
        }
        this.capacity = capacity;
    }

    public void getEgg() {
        lock.lock();
        try {
            while (queue.size() == 0) {
                empty.await();
                System.out.println("消费光了,等待生产");
            }
            int egg = queue.take();
            System.out.println(Thread.currentThread().getName()+ ":消费者取得蛋:" + egg);
            Full.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void addEgg() {

        lock.lock();
        try {
            while (queue.size() >= capacity) {
                Full.await();
                System.out.println("仓库已满,等待消费");
            }
            System.out.println(Thread.currentThread().getName()+ ":生产者生产蛋:" + queue.size());
            queue.put(queue.size());
            empty.signalAll();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

}

class ConsumerLock implements Runnable {

    private PlateLock plateLock;

    public ConsumerLock(PlateLock plateLock) {
        this.plateLock = plateLock;
    }

    @Override
    public void run() {
        while (true) {
            plateLock.getEgg();
        }
    }
}

class ProducerLock implements Runnable {
    private PlateLock plateLock;

    public ProducerLock(PlateLock plateLock) {
        this.plateLock = plateLock;
    }

    @Override
    public void run() {
        while (true) {
            plateLock.addEgg();
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容