深入理解JUC包:Java并发编程完全指南

在现代多核处理器时代,掌握并发编程是Java开发者必备的技能。Java Util Concurrent(JUC)包为我们提供了强大而高效的并发工具,本文将带你全面深入地掌握JUC的核心组件和使用技巧。

一、JUC包概述

1.1 什么是JUC?

JUC(java.util.concurrent)是Java 5.0引入的并发工具包,它提供了一系列比synchronized更灵活、性能更好的并发控制工具。

1.2 为什么要使用JUC?

  • 更高的性能:相比传统的synchronized,JUC提供了更细粒度的锁控制
  • 更好的可扩展性:支持更复杂的并发场景
  • 更丰富的功能:提供了线程池、并发集合、同步器等高级功能
  • 更好的可控制性:支持超时、中断等灵活控制

二、原子类(Atomic Classes)

2.1 原子类的作用

提供线程安全的、基于CAS(Compare-And-Swap)操作的原子更新。

import java.util.concurrent.atomic.*;

public class AtomicExample {
    
    public static void main(String[] args) throws InterruptedException {
        // 原子整数
        AtomicInteger atomicInt = new AtomicInteger(0);
        
        // 原子引用
        AtomicReference<String> atomicRef = new AtomicReference<>("initial");
        
        // 原子数组
        AtomicIntegerArray atomicArray = new AtomicIntegerArray(10);
        
        // 多个线程同时自增
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    atomicInt.incrementAndGet(); // 原子自增
                    atomicRef.compareAndSet("initial", "updated"); // CAS操作
                }
            });
            threads[i].start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("最终结果: " + atomicInt.get()); // 保证是10000
        System.out.println("引用结果: " + atomicRef.get()); // 保证是updated
    }
}

2.2 常用原子类

  • AtomicInteger / AtomicLong:原子整数/长整数
  • AtomicBoolean:原子布尔值
  • AtomicReference<T>:原子引用
  • AtomicIntegerArray:原子整数数组
  • AtomicStampedReference:带版本号的原子引用,解决ABA问题

2.3 CAS原理

// CAS的伪代码实现
public class SimulatedCAS {
    private int value;
    
    public synchronized boolean compareAndSet(int expect, int update) {
        if (this.value == expect) {
            this.value = update;
            return true;
        }
        return false;
    }
}

三、锁机制(Locks)

3.1 ReentrantLock 可重入锁

import java.util.concurrent.locks.*;

public class ReentrantLockExample {
    private final ReentrantLock lock = new ReentrantLock();
    private int count = 0;
    
    public void increment() {
        lock.lock(); // 获取锁
        try {
            count++;
            System.out.println(Thread.currentThread().getName() + " count: " + count);
        } finally {
            lock.unlock(); // 必须在finally中释放锁
        }
    }
    
    // 尝试获取锁,带有超时时间
    public boolean tryIncrement() {
        try {
            if (lock.tryLock(1, TimeUnit.SECONDS)) {
                try {
                    count++;
                    return true;
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }
    
    public static void main(String[] args) {
        ReentrantLockExample example = new ReentrantLockExample();
        
        // 多个线程同时执行
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 5; j++) {
                    example.increment();
                }
            }).start();
        }
    }
}

3.2 ReadWriteLock 读写锁

public class ReadWriteLockExample {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();
    private Map<String, String> data = new HashMap<>();
    
    // 读操作 - 共享锁
    public String get(String key) {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 读取数据");
            Thread.sleep(100); // 模拟读取耗时
            return data.get(key);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            readLock.unlock();
        }
    }
    
    // 写操作 - 排他锁
    public void put(String key, String value) {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 写入数据");
            Thread.sleep(200); // 模拟写入耗时
            data.put(key, value);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            writeLock.unlock();
        }
    }
}

3.3 Condition 条件变量

public class ConditionExample {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // 条件:不满
    private final Condition notEmpty = lock.newCondition(); // 条件:不空
    
    private final Object[] items = new Object[10];
    private int putPtr, takePtr, count;
    
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await(); // 等待不满条件
            }
            items[putPtr] = x;
            if (++putPtr == items.length) putPtr = 0;
            ++count;
            notEmpty.signal(); // 唤醒等待不空条件的线程
        } finally {
            lock.unlock();
        }
    }
    
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await(); // 等待不空条件
            }
            Object x = items[takePtr];
            if (++takePtr == items.length) takePtr = 0;
            --count;
            notFull.signal(); // 唤醒等待不满条件的线程
            return x;
        } finally {
            lock.unlock();
        }
    }
}

