在现代多核处理器时代,掌握并发编程是Java开发者必备的技能。Java Util Concurrent(JUC)包为我们提供了强大而高效的并发工具,本文将带你全面深入地掌握JUC的核心组件和使用技巧。
一、JUC包概述
1.1 什么是JUC?
JUC(java.util.concurrent)是Java 5.0引入的并发工具包,它提供了一系列比synchronized更灵活、性能更好的并发控制工具。
1.2 为什么要使用JUC?
- 更高的性能:相比传统的synchronized,JUC提供了更细粒度的锁控制
- 更好的可扩展性:支持更复杂的并发场景
- 更丰富的功能:提供了线程池、并发集合、同步器等高级功能
- 更好的可控制性:支持超时、中断等灵活控制
二、原子类(Atomic Classes)
2.1 原子类的作用
提供线程安全的、基于CAS(Compare-And-Swap)操作的原子更新。
import java.util.concurrent.atomic.*;
public class AtomicExample {
public static void main(String[] args) throws InterruptedException {
// 原子整数
AtomicInteger atomicInt = new AtomicInteger(0);
// 原子引用
AtomicReference<String> atomicRef = new AtomicReference<>("initial");
// 原子数组
AtomicIntegerArray atomicArray = new AtomicIntegerArray(10);
// 多个线程同时自增
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
atomicInt.incrementAndGet(); // 原子自增
atomicRef.compareAndSet("initial", "updated"); // CAS操作
}
});
threads[i].start();
}
// 等待所有线程完成
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终结果: " + atomicInt.get()); // 保证是10000
System.out.println("引用结果: " + atomicRef.get()); // 保证是updated
}
}
2.2 常用原子类
-
AtomicInteger/AtomicLong:原子整数/长整数 -
AtomicBoolean:原子布尔值 -
AtomicReference<T>:原子引用 -
AtomicIntegerArray:原子整数数组 -
AtomicStampedReference:带版本号的原子引用,解决ABA问题
2.3 CAS原理
// CAS的伪代码实现
public class SimulatedCAS {
private int value;
public synchronized boolean compareAndSet(int expect, int update) {
if (this.value == expect) {
this.value = update;
return true;
}
return false;
}
}
三、锁机制(Locks)
3.1 ReentrantLock 可重入锁
import java.util.concurrent.locks.*;
public class ReentrantLockExample {
private final ReentrantLock lock = new ReentrantLock();
private int count = 0;
public void increment() {
lock.lock(); // 获取锁
try {
count++;
System.out.println(Thread.currentThread().getName() + " count: " + count);
} finally {
lock.unlock(); // 必须在finally中释放锁
}
}
// 尝试获取锁,带有超时时间
public boolean tryIncrement() {
try {
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
public static void main(String[] args) {
ReentrantLockExample example = new ReentrantLockExample();
// 多个线程同时执行
for (int i = 0; i < 10; i++) {
new Thread(() -> {
for (int j = 0; j < 5; j++) {
example.increment();
}
}).start();
}
}
}
3.2 ReadWriteLock 读写锁
public class ReadWriteLockExample {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private Map<String, String> data = new HashMap<>();
// 读操作 - 共享锁
public String get(String key) {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 读取数据");
Thread.sleep(100); // 模拟读取耗时
return data.get(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
readLock.unlock();
}
}
// 写操作 - 排他锁
public void put(String key, String value) {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 写入数据");
Thread.sleep(200); // 模拟写入耗时
data.put(key, value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
writeLock.unlock();
}
}
}
3.3 Condition 条件变量
public class ConditionExample {
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition(); // 条件:不满
private final Condition notEmpty = lock.newCondition(); // 条件:不空
private final Object[] items = new Object[10];
private int putPtr, takePtr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await(); // 等待不满条件
}
items[putPtr] = x;
if (++putPtr == items.length) putPtr = 0;
++count;
notEmpty.signal(); // 唤醒等待不空条件的线程
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await(); // 等待不空条件
}
Object x = items[takePtr];
if (++takePtr == items.length) takePtr = 0;
--count;
notFull.signal(); // 唤醒等待不满条件的线程
return x;
} finally {
lock.unlock();
}
}
}
四、并发集合(Concurrent Collections)
4.1 ConcurrentHashMap
public class ConcurrentHashMapExample {
public static void main(String[] args) {
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 多个线程同时操作
ExecutorService executor = Executors.newFixedThreadPool(10);
// 写入操作
for (int i = 0; i < 100; i++) {
final int key = i;
executor.submit(() -> {
map.put("key" + key, key);
});
}
// 读取操作 - 不需要加锁
for (int i = 0; i < 100; i++) {
final int key = i;
executor.submit(() -> {
Integer value = map.get("key" + key);
System.out.println("读取: key" + key + " = " + value);
});
}
// 原子更新
map.compute("key1", (k, v) -> v == null ? 1 : v + 1);
executor.shutdown();
}
}
4.2 BlockingQueue 阻塞队列
public class BlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
// 有界阻塞队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
// 生产者
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
queue.put(i); // 队列满时阻塞
System.out.println("生产: " + i + " 队列大小: " + queue.size());
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 20; i++) {
Integer item = queue.take(); // 队列空时阻塞
System.out.println("消费: " + item + " 队列大小: " + queue.size());
Thread.sleep(150);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
}
}
4.3 CopyOnWriteArrayList
public class CopyOnWriteExample {
public static void main(String[] args) {
// 写时复制列表,适合读多写少的场景
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// 初始化数据
list.add("A");
list.add("B");
list.add("C");
// 多个线程同时读取
for (int i = 0; i < 5; i++) {
new Thread(() -> {
// 读取操作不需要加锁
for (String item : list) {
System.out.println(Thread.currentThread().getName() + " 读取: " + item);
}
}).start();
}
// 写操作(会复制整个数组)
new Thread(() -> {
list.add("D");
System.out.println("添加新元素 D");
}).start();
}
}
五、同步工具类(Synchronizers)
5.1 CountDownLatch - 倒计时门闩
public class CountDownLatchExample {
/**
* 场景:主线程等待多个子任务完成
*/
public static void main(String[] args) throws InterruptedException {
int taskCount = 5;
CountDownLatch startSignal = new CountDownLatch(1); // 开始信号
CountDownLatch doneSignal = new CountDownLatch(taskCount); // 完成信号
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("任务" + taskId + " 等待开始...");
startSignal.await(); // 等待开始信号
// 执行任务
System.out.println("任务" + taskId + " 开始执行");
Thread.sleep(1000 + taskId * 100); // 模拟任务执行时间
System.out.println("任务" + taskId + " 执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
doneSignal.countDown(); // 计数减1
}
}).start();
}
// 主线程准备工作
Thread.sleep(2000);
System.out.println("所有任务准备就绪,开始执行...");
startSignal.countDown(); // 发出开始信号
doneSignal.await(); // 等待所有任务完成
System.out.println("所有任务执行完成!");
}
}
5.2 CyclicBarrier - 循环屏障
public class CyclicBarrierExample {
/**
* 场景:多个线程互相等待,到达屏障点后一起继续执行
*/
public static void main(String[] args) {
int threadCount = 3;
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
// 所有线程到达屏障后执行的回调
System.out.println("所有线程已到达屏障,继续执行下一阶段");
});
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
// 第一阶段工作
System.out.println("线程" + threadId + " 完成第一阶段工作");
barrier.await(); // 等待其他线程
// 第二阶段工作
System.out.println("线程" + threadId + " 完成第二阶段工作");
barrier.await();
// 第三阶段工作
System.out.println("线程" + threadId + " 完成第三阶段工作");
barrier.await();
System.out.println("线程" + threadId + " 全部工作完成");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}
5.3 Semaphore - 信号量
public class SemaphoreExample {
/**
* 场景:控制同时访问特定资源的线程数量
*/
public static void main(String[] args) {
// 模拟停车场,只有3个车位
Semaphore parkingLot = new Semaphore(3);
// 10辆车来停车
for (int i = 1; i <= 10; i++) {
new Thread(new Car(parkingLot, i)).start();
try {
Thread.sleep(100); // 车辆陆续到达
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Car implements Runnable {
private final Semaphore semaphore;
private final int carId;
public Car(Semaphore semaphore, int carId) {
this.semaphore = semaphore;
this.carId = carId;
}
@Override
public void run() {
try {
System.out.println("车辆" + carId + " 到达停车场,寻找车位...");
// 尝试获取许可(车位)
semaphore.acquire();
System.out.println("车辆" + carId + " 找到车位,开始停车");
// 模拟停车时间
Thread.sleep(2000 + new Random().nextInt(3000));
System.out.println("车辆" + carId + " 离开车位");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 释放许可(车位)
semaphore.release();
}
}
}
}
5.4 Exchanger - 交换器
public class ExchangerExample {
/**
* 场景:两个线程交换数据
*/
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
// 生产者线程
new Thread(() -> {
try {
String data = "生产的数据";
System.out.println("生产者发送: " + data);
String response = exchanger.exchange(data);
System.out.println("生产者收到: " + response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 消费者线程
new Thread(() -> {
try {
String data = "消费的数据";
System.out.println("消费者发送: " + data);
String response = exchanger.exchange(data);
System.out.println("消费者收到: " + response);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
六、线程池(Thread Pools)
6.1 线程池的创建和使用
public class ThreadPoolExample {
public static void main(String[] args) {
// 创建固定大小的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
// 创建缓存线程池
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
// 创建定时任务线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
// 创建单线程线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedThreadPool.submit(() -> {
System.out.println(Thread.currentThread().getName() + " 执行任务" + taskId);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 定时任务
scheduledThreadPool.scheduleAtFixedRate(() -> {
System.out.println("定时任务执行: " + new Date());
}, 0, 1, TimeUnit.SECONDS);
// 关闭线程池
fixedThreadPool.shutdown();
try {
// 等待线程池终止
if (!fixedThreadPool.awaitTermination(60, TimeUnit.SECONDS)) {
fixedThreadPool.shutdownNow();
}
} catch (InterruptedException e) {
fixedThreadPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
6.2 自定义线程池
public class CustomThreadPoolExample {
public static void main(String[] args) {
// 自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
5, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(10), // 工作队列
Executors.defaultThreadFactory(), // 线程工厂
new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
);
// 监控线程池状态
ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
monitor.scheduleAtFixedRate(() -> {
System.out.println("线程池状态: " +
"核心线程数: " + executor.getCorePoolSize() +
", 活动线程数: " + executor.getActiveCount() +
", 任务数: " + executor.getTaskCount() +
", 完成数: " + executor.getCompletedTaskCount() +
", 队列大小: " + executor.getQueue().size());
}, 0, 1, TimeUnit.SECONDS);
// 提交任务
for (int i = 0; i < 20; i++) {
final int taskId = i;
try {
executor.execute(() -> {
System.out.println("执行任务: " + taskId + " 线程: " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
} catch (Exception e) {
System.out.println("任务" + taskId + " 被拒绝");
}
}
// 关闭
executor.shutdown();
monitor.shutdown();
}
}
七、Future和CompletableFuture
7.1 Future 基本使用
public class FutureExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交有返回值的任务
Future<String> future = executor.submit(() -> {
Thread.sleep(2000); // 模拟耗时操作
return "任务执行结果";
});
// 可以做其他事情
System.out.println("主线程继续执行其他任务...");
// 获取结果(阻塞)
String result = future.get();
System.out.println("获取结果: " + result);
// 带超时的获取
try {
String result2 = future.get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.out.println("获取结果超时");
}
executor.shutdown();
}
}
7.2 CompletableFuture 异步编程
public class CompletableFutureExample {
public static void main(String[] args) throws Exception {
// 简单的异步任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("执行任务1...");
return "Hello";
});
// 任务链
CompletableFuture<String> future2 = future1.thenApplyAsync(result -> {
System.out.println("任务1的结果: " + result);
return result + " World";
});
// 组合多个任务
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Java");
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> "Programming");
CompletableFuture<String> combinedFuture = future3.thenCombine(future4, (r1, r2) -> r1 + " " + r2);
// 所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future2, combinedFuture);
// 获取结果
System.out.println("任务2结果: " + future2.get());
System.out.println("组合任务结果: " + combinedFuture.get());
// 异常处理
CompletableFuture<String> safeFuture = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "Success";
}).exceptionally(ex -> {
System.out.println("处理异常: " + ex.getMessage());
return "Default Value";
});
System.out.println("安全任务结果: " + safeFuture.get());
}
}
八、实战案例:高性能缓存系统
public class ConcurrentCache<K, V> {
private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
private final ConcurrentHashMap<K, Long> expiryTimes = new ConcurrentHashMap<>();
private final ScheduledExecutorService cleaner = Executors.newScheduledThreadPool(1);
public ConcurrentCache() {
// 定期清理过期缓存
cleaner.scheduleAtFixedRate(this::cleanup, 1, 1, TimeUnit.MINUTES);
}
public void put(K key, V value, long ttl, TimeUnit unit) {
cache.put(key, value);
long expiryTime = System.currentTimeMillis() + unit.toMillis(ttl);
expiryTimes.put(key, expiryTime);
}
public V get(K key) {
Long expiryTime = expiryTimes.get(key);
if (expiryTime != null && System.currentTimeMillis() > expiryTime) {
// 已过期,移除
cache.remove(key);
expiryTimes.remove(key);
return null;
}
return cache.get(key);
}
public V computeIfAbsent(K key, Function<K, V> mappingFunction, long ttl, TimeUnit unit) {
return cache.computeIfAbsent(key, k -> {
V value = mappingFunction.apply(k);
long expiryTime = System.currentTimeMillis() + unit.toMillis(ttl);
expiryTimes.put(k, expiryTime);
return value;
});
}
private void cleanup() {
long now = System.currentTimeMillis();
expiryTimes.entrySet().removeIf(entry -> {
if (now > entry.getValue()) {
cache.remove(entry.getKey());
return true;
}
return false;
});
}
public void shutdown() {
cleaner.shutdown();
}
}
九、最佳实践和注意事项
9.1 性能优化建议
- 选择合适的并发工具:根据场景选择最合适的工具
- 避免锁竞争:减小锁的粒度,缩短锁持有时间
- 使用读写锁:读多写少的场景使用ReadWriteLock
- 合理配置线程池:根据任务类型配置合适的线程池参数
- 避免死锁:按固定顺序获取锁
9.2 常见陷阱
public class CommonMistakes {
// 错误1:在锁内调用外部方法(可能导致死锁)
public void wrongLockUsage() {
synchronized (this) {
externalMethod(); // 危险!可能获取其他锁
}
}
// 错误2:忘记在finally中释放锁
public void wrongLockRelease() {
ReentrantLock lock = new ReentrantLock();
lock.lock();
try {
// 业务逻辑
throw new RuntimeException("异常发生!"); // 锁无法释放
} finally {
// 应该在这里释放锁
// lock.unlock();
}
}
// 错误3:误用ConcurrentHashMap的原子性
public void wrongConcurrentMapUsage(ConcurrentHashMap<String, Integer> map) {
// 这不是原子操作!
if (!map.containsKey("key")) {
map.put("key", 1); // 可能被其他线程打断
}
// 正确的做法:使用原子方法
map.putIfAbsent("key", 1);
}
private void externalMethod() {
// 可能获取其他锁
}
}
十、总结
JUC包为Java并发编程提供了强大而灵活的工具集。掌握这些工具不仅能够编写出高性能的并发程序,还能更好地理解并发编程的本质。在实际开发中,要根据具体场景选择合适的工具,并遵循最佳实践,才能充分发挥JUC的强大能力。
希望这篇详细的JUC指南对您有帮助!如果有任何问题,欢迎在评论区讨论。