JUC基础笔记
juc,即java.util.concurrent
包的缩写,掌握了juc,就是拿到了Java并发编程的钥匙。
在《Java并发编程实战》等书中,已经详细介绍juc用法,如果你懒得看书,或者是忘了juc的用法,想快速回忆一下,可以看我这篇教程。
本教程很长,有很多的代码示例供食用~
基础
volatile关键字
volatile
关键字不属于juc的内容,但是为了铺垫后面的内容,这里先介绍下。
当多个线程之间共享一个数据时,该数据对彼此之间是不可见的。即使是同一个数据,每个线程还是会将其保存在自己独立的内存下面。
下面的代码显示了这一特性:
class Worker implements Runnable {
public boolean flag = false;
@Override
public void run() {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.flag = true;
}
}
public class VolatileDemo {
public static void main(String[] args) {
Worker worker = new Worker();
new Thread(worker).start();
while (!worker.flag) ;
System.out.println("程序结束");
}
}
按理说,worker
线程将flag
改为true
,主线程在flag
变为true
之后会及时跳出循环,程序退出。
但是,实际运行下来程序并没有退出,这是因为worker
线程的flag
和主线程的flag
是不共享的,worker
对flag
的修改并不会影响到主线程。
要想改变这一点,需要将flag
声明为volatile
的。这个关键词的作用是,当变量被某个线程改变时,会及时刷新到主存中,读取时也会从主存中读取。可以保证变量是线程之间可见的。
要想让上面的程序及时退出,将上面的flag
声明改为:
public volatile boolean flag = false;
这样worker
对flag的改变对于主线程就是可见的了,程序可以及时退出了。
原子性
如果一个变量需要被多个线程同时访问,对其进行操作就要格外当心。除了可见性问题,可以使用volatile
修正,还有原子性问题。
如果一个变量的操作需要多步完成,操作可以细分,则该操作就不具备原子性,例如i++
操作就不具备原子性。在并发操作时,就可能因为线程执行非原子操作导致数据读写不一致的情况。
我们可以通过给操作加上synchronized
关键字,让操作只能允许一个线程进行,来实现操作的原子性。
另一种实现原子性的方法是使用CAS操作
(Compare And Swap)。CAS操作由CPU直接提供,CAS需要下面三个操作数:
- valueOffset:变量在内存中的位置
- expect:变量的预估值
- update:变量的更新值
CAS的操作过程:
- 从valueOffset取出value,若等于expect,则将valueOffset的值设为update
- 否则不进行任何操作
那么想要将i++
变为原子的,只需要将valueOffset
设为i
,expect
设为读取到的i
的值,update
设为i+1
。这样,只有当数据一致时,才会执行i+1
操作。
在java.util.concurrent.atomi
下,提供了很多原子变量,这些变量都具备:
- 使用
volatile
确保变量可见性 - 使用CAS操作确保操作是原子的
例如,下面的代码:
package cn.offer.juc;
import java.util.concurrent.atomic.AtomicInteger;
class AtomWorker implements Runnable {
private AtomicInteger i = new AtomicInteger();
@Override
public void run() {
while (true) {
System.out.println(i.addAndGet(1));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class Atomicity {
public static void main(String[] args) {
AtomWorker worker = new AtomWorker();
for (int i = 0; i < 10; ++i) {
new Thread(worker).start();
}
}
}
就可以保证各个线程不会读到重复的i
。
ThreadPool
线程池的概念不再介绍,这里只介绍juc提供的线程池操作。
要想说线程池,就不得不说一下juc的Executor
执行框架,在这个框架下,所有的并发执行单位都以“任务”的形式存在,将任务提交给ExecutorService
,即可实现任务的并发调度执行。
ExecutorService
可以有很多种,它负责接收任务,执行任务,使用Executors
可以创建各种ExecutorService
,有下面几种常用的:
- newSingleThreadExecutor:单一线程,任务会顺序执行
- newCachedThreadPool:大小不受限制的线程池
- newFixedThreadPool:大小固定的线程池,当线程不够时,任务需要等待
- newScheduledThreadPool:大小固定线程池,支持定时及周期性任务执行
ExecutorService
提供了下面的将Runnable
任务提交执行的方法:
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
关于Future
的使用,见Callable和Future,这里不关心。
下面的代码演示了将线程提交给线程池执行:
package cn.offer.juc;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class Task implements Runnable {
@Override
public void run() {
System.out.println(Thread.currentThread().getId() + "执行");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getId() + "执行完毕");
}
}
public class ThreadPoolDemo {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 5; ++i) {
service.submit(new Task());
}
}
}
Callable和Future
使用传统的Runnable
,可以启动一个线程并发执行,但是run
方法是没有返回值的,如果我们想要线程能够返回一个值,就可以使用Callable
+Future
。
我们想要一个线程能返回值,这时候让其实现java.util.concurrent.Callable
接口,在泛型中指定返回类型,例如,我们让一个worker返回字符串:
class CallableWorker implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "运行完毕";
}
}
这个call
方法和传统的run
方法相比,有两个不同:
- 方法有返回值
- 方法可以抛出异常
那么,如何执行呢?一般使用ExecutorService
来执行,该接口中有如下这个方法:
<T> Future<T> submit(Callable<T> task);
Future
用于查询执行的Callable
(或Runnable
)的执行结果、是否完成等信息。该接口的定义如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
这些函数的解释如下:
- cancel:尝试取消任务的执行。
- mayInterruptIfRunning:是否允许取消已经启动但是没有执行的任务。
- 返回:有如下几种情况:
- 任务已经完成,返回false
- 任务还没有启动,取消任务,返回取消结果。
- 任务已经启动,但是
mayInterruptIfRunning
为false
,返回false - 任务已经启动,且
mayInterruptIfRunning
为true
,取消任务,返回取消结果。
- isCancelled:返回任务是否在其正常结束之前被取消。
- isDone:任务是否结束。不论是任务正常结束、抛出异常、被cancel,该函数都会返回true。
- get():阻塞直到任务结束,随后获取其返回值。
- get(timeout, unit):在指定的
timeout
时间内等待任务结束并获取结果,如果超过这个时间没有结束,抛出TimeoutException
异常。
另外说明一下get
方法可能抛出的其它异常:
- CancellationException:在等待途中任务被cancel
- ExecutionException:任务抛出了异常
- InterruptedException:阻塞过程中被打断
通过Future
,我们就可以获取任务的返回值了:
public class CallableDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CallableWorker worker = new CallableWorker();
// 单一线程执行器
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(worker);
// 获取线程的执行结果
System.out.println("执行结果:" + future.get());
}
}
你也可以用Future
做很多其它事情,就看你自己发挥了。
锁
Lock
Lock
接口的定义如下:
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
下面简单介绍一下这些方法:
- lock:获取锁。如果锁已经被占用,会阻塞。
- lockInterruptibly:可中断地等待锁,如果等待被中断,抛出
InterruptedException
。 - tryLock:尝试获取锁,如果获取失败,返回false。
- tryLock(time, unit):在timeout时间内尝试获取锁,如果在这段时间内获取到锁,返回true,如果没有,返回false。
- unlock:释放锁
- newCondition:返回一个绑定到该锁的
Condition
示例,关于Condition,见Condition一节。
我们一般会使用到下面这个Lock
的实现类:
- ReentrantLock:可重入锁,也叫递归锁。指的是,当一个线程获取锁之后,再次获取时,不需要重复等待,可以直接获取锁。
- 构造时将
fair
设为true
,表示公平锁,公平锁指的是严格按照先来先得的顺序排队等待去获取锁。 - 构造时将
fair
设为false
,表示非公平锁,非公平锁每次获取锁时,是先直接尝试获取锁,获取不到,再按照先来先得的顺序排队等待。 - 默认是非公平锁。
- 构造时将
锁的操作不难,下面我们重点介绍下读写锁。
ReadWriteLock
读写锁指的是没有线程进行写操作时,多个线程可同时进行读操作,当有线程进行写操作时,其它读写操作只能等待。
即,对于读写锁来说,“读-读能共存,读-写不能共存,写-写不能共存”。
ReadWriteLock
接口定义如下:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
其中,readLock
用于获取读锁,writeLock
用于获取写锁。
我们一般使用实现类ReentrantReadWriteLock
,即可重入的读写锁。
下面我们来看一个具体的例子:
package cn.offer.juc;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class PublicValue {
public volatile static int value = 0;
public static ReadWriteLock lock = new ReentrantReadWriteLock();
}
class LockWorker implements Runnable {
// 该worker是读者还是写者
private boolean isReader;
public LockWorker(boolean isReader) {
this.isReader = isReader;
}
@Override
public void run() {
if (this.isReader) {
// 读者
while (true) {
PublicValue.lock.readLock().lock();
System.out.println(Thread.currentThread().getId() +
"读取到:" + PublicValue.value);
PublicValue.lock.readLock().unlock();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else {
// 写者
while (true) {
PublicValue.lock.writeLock().lock();
System.out.println("写者修改value:" + PublicValue.value++);
PublicValue.lock.writeLock().unlock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public class LockDemo {
public static void main(String[] args) {
LockWorker writer = new LockWorker(false);
LockWorker reader = new LockWorker(true);
// 5个读者和写者
for (int i = 0; i < 5; ++i) {
new Thread(reader).start();
new Thread(writer).start();
}
}
}
读的频率比写的要高,使用读写锁,可以加快程序执行的效率。
Condition
在JDK1.5之前,线程的等待唤醒是通过wait
、notify
、notifyAll
实现的。juc提供了Condition
,可以更加方便地实现线程的等待唤醒。
Condition
是由Lock
创建的,它的await
、signal
、signalAll
分别对应上面的三个方法。
使用Condition
的好处是,使用Lock
可以创建不同的Condition
,我们可以把这些Condition
分配给不同的线程,从而实现唤醒指定不同类别的线程。
等待唤醒的一个经典案例是生产者-消费者模型,下面是这个模型的Condition
实现:
package cn.offer.juc;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class Shop {
// 库存剩余
private int production = 0;
// 总库存
private static final int maxProduction = 5;
// 锁对象
private Lock lock = new ReentrantLock();
// 生产者条件,用于等待和唤醒生产者
private Condition productionCondition = lock.newCondition();
// 消费者条件,用于等待和唤醒消费者
private Condition consumeCondition = lock.newCondition();
// 生产一个数据
public void produce() {
this.lock.lock();
try {
// 库存满了,生产者等待
while (this.production > maxProduction) {
this.productionCondition.await();
}
// 生产数据
System.out.println(Thread.currentThread().getId()
+ "生产数据:" + (++this.production));
// 进货了,可以唤醒其它消费者
this.consumeCondition.signalAll();
} catch (InterruptedException ignore) {
// ignore
} finally {
this.lock.unlock();
}
}
// 消费者
public void consume() {
this.lock.lock();
try {
// 库存空了,消费者等待
while (this.production <= 0) {
this.consumeCondition.await();
}
// 消费数据
System.out.println(Thread.currentThread().getId()
+ "消费数据:" + (--this.production));
// 数据被消费了,唤醒其它生产者
this.productionCondition.signalAll();
} catch (InterruptedException e) {
// ignore
} finally {
this.lock.unlock();
}
}
}
class Consumer implements Runnable {
private Shop shop;
public Consumer(Shop shop) {
this.shop = shop;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
}
shop.consume();
}
}
}
class Producer implements Runnable {
private Shop shop;
public Producer(Shop shop) {
this.shop = shop;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
}
shop.produce();
}
}
}
public class ConditionDemo {
public static void main(String[] args) {
Shop shop = new Shop();
Consumer consumer = new Consumer(shop);
Producer producer = new Producer(shop);
for (int i = 0; i < 5; ++i) {
new Thread(producer).start();
new Thread(consumer).start();
}
}
}
这样就可以保证资源被安全地访问。
另一个Condition
的例子是,让线程之间交替运行,例如,创建3个线程ABC,让3个线程以"ABCABCABC..."的顺序交替执行。
这可以创建3个Condition
,ConditionA
执行完毕后唤醒ConditionB
,ConditionB
执行完毕唤醒ConditionC
,ConditionC
执行完毕唤醒ConditionA
,以此类推:
package cn.offer.juc;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ExecOrder {
public static Lock lock = new ReentrantLock();
// 不同线程名称和其condition
public static Map<String, Condition> conditionMap;
// 当前正常执行的线程名称
public static String currentExec = "A";
static {
// 为A、B、C创建condition
conditionMap = new HashMap<>();
conditionMap.put("A", lock.newCondition());
conditionMap.put("B", lock.newCondition());
conditionMap.put("C", lock.newCondition());
}
}
class Exec implements Runnable {
// 当前线程名称
private String name;
// 下一个执行线程名称
private String next;
public Exec(String name, String next) {
this.name = name;
this.next = next;
}
public void exec() {
ExecOrder.lock.lock();
try {
if (!ExecOrder.currentExec.equals(this.name)) {
// 当前不是自己执行
ExecOrder.conditionMap.get(this.name).await();
}
// 当前是自己执行
System.out.println(this.name);
// 唤醒下一个
ExecOrder.conditionMap.get(this.next).signal();
ExecOrder.currentExec = this.next;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
ExecOrder.lock.unlock();
}
}
@Override
public void run() {
while (true) {
this.exec();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ConditionDemo2 {
public static void main(String[] args) {
new Thread(new Exec("A", "B")).start();
new Thread(new Exec("B", "C")).start();
new Thread(new Exec("C", "A")).start();
}
}
线程同步
线程之间的工作很多时候需要进行协调,例如某个线程需要等待其余线程完成了才能继续工作。juc提供了很多这种勇于协调同步线程之间工作的工具。
CountDownLatch
也叫闭锁,这个工具很简单,就是用于等待一组事件结束再继续往后执行的。
CountDownLatch
内部有一个计数器,初始值由我们指定,每次调用countDown
方法,计数器会减一。当计数器减到0,await
调用处才会终止阻塞,继续往后执行。
例如,使用4个线程对一个公共变量分别加一,主线程在它们完成后输出这个公共变量:
package cn.offer.juc;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
class PublicIntValue {
public static AtomicInteger num = new AtomicInteger(0);
}
class AddValueWorker implements Runnable {
private CountDownLatch countDownLatch;
private int sleepSeconds;
public AddValueWorker(CountDownLatch count, int sleepSeconds) {
this.countDownLatch = count;
this.sleepSeconds = sleepSeconds;
}
@Override
public void run() {
try {
Thread.sleep(this.sleepSeconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getId() +
"执行,num=" + PublicIntValue.num.addAndGet(1));
this.countDownLatch.countDown();
}
}
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);
// 启动5个线程,分别需要执行1-5秒
for (int i = 0; i < 5; ++i) {
new Thread(new AddValueWorker(countDownLatch, i + 1)).start();
}
// 等待致5个线程完成
countDownLatch.await();
System.out.println("执行结束,结果=" + PublicIntValue.num.get());
}
}
如果某个地方需要某些任务全部完成才能继续执行,则可以使用CountDownLatch
。
CyclicBarrier
栅栏用于等待其它线程,是一个线程“同步”的装置。
它的作用是,到达栅栏处,线程会阻塞,等待其它线程,必须所有线程都到达栅栏处,线程才会继续执行。
栅栏内部也有一个计数器,当线程调用await
时,计数器减一,如果此时计数器不为0,会阻塞,直到计数器为0,才会继续执行。
例如,一个简单的“掷骰子”游戏,让5个线程分别生成1-6的随机数,所有线程生成完毕之后,再公开自己的数字(打印出来):
package cn.offer.juc;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class Player implements Runnable {
private static Random rand = new Random();
private int sleepTime;
private CyclicBarrier barrier;
public Player(CyclicBarrier barrier, int sleepTime) {
this.barrier = barrier;
this.sleepTime = sleepTime;
}
@Override
public void run() {
try {
Thread.sleep(this.sleepTime * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 生成一个1-6的随机数
int point = randomInt(1, 6);
// 等待其它线程完成再公布自己的数
try {
this.barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getId() + "的点数:" + point);
}
public static int randomInt(int min, int max) {
return rand.nextInt((max - min) + 1) + min;
}
}
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(5);
// 5个玩家
for (int i = 0; i < 5; ++i) {
new Thread(new Player(barrier, i + 1)).start();
}
}
}
可见,栅栏可以实现让快的线程等待慢的线程。
Semaphore
信号量用于控制访问资源的线程个数。
使用Semaphore
时,需要调用acquire
来获取资源,如果资源以及被占用满了,将会阻塞直到其它线程释放资源。
在使用资源结束后,一定要调用release
来释放资源,将资源让给其它线程。
package cn.offer.juc;
import java.util.concurrent.Semaphore;
class SourceWorker implements Runnable {
private Semaphore semaphore;
public SourceWorker(Semaphore semaphore) {
this.semaphore = semaphore;
}
@Override
public void run() {
try {
// 尝试获取资源
semaphore.acquire();
// 使用资源
System.out.println(Thread.currentThread().getId() + "正在使用资源...");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放资源
semaphore.release();
System.out.println(Thread.currentThread().getId() + "使用完毕,释放资源");
}
}
}
public class SemaphoreDemo {
public static void main(String[] args) {
// 资源只供3个线程同时访问
Semaphore semaphore = new Semaphore(3);
// 创建5个线程去访问资源
for (int i = 0; i < 5; ++i) {
new Thread(new SourceWorker(semaphore)).start();
}
}
}
这样就只能同时有3个线程访问资源了。这可以控制资源被并发访问的个数,可以用于控制资源访问的压力。
Exchanger
交换器用于两个线程执行到某个时间点,进行数据交换的。在某个时间点调用Exchanger
的exchange
时,如果另一个线程没有执行到exchange
,则当前线程会进行阻塞;当另一个线程也执行到exchange
,会继续运行,并将对方的数据返回过来,自己的数据也会被返回给对方。
下面是一个简单的例子,两个线程分别计算2和3的平方值,计算完毕后将结果发送给对方,一方将两个结果相加,一方将两个结果相乘,随后各自打印出来:
package cn.offer.juc;
import java.util.concurrent.Exchanger;
class ExchangeWorker implements Runnable {
// 操作数
private int num;
// 操作类型,1表示计算两个数平方和,2表示两个数平方积
private int type;
// 交换器,用于交换双方的数据
private Exchanger<Integer> exchanger;
// 模拟操作时间
private int sleepTime;
public ExchangeWorker(int num, int type, int sleepTime,
Exchanger<Integer> exchanger) {
this.num = num;
this.type = type;
this.sleepTime = sleepTime;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
Thread.sleep(this.sleepTime * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int result = this.num * this.num;
System.out.println(this.num + "平方计算完毕:" + result);
try {
int other = this.exchanger.exchange(result);
if (this.type == 1) {
System.out.println("两个数的平方和:" + (result + other));
} else {
System.out.println("两个数的平方积:" + (result * other));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public class ExchangeDemo {
public static void main(String[] args) {
Exchanger<Integer> exchanger = new Exchanger<>();
new Thread(new ExchangeWorker(
2, 1, 1, exchanger)).start();
new Thread(new ExchangeWorker(
3, 2, 3, exchanger)).start();
}
}
2的平方会被先计算完毕,随后会等待3的平方的计算。计算完毕后,双方会交换互相的结果,随后输出和跟积。
数据结构
juc也提供了一些并发安全的数据结构供我们使用。
阻塞队列
阻塞队列是由固定长度的,入队和出队操作是阻塞的:
- 入队列时,如果队列满了,将会阻塞,直到队列有位置。
- 出队列时,如果队列时空的,将会阻塞,直到队列有数据。
主要有下面这些阻塞队列供我们使用:
- ArrayBlockingQueue:基于数组的阻塞队列
- LinkedBlockingQueue:基于链表的阻塞队列
- PriorityBlockingQueue:支持优先级排序的阻塞队列
- DelayQueue:支持延时获取的无界阻塞队列
- SynchronousQueue:不储存元素的阻塞队列
- LinkedTransferQueue:基于链表的无边界阻塞队列
- LinkedBlockingDeque:基于链表的双向阻塞队列
阻塞队列有以下方法:
- put:阻塞地向队列插入数据
- take:阻塞地从队列获取数据
- drainTo(Collection<? super E> c):移除队列中所有元素,到
Collection
中 - int drainTo(Collection<? super E> c, int maxElements):移除最多maxElements个元素到
Collection
- add:不阻塞地插入数据,如果队列满,会返回false而不是阻塞
- poll:不阻塞地获取数据,如果没有数据,返回
null
- poll(timeout, unit):在指定时间内阻塞获取数据,如果这个时间内没有获取到,返回
null
- remainingCapacity:当前队列剩余的可用空间
- contain:队列是否包含某个数据
- remove:删除某个元素,返回删除是否成功
下面来依次介绍这些队列:
固定长度的阻塞队列
定长阻塞队列可以用于实现消费者-生产者模型。现在,生产者只需要不断向阻塞队列添加数据,消费者从中获取即可:
package cn.offer.juc.bqueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
class BlockQueueBasedShop {
// 长度为5的阻塞队列
private BlockingQueue<Integer> queue =
new LinkedBlockingQueue<>(5);
public void produce(int num) {
try {
this.queue.put(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public int consume() {
try {
return this.queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return -1;
}
}
public class BaseBlockQueueDemo {
// 生产的递增的数字
private static AtomicInteger grow = new AtomicInteger(0);
public static void main(String[] args) {
BlockQueueBasedShop shop = new BlockQueueBasedShop();
// 5个消费者和5个生产者
for (int i = 0; i < 5; ++i) {
// 消费者
new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int num = shop.consume();
if (num != -1) {
System.out.println("消费到了:" + num);
}
}
}).start();
// 生产者
new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int num = grow.getAndAdd(1);
shop.produce(num);
System.out.println("生产了:" + num);
}
}).start();
}
}
}
这样的实现比使用等待唤醒会简单一些。
优先队列
我们再来关注一个队列:PriorityBlockingQueue
。这种队列支持优先获取某些元素。
要想优先获取元素,队列储存的对象必须实现Comparable
接口,例如,想依据商品的价格作为优先级,则需要下面的商品类:
class Product implements Comparable<Product> {
private String name;
private Integer price;
public Product(String name, Integer price) {
this.name = name;
this.price = price;
}
@Override
public int compareTo(Product o) {
return -this.price.compareTo(o.price);
}
@Override
public String toString() {
return this.name + ",价格:" + this.price;
}
}
这样就会按照商品价格的倒序来取元素了:
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Product> queue = new PriorityBlockingQueue<>();
queue.put(new Product("笔记本", 12));
queue.put(new Product("苹果", 20));
queue.put(new Product("铅笔", 6));
queue.put(new Product("mac电脑", 10000));
while (true) {
System.out.println(queue.take());
}
}
}
输出:
mac电脑,价格:10000
苹果,价格:20
笔记本,价格:12
铅笔,价格:6
延迟阻塞队列
延迟阻塞队列是基于优先队列的,它允许我们延时获取元素。
要使用延迟阻塞队列,类需要实现Delayed
接口,其中包含用于比较的compareTo
和用于获取延时时间的getDelay
。
compareTo
需要根据延时时间进行排序,这样可以优先获取低延时的元素:
class ProductDelay implements Delayed {
private String name;
private long time;
public ProductDelay(String name, long time) {
this.name = name;
this.time = time;
}
@Override
public String toString() {
return this.name;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.time -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS),
o.getDelay(TimeUnit.MILLISECONDS));
}
}
这样使用DelayQueue
,就能按照顺序从队列里延时获取元素了:
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<ProductDelay> queue = new DelayQueue<>();
long now = System.currentTimeMillis();
queue.offer(new ProductDelay("1000", now + 1000));
queue.offer(new ProductDelay("200", now + 200));
queue.offer(new ProductDelay("300", now + 300));
queue.offer(new ProductDelay("500", now + 500));
queue.offer(new ProductDelay("2000", now + 2000));
while (true) {
System.out.println(queue.take());
}
}
}
输出:
200
300
500
1000
2000
同步队列
同步队列不储存元素,每个put
都需要等待一个take
。同步队列一般用于线程之间传递某个单一的信号。
因为不储存元素,同步队列比较特殊,有如下特征:
- 调用
peek
和iterator
永远返回null
-
isEmpty
永远返回true
-
remainingCapacity
永远返回0 -
remove
和removeAll
永远返回false
下面使用同步队列来实现两个线程交替执行:
package cn.offer.juc.bqueue;
import java.util.concurrent.SynchronousQueue;
class SynchronousWorker implements Runnable {
private SynchronousQueue<Boolean> synIn;
private SynchronousQueue<Boolean> synOut;
private String name;
public SynchronousWorker(
String name, SynchronousQueue<Boolean> synIn,
SynchronousQueue<Boolean> synOut) {
this.name = name;
this.synIn = synIn;
this.synOut = synOut;
}
@Override
public void run() {
while (true) {
try {
// 等待获取开始信号
this.synIn.take();
// 执行
System.out.println(name);
// 发送信号
this.synOut.put(true);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 用于同步的两个信号量
SynchronousQueue<Boolean> synA = new SynchronousQueue<>();
SynchronousQueue<Boolean> synB = new SynchronousQueue<>();
new Thread(new SynchronousWorker("A", synA, synB)).start();
new Thread(new SynchronousWorker("B", synB, synA)).start();
// 需要给一个开始信号
synA.put(true);
}
}
通过两个信号,就可以实现两个线程的交替执行。
ConcurrentHashMap
众所周知,HashMap
是非线程安全的,HashTable
是线程安全的。但是HashTable
是直接使用synchronized
来将所有的方法进行互斥的,效率低下。
juc提供了一种更加高效的线程安全map,即ConcurrentHashMap
。它采用了采用分离锁技术,将hash表的数组部分分成若干段,每段维护一个锁,这些段可以并发的进行写操作。
分段锁使得ConcurrentHashMap
的吞吐量比一般的同步hash表高得多。在并发情况下如果要用到hash表,应该使用这个类,而不是HashTable
。
这个类的用法和HashMap
一样,这里不再演示。
CopyOnWriteArrayList/Set
采用写时复制的容器。所有的线程都共享同一个容器对象,当某个线程修改容器时,会Copy一个新的容器进行修改,所有线程后续将会访问新的容器。
这样对容器进行读的时候不需要加锁,可以高效并发完成,因为读的容器不可能添加新的内容。写的时候无法进行读。
这是以增加写的代价换取读高效率。如果容器的读次数大于写的次数,则可以考虑使用这样的容器。
这种容器的用法和普通的容器一样,不再演示。