四、并发集合(Concurrent Collections)

4.1 ConcurrentHashMap

public class ConcurrentHashMapExample {
    
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        
        // 多个线程同时操作
        ExecutorService executor = Executors.newFixedThreadPool(10);
        
        // 写入操作
        for (int i = 0; i < 100; i++) {
            final int key = i;
            executor.submit(() -> {
                map.put("key" + key, key);
            });
        }
        
        // 读取操作 - 不需要加锁
        for (int i = 0; i < 100; i++) {
            final int key = i;
            executor.submit(() -> {
                Integer value = map.get("key" + key);
                System.out.println("读取: key" + key + " = " + value);
            });
        }
        
        // 原子更新
        map.compute("key1", (k, v) -> v == null ? 1 : v + 1);
        
        executor.shutdown();
    }
}

4.2 BlockingQueue 阻塞队列

public class BlockingQueueExample {
    
    public static void main(String[] args) throws InterruptedException {
        // 有界阻塞队列
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        
        // 生产者
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    queue.put(i); // 队列满时阻塞
                    System.out.println("生产: " + i + " 队列大小: " + queue.size());
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // 消费者
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 20; i++) {
                    Integer item = queue.take(); // 队列空时阻塞
                    System.out.println("消费: " + item + " 队列大小: " + queue.size());
                    Thread.sleep(150);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        producer.start();
        consumer.start();
        
        producer.join();
        consumer.join();
    }
}

4.3 CopyOnWriteArrayList

public class CopyOnWriteExample {
    
    public static void main(String[] args) {
        // 写时复制列表,适合读多写少的场景
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        
        // 初始化数据
        list.add("A");
        list.add("B");
        list.add("C");
        
        // 多个线程同时读取
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                // 读取操作不需要加锁
                for (String item : list) {
                    System.out.println(Thread.currentThread().getName() + " 读取: " + item);
                }
            }).start();
        }
        
        // 写操作(会复制整个数组)
        new Thread(() -> {
            list.add("D");
            System.out.println("添加新元素 D");
        }).start();
    }
}

五、同步工具类(Synchronizers)

5.1 CountDownLatch - 倒计时门闩

public class CountDownLatchExample {
    
    /**
     * 场景:主线程等待多个子任务完成
     */
    public static void main(String[] args) throws InterruptedException {
        int taskCount = 5;
        CountDownLatch startSignal = new CountDownLatch(1); // 开始信号
        CountDownLatch doneSignal = new CountDownLatch(taskCount); // 完成信号
        
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    System.out.println("任务" + taskId + " 等待开始...");
                    startSignal.await(); // 等待开始信号
                    
                    // 执行任务
                    System.out.println("任务" + taskId + " 开始执行");
                    Thread.sleep(1000 + taskId * 100); // 模拟任务执行时间
                    System.out.println("任务" + taskId + " 执行完成");
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    doneSignal.countDown(); // 计数减1
                }
            }).start();
        }
        
        // 主线程准备工作
        Thread.sleep(2000);
        System.out.println("所有任务准备就绪,开始执行...");
        startSignal.countDown(); // 发出开始信号
        
        doneSignal.await(); // 等待所有任务完成
        System.out.println("所有任务执行完成!");
    }
}

5.2 CyclicBarrier - 循环屏障

public class CyclicBarrierExample {
    
    /**
     * 场景:多个线程互相等待,到达屏障点后一起继续执行
     */
    public static void main(String[] args) {
        int threadCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            // 所有线程到达屏障后执行的回调
            System.out.println("所有线程已到达屏障,继续执行下一阶段");
        });
        
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    // 第一阶段工作
                    System.out.println("线程" + threadId + " 完成第一阶段工作");
                    barrier.await(); // 等待其他线程
                    
                    // 第二阶段工作
                    System.out.println("线程" + threadId + " 完成第二阶段工作");
                    barrier.await();
                    
                    // 第三阶段工作
                    System.out.println("线程" + threadId + " 完成第三阶段工作");
                    barrier.await();
                    
                    System.out.println("线程" + threadId + " 全部工作完成");
                    
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
    }
}

5.3 Semaphore - 信号量

public class SemaphoreExample {
    
