1.AQS的底层数据结构
AQS.jpg
- 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
- 利用一个int类型的变量(state)表示同步状态
- 使用方法是继承
- 子类通过继承AQS并重写其方法(acquire 和 release 相关的方法)来管理和操纵状态
- 可以同时实现排他锁和共享锁模式(独占、共享)
2.AQS - CountDownLatch
闭锁
CountDownLatch.png
- 源码
/**
 * Synchronizer used by the latch shown in these notes.
 * The AQS state holds the count: the constructor stores it with setState
 * and tryReleaseShared decrements it by one per call until it reaches zero.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
// Stores the initial count in the AQS state.
Sync(int count) {
setState(count);
}
// Returns the current AQS state, i.e. the remaining count.
int getCount() {
return getState();
}
// Sync overrides AbstractQueuedSynchronizer.tryAcquireShared:
// returns 1 when the state is 0, otherwise -1. The argument is not used.
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// Sync overrides AbstractQueuedSynchronizer.tryReleaseShared.
// Returns true only for the call that moves the state from 1 to 0;
// returns false if the state is already 0. The argument is not used.
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
// Retry from the top if another thread changed the state between the read and the CAS.
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
/**
 * Counts the latch down by delegating to sync.releaseShared with an argument of 1.
 */
public void countDown() {
sync.releaseShared(1);
}
/**
 * Release action for shared mode: loops until one pass completes without
 * the head changing, unparking head's successor when head's status is SIGNAL.
 */
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
// Snapshot head so the check at the bottom can detect a concurrent change.
Node h = head;
// Only act when there is a head and it is not also the tail.
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// Reset SIGNAL to 0 before unparking; a failed CAS means the status changed, so retry.
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// Status 0: mark head as PROPAGATE instead; retry if the CAS fails.
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// Exit only when head is still the node examined in this pass.
if (h == head) // loop if head changed
break;
}
}
package com.ctgu.juc_project.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demo: the main thread blocks on a {@link CountDownLatch} until all
 * worker tasks have counted it down, then logs "finish".
 */
@Slf4j
public class CountDownLatchExample1 {

    /** Number of submitted tasks; also the initial latch count. */
    private static final int THREAD_COUNT = 200;

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);

        int submitted = 0;
        while (submitted < THREAD_COUNT) {
            final int taskId = submitted++;
            pool.execute(() -> runAndCountDown(taskId, latch));
        }

        // Blocks until every task has called countDown().
        latch.await();
        log.info("finish");
        pool.shutdown();
    }

    /** Runs one task and always counts the latch down, even if the task fails. */
    private static void runAndCountDown(int taskId, CountDownLatch latch) {
        try {
            test(taskId);
        } catch (Exception e) {
            log.error("exception", e);
        } finally {
            latch.countDown();
        }
    }

    /** Simulated work: sleep, log the task number, sleep again. */
    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
package com.ctgu.juc_project.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Demo of the timed {@link CountDownLatch#await(long, TimeUnit)}: the main
 * thread waits at most 10 ms and then logs "finish" without checking
 * whether the count reached zero.
 */
@Slf4j
public class CountDownLatchExample2 {

    /** Number of submitted tasks; also the initial latch count. */
    private static final int THREAD_COUNT = 200;

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        final CountDownLatch latch = new CountDownLatch(THREAD_COUNT);

        for (int n = 0; n < THREAD_COUNT; n++) {
            pool.execute(worker(n, latch));
        }

        // The boolean result (count reached zero or timed out) is intentionally ignored.
        latch.await(10, TimeUnit.MILLISECONDS);
        log.info("finish");
        // shutdown() stops accepting new tasks; already-submitted ones still run.
        pool.shutdown();
    }

    /** Builds the task for one worker; the latch is counted down in all cases. */
    private static Runnable worker(final int threadNum, final CountDownLatch latch) {
        return () -> {
            try {
                test(threadNum);
            } catch (Exception e) {
                log.error("exception", e);
            } finally {
                latch.countDown();
            }
        };
    }

    /** Simulated work: short sleep, then log the task number. */
    private static void test(int threadNum) throws Exception {
        Thread.sleep(10);
        log.info("{}", threadNum);
    }
}
3.AQS - Semaphore
package com.ctgu.juc_project.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Demo: a {@link Semaphore} with 3 permits limits how many of the 20
 * submitted tasks run {@code test} at the same time.
 */