    /**
     * 场景:控制同时访问特定资源的线程数量
     */
    public static void main(String[] args) {
        // 模拟停车场,只有3个车位
        Semaphore parkingLot = new Semaphore(3);
        
        // 10辆车来停车
        for (int i = 1; i <= 10; i++) {
            new Thread(new Car(parkingLot, i)).start();
            try {
                Thread.sleep(100); // 车辆陆续到达
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    static class Car implements Runnable {
        private final Semaphore semaphore;
        private final int carId;
        
        public Car(Semaphore semaphore, int carId) {
            this.semaphore = semaphore;
            this.carId = carId;
        }
        
        @Override
        public void run() {
            try {
                System.out.println("车辆" + carId + " 到达停车场,寻找车位...");
                
                // 尝试获取许可(车位)
                semaphore.acquire();
                System.out.println("车辆" + carId + " 找到车位,开始停车");
                
                // 模拟停车时间
                Thread.sleep(2000 + new Random().nextInt(3000));
                
                System.out.println("车辆" + carId + " 离开车位");
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // 释放许可(车位)
                semaphore.release();
            }
        }
    }
}

5.4 Exchanger - 交换器

public class ExchangerExample {
    
    /**
     * 场景:两个线程交换数据
     */
    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        
        // 生产者线程
        new Thread(() -> {
            try {
                String data = "生产的数据";
                System.out.println("生产者发送: " + data);
                String response = exchanger.exchange(data);
                System.out.println("生产者收到: " + response);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
        
        // 消费者线程
        new Thread(() -> {
            try {
                String data = "消费的数据";
                System.out.println("消费者发送: " + data);
                String response = exchanger.exchange(data);
                System.out.println("消费者收到: " + response);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
    }
}

六、线程池(Thread Pools)

6.1 线程池的创建和使用

public class ThreadPoolExample {
    
    public static void main(String[] args) {
        // 创建固定大小的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        
        // 创建缓存线程池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        
        // 创建定时任务线程池
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);
        
        // 创建单线程线程池
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
        
        // 提交任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            fixedThreadPool.submit(() -> {
                System.out.println(Thread.currentThread().getName() + " 执行任务" + taskId);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 定时任务
        scheduledThreadPool.scheduleAtFixedRate(() -> {
            System.out.println("定时任务执行: " + new Date());
        }, 0, 1, TimeUnit.SECONDS);
        
        // 关闭线程池
        fixedThreadPool.shutdown();
        try {
            // 等待线程池终止
            if (!fixedThreadPool.awaitTermination(60, TimeUnit.SECONDS)) {
                fixedThreadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            fixedThreadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

6.2 自定义线程池

public class CustomThreadPoolExample {
    
    public static void main(String[] args) {
        // 自定义线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, // 核心线程数
            5, // 最大线程数
            60L, // 空闲线程存活时间
            TimeUnit.SECONDS, // 时间单位
            new ArrayBlockingQueue<>(10), // 工作队列
            Executors.defaultThreadFactory(), // 线程工厂
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );
        
        // 监控线程池状态
        ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
        monitor.scheduleAtFixedRate(() -> {
            System.out.println("线程池状态: " +
                "核心线程数: " + executor.getCorePoolSize() +
                ", 活动线程数: " + executor.getActiveCount() +
                ", 任务数: " + executor.getTaskCount() +
                ", 完成数: " + executor.getCompletedTaskCount() +
                ", 队列大小: " + executor.getQueue().size());
        }, 0, 1, TimeUnit.SECONDS);
        
        // 提交任务
        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            try {
                executor.execute(() -> {
                    System.out.println("执行任务: " + taskId + " 线程: " + Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (Exception e) {
                System.out.println("任务" + taskId + " 被拒绝");
            }
        }
        
        // 关闭
        executor.shutdown();
        monitor.shutdown();
    }
}

七、Future和CompletableFuture

7.1 Future 基本使用

public class FutureExample {
    
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 提交有返回值的任务
        Future<String> future = executor.submit(() -> {
            Thread.sleep(2000); // 模拟耗时操作
            return "任务执行结果";
        });
        
        // 可以做其他事情
        System.out.println("主线程继续执行其他任务...");
        
        // 获取结果(阻塞)
        String result = future.get();
        System.out.println("获取结果: " + result);
        
        // 带超时的获取
        try {
            String result2 = future.get(1, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            System.out.println("获取结果超时");
        }
        
        executor.shutdown();
    }
}

7.2 CompletableFuture 异步编程

public class CompletableFutureExample {
    
    public static void main(String[] args) throws Exception {
        // 简单的异步任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("执行任务1...");
            return "Hello";
        });
        
        // 任务链
        CompletableFuture<String> future2 = future1.thenApplyAsync(result -> {
            System.out.println("任务1的结果: " + result);
            return result + " World";
        });
        
        // 组合多个任务
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "Java");
        CompletableFuture<String> future4 = CompletableFuture.supplyAsync(() -> "Programming");
        
        CompletableFuture<String> combinedFuture = future3.thenCombine(future4, (r1, r2) -> r1 + " " + r2);
        
        // 所有任务完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future2, combinedFuture);
        
        // 获取结果
        System.out.println("任务2结果: " + future2.get());
        System.out.println("组合任务结果: " + combinedFuture.get());
        
        // 异常处理
        CompletableFuture<String> safeFuture = CompletableFuture.supplyAsync(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("随机异常");
            }
            return "Success";
        }).exceptionally(ex -> {
            System.out.println("处理异常: " + ex.getMessage());
            return "Default Value";
        });
        
        System.out.println("安全任务结果: " + safeFuture.get());
    }
}

八、实战案例:高性能缓存系统

public class ConcurrentCache<K, V> {
    private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<K, Long> expiryTimes = new ConcurrentHashMap<>();
    private final ScheduledExecutorService cleaner = Executors.newScheduledThreadPool(1);
    
    public ConcurrentCache() {
        // 定期清理过期缓存
        cleaner.scheduleAtFixedRate(this::cleanup, 1, 1, TimeUnit.MINUTES);
    }
    
    public void put(K key, V value, long ttl, TimeUnit unit) {
        cache.put(key, value);
        long expiryTime = System.currentTimeMillis() + unit.toMillis(ttl);
        expiryTimes.put(key, expiryTime);
    }
    
    public V get(K key) {
        Long expiryTime = expiryTimes.get(key);
        if (expiryTime != null && System.currentTimeMillis() > expiryTime) {
            // 已过期,移除
            cache.remove(key);
            expiryTimes.remove(key);
            return null;
        }
        return cache.get(key);
    }
    
    public V computeIfAbsent(K key, Function<K, V> mappingFunction, long ttl, TimeUnit unit) {
        return cache.computeIfAbsent(key, k -> {
            V value = mappingFunction.apply(k);
            long expiryTime = System.currentTimeMillis() + unit.toMillis(ttl);
            expiryTimes.put(k, expiryTime);
            return value;
        });
    }
    
    private void cleanup() {
        long now = System.currentTimeMillis();
        expiryTimes.entrySet().removeIf(entry -> {
            if (now > entry.getValue()) {
                cache.remove(entry.getKey());
                return true;
            }
            return false;
        });
    }
    
    public void shutdown() {
        cleaner.shutdown();
    }
}

九、最佳实践和注意事项

9.1 性能优化建议

  1. 选择合适的并发工具:根据场景选择最合适的工具
  2. 避免锁竞争:减小锁的粒度,缩短锁持有时间
  3. 使用读写锁:读多写少的场景使用ReadWriteLock
  4. 合理配置线程池:根据任务类型配置合适的线程池参数
  5. 避免死锁:按固定顺序获取锁

9.2 常见陷阱

public class CommonMistakes {
    
    // 错误1:在锁内调用外部方法(可能导致死锁)
    public void wrongLockUsage() {
        synchronized (this) {
            externalMethod(); // 危险!可能获取其他锁
        }
    }
    
    // 错误2:忘记在finally中释放锁
    public void wrongLockRelease() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            // 业务逻辑
            throw new RuntimeException("异常发生!"); // 锁无法释放
        } finally {
            // 应该在这里释放锁
            // lock.unlock();
        }
    }
    
    // 错误3:误用ConcurrentHashMap的原子性
    public void wrongConcurrentMapUsage(ConcurrentHashMap<String, Integer> map) {
        // 这不是原子操作!
        if (!map.containsKey("key")) {
            map.put("key", 1); // 可能被其他线程打断
        }
        
        // 正确的做法:使用原子方法
        map.putIfAbsent("key", 1);
    }
    
    private void externalMethod() {
        // 可能获取其他锁
    }
}

十、总结

JUC包为Java并发编程提供了强大而灵活的工具集。掌握这些工具不仅能够编写出高性能的并发程序,还能更好地理解并发编程的本质。在实际开发中,要根据具体场景选择合适的工具,并遵循最佳实践,才能充分发挥JUC的强大能力。


希望这篇详细的JUC指南对您有帮助!如果有任何问题,欢迎在评论区讨论。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

友情链接更多精彩内容