@Slf4j
public class SemaphoreExample1 {

    /** Number of tasks submitted to the pool. */
    private static final int threadCount = 20;

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(); // acquire one permit
                    try {
                        test(threadNum);
                    } finally {
                        // Release in finally so a failing task cannot leak its permit.
                        semaphore.release(); // release one permit
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    /** Simulated work: log the task number, then sleep one second. */
    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
package com.ctgu.juc_project.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Demo of the counted {@link Semaphore#acquire(int)} / {@link Semaphore#release(int)}
 * overloads on a semaphore with 4 permits.
 */
@Slf4j
public class SemaphoreExample2 {

    /** Number of tasks submitted to the pool. */
    private static final int threadCount = 20;

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(4);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    // Acquires the given number of permits (1 here).
                    semaphore.acquire(1);
                    try {
                        test(threadNum);
                    } finally {
                        // Release in finally so a failing task cannot leak its permit;
                        // the count must match the one passed to acquire.
                        semaphore.release(1);
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    /** Simulated work: log the task number, then sleep one second. */
    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
package com.ctgu.juc_project.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * Demo of the non-blocking {@link Semaphore#tryAcquire()}: a task that
 * cannot get one of the 3 permits immediately is skipped.
 */
@Slf4j
public class SemaphoreExample3 {

    /** Number of tasks submitted to the pool. */
    private static final int threadCount = 20;

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire()) { // try to acquire one permit without waiting
                        try {
                            test(threadNum);
                        } finally {
                            // Release in finally so a failing task cannot leak its permit.
                            semaphore.release(); // release one permit
                        }
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    /** Simulated work: log the task number, then sleep one second. */
    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
package com.ctgu.juc_project.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
 * Demo of the timed {@link Semaphore#tryAcquire(long, TimeUnit)}: each task
 * waits up to 5000 ms for one of the 3 permits and is skipped on timeout.
 */
@Slf4j
public class SemaphoreExample4 {

    /** Number of tasks submitted to the pool. */
    private static final int threadCount = 20;

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    // Try to acquire one permit, waiting at most 5000 ms.
                    if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) {
                        try {
                            test(threadNum);
                        } finally {
                            // Release in finally so a failing task cannot leak its permit.
                            semaphore.release(); // release one permit
                        }
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    /** Simulated work: log the task number, then sleep one second. */
    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
4.AQS - CyclicBarrier
原理就像游戏匹配机制,要五个人开始游戏,得五个线程先同时就绪,再一起运行。
package com.ctgu.juc_project.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demo: ten tasks are started one second apart and meet at a
 * {@link CyclicBarrier} created for 5 parties.
 */
@Slf4j
public class CyclicBarrierExample1 {

    /** Barrier shared by all tasks; created for 5 parties. */
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        int next = 0;
        while (next < 10) {
            final int threadNum = next++;
            // Stagger the submissions by one second.
            Thread.sleep(1000);
            pool.execute(() -> runRace(threadNum));
        }
        pool.shutdown();
    }

    /** Runs one racer and logs any exception it throws. */
    private static void runRace(int threadNum) {
        try {
            race(threadNum);
        } catch (Exception e) {
            log.error("exception", e);
        }
    }

    /** Sleeps, announces readiness, waits at the barrier, then continues. */
    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}
package com.ctgu.juc_project.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Demo of the timed {@link CyclicBarrier#await(long, TimeUnit)}: each task
 * waits at the barrier for at most 2000 ms, logs a warning if the wait
 * fails, and continues either way.
 */
@Slf4j
public class CyclicBarrierExample2 {

    /** Barrier shared by all tasks; created for 5 parties. */
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newCachedThreadPool();
        int next = 0;
        while (next < 10) {
            final int threadNum = next++;
            // Stagger the submissions by one second.
            Thread.sleep(1000);
            pool.execute(() -> runRace(threadNum));
        }
        pool.shutdown();
    }

    /** Runs one racer and logs any exception it throws. */
    private static void runRace(int threadNum) {
        try {
            race(threadNum);
        } catch (Exception e) {
            log.error("exception", e);
        }
    }

    /** Sleeps, announces readiness, waits at the barrier with a timeout, then continues. */
    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        awaitBarrierQuietly();
        log.info("{} continue", threadNum);
    }

    /** Waits up to 2000 ms at the barrier; any failure is logged as a warning and not rethrown. */
    private static void awaitBarrierQuietly() {
        try {
            barrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.warn("BarrierException", e);
        }
    }
}
package com.ctgu.juc_project.example.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CyclicBarrierExample3 {
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
5.ReentrantLock与Condition
- 传统的synchronized 锁
package com.ctgu.communication;
import java.util.ArrayList;
import java.util.List;
/**
 * Entry point of the synchronized/wait/notify demo: starts three
 * {@code Framer} threads and one {@code Child} thread that share a basket.
 */
public class Main {
    public static void main(String[] args) {
        // Single list instance shared by every thread; it is also the monitor they lock on.
        final List<Apple> sharedBasket = new ArrayList<>();

        int farmerIndex = 0;
        while (farmerIndex < 3) {
            Framer farmer = new Framer("farmer" + farmerIndex, sharedBasket);
            farmer.start();
            farmerIndex++;
        }

        Child child = new Child("child", sharedBasket);
        child.start();
    }
}
package com.ctgu.communication;
/**
 * Simple value holder identified by an integer id.
 */
public class Apple {

    /** Identifier assigned at construction time. */
    private final int id;

    /**
     * @param id identifier of this apple
     */
    public Apple(int id) {
        this.id = id;
    }

    /**
     * @return text of the form {@code Apple{id=N}}
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Apple{");
        text.append("id=").append(id);
        text.append('}');
        return text.toString();
    }
}
package com.ctgu.communication;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Consumer thread: repeatedly takes the first apple out of the shared
 * basket, waiting on the basket's monitor while it is empty, and pauses
 * one second between apples. The thread ends when it is interrupted.
 */
public class Child extends Thread {

    /** Shared basket; also the monitor used for wait/notify. */
    private final List<Apple> basket;

    /**
     * @param name   thread name
     * @param basket basket shared with the producer threads
     */
    public Child(String name, List<Apple> basket) {
        super(name);
        this.basket = basket;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (basket) {
                    // Re-check the condition in a loop after every wake-up.
                    while (basket.isEmpty()) {
                        basket.wait();
                    }
                    Apple apple = basket.remove(0);
                    System.out.println(Thread.currentThread().getName()
                            + ",拿走了一个苹果:" + apple
                            + ",目前框里有:" + basket.size());
                    // Producers and consumers wait on the same monitor, so wake all of
                    // them; each re-checks its own condition in its wait loop.
                    basket.notifyAll();
                }
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
            // Interruption is the stop signal: restore the flag and leave the loop
            // instead of swallowing it and running forever.
            Thread.currentThread().interrupt();
        }
    }
}
package com.ctgu.communication;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
 * Producer thread: repeatedly puts a new apple into the shared basket,
 * waiting on the basket's monitor while it already holds 10 apples, and
 * pauses two seconds between apples. The thread ends when it is interrupted.
 */
public class Framer extends Thread {

    /** Basket size at which producers stop adding and wait. */
    private static final int BASKET_CAPACITY = 10;

    /** Shared basket; also the monitor used for wait/notify. */
    private final List<Apple> basket;

    /**
     * @param name   thread name
     * @param basket basket shared with the other producer and consumer threads
     */
    public Framer(String name, List<Apple> basket) {
        super(name);
        this.basket = basket;
    }

    @Override
    public void run() {
        // Ids are per producer thread; each thread counts from 0.
        int id = 0;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (basket) {
                    // Re-check the condition in a loop after every wake-up.
                    while (basket.size() == BASKET_CAPACITY) {
                        basket.wait();
                    }
                    Apple apple = new Apple(id++);
                    basket.add(apple);
                    System.out.println(Thread.currentThread().getName()
                            + ",放入了一个苹果:" + apple
                            + ",目前框里有:" + basket.size());
                    // Producers and consumers wait on the same monitor, so wake all of
                    // them; each re-checks its own condition in its wait loop.
                    basket.notifyAll();
                }
                TimeUnit.SECONDS.sleep(2);
            }
        } catch (InterruptedException e) {
            // Interruption is the stop signal: restore the flag and leave the loop
            // instead of swallowing it and running forever.
            Thread.currentThread().interrupt();
        }
    }
}
- java代码层面的AQS状态锁
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Entry point of the ReentrantLock/Condition demo: builds two consumer and
 * two producer threads around one shared plate and starts one of each.
 */
public class Main {
    public static void main(String[] args) {
        final PlateLock plate = new PlateLock(10);

        Thread firstConsumer = consumerThread(plate, 1);
        // The second consumer and the first producer are constructed but never started.
        Thread secondConsumer = consumerThread(plate, 2);
        Thread firstProducer = producerThread(plate, 1);
        Thread secondProducer = producerThread(plate, 2);

        firstConsumer.start();
        secondProducer.start();
    }

    /** Creates an unstarted consumer thread named "ConsumerThread-N". */
    private static Thread consumerThread(PlateLock plate, int number) {
        return new Thread(new ConsumerLock(plate), "ConsumerThread-" + number);
    }

    /** Creates an unstarted producer thread named "ProducerThread-N". */
    private static Thread producerThread(PlateLock plate, int number) {
        return new Thread(new ProducerLock(plate), "ProducerThread-" + number);
    }
}
/**
 * Bounded plate of eggs guarded by one {@link ReentrantLock} with two
 * conditions: consumers wait on {@code notEmpty}, producers on {@code notFull}.
 * If a thread is interrupted inside {@link #getEgg()} or {@link #addEgg()},
 * the call returns early with the thread's interrupt status set.
 */
class PlateLock {

    /** Eggs currently on the plate; every access happens while holding {@code lock}. */
    private final LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
    /** Maximum number of eggs allowed on the plate. */
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    /** Signalled after an egg is added; awaited by consumers while the plate is empty. */
    private final Condition notEmpty = lock.newCondition();
    /** Signalled after an egg is taken; awaited by producers while the plate is full. */
    private final Condition notFull = lock.newCondition();

    /**
     * @param capacity maximum number of eggs; must be greater than 0
     * @throws IllegalArgumentException if {@code capacity <= 0}
     */
    public PlateLock(int capacity) {
        if (capacity <= 0) {
            // The check rejects 0 as well, so the message says "must be greater than 0".
            throw new IllegalArgumentException("容量必须大于0");
        }
        this.capacity = capacity;
    }

    /**
     * Takes one egg, waiting while the plate is empty. Returns without taking
     * an egg (interrupt status set) if the thread is interrupted.
     */
    public void getEgg() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                // Announce the wait before blocking, not after being woken up.
                System.out.println("消费光了,等待生产");
                notEmpty.await();
            }
            int egg = queue.take();
            System.out.println(Thread.currentThread().getName() + ":消费者取得蛋:" + egg);
            notFull.signalAll();
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Adds one egg (numbered with the current plate size), waiting while the
     * plate is full. Returns without adding (interrupt status set) if the
     * thread is interrupted.
     */
    public void addEgg() {
        lock.lock();
        try {
            while (queue.size() >= capacity) {
                // Announce the wait before blocking, not after being woken up.
                System.out.println("仓库已满,等待消费");
                notFull.await();
            }
            System.out.println(Thread.currentThread().getName() + ":生产者生产蛋:" + queue.size());
            queue.put(queue.size());
            notEmpty.signalAll();
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }
}

/**
 * Task that keeps taking eggs from a plate until its thread is interrupted.
 */
class ConsumerLock implements Runnable {

    private final PlateLock plateLock;

    public ConsumerLock(PlateLock plateLock) {
        this.plateLock = plateLock;
    }

    @Override
    public void run() {
        // getEgg() leaves the interrupt status set when interrupted, which ends this loop.
        while (!Thread.currentThread().isInterrupted()) {
            plateLock.getEgg();
        }
    }
}

/**
 * Task that keeps adding eggs to a plate until its thread is interrupted.
 */
class ProducerLock implements Runnable {

    private final PlateLock plateLock;

    public ProducerLock(PlateLock plateLock) {
        this.plateLock = plateLock;
    }

    @Override
    public void run() {
        // addEgg() leaves the interrupt status set when interrupted, which ends this loop.
        while (!Thread.currentThread().isInterrupted()) {
            plateLock.addEgg();
        }
    }
}