一.BlockingQueue
在Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景。
BlockingQueue 的核心方法
1.放入数据
add(E): boolean
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
将指定的元素插入到此队列中(如果立即可行且不会违反容量限制),在成功时返回 true,如果当前没有可用空间,则抛出 IllegalStateException。
offer(E e): boolean
offer(E e, long timeout, TimeUnit unit): boolean
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
//方法内部通过 putIndex 索引直接将 元素添加到数组 items
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;// 当putIndex 等于数组长度时,将 putIndex 重置为 0
count++;
notEmpty.signal();//唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素
}
将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量),在成功时返回 true,如果此队列已满,则返回 false。当使用有容量限制的队列时,此方法通常要优于 add 方法,后者可能无法插入元素,而只是抛出一个异常。
put(E e): boolean
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//这个也是获得锁,但 是和lock的区别是,这个方法优先允许在等待时由其他线程调 用等待线程的 interrupt 方法来中断等待直接返回。而 lock 方法是尝试获得锁成功后才响应中断
try {
while (count == items.length)
notFull.await();//队列满了的情况下,当前 线程将会被 notFull 条件对象挂起加到等待队列中
enqueue(e);
} finally {
lock.unlock();
}
}
将指定元素插入到此队列的尾部,如有必要,则等待空间变得可用。
2.获取数据
poll() 取走BlockingQueue里排在首位的对象
poll(long timeout, TimeUnit unit)同poll ,限制了超时时间
take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
JUC提供的阻塞队列
队列 | 介绍 |
---|---|
ArrayBlockingQueue | 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则 对元素进行排序。 |
LinkedBlockingQueue | 链表实现的有界阻塞队列, 此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行 排序 |
PriorityBlockingQueue | 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序 升序排列。也可以自定义类实现 compareTo()方法来指定元素 排序规则,或者初始化 PriorityBlockingQueue 时,指定构造 参数 Comparator 来对元素进行排序。 |
DelayQueue | 优先级队列实现的无界阻塞队列 |
SynchronousQueue | 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操 作,否则不能继续添加元素。 |
LinkedTransferQueue | 链表实现的无界阻塞队列 |
LinkedBlockingDeque | 链表实现的双向阻塞队列 |
使用案例
生产者消费者的实际使用
用户注册的时候,在注册成功以后发放积分。
主要考虑两方面的问题:
1.性能。注册时需要创建用户和发放积分,假设创建需要1s,发放积分需要1s,注册过程就会大于2s
2.耦合。添加用户和增加积分,可以认为是两个领域,也就是说,增加积分并不是注册必须要具备的功能,但是一旦增加积分这个逻辑出现异常,就会导致注册失败。
public class BlockingQueueTest {
//用线程池是为了熟悉线程池,无伤大雅
private final ExecutorService addIntegral =
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
private final ExecutorService createUser =
new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
private ArrayBlockingQueue queue = new ArrayBlockingQueue(10);
private volatile boolean isRunning = true;
{
init();
}
private class User {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
public boolean register() {
createUser.execute(()->{
while (true){
User user = new User();
user.setName("dage");
addUser(user);
// sendPoints(user);
queue.add(user);
}
});
return true;
}
public void addUser(User user) {
System.out.println("添加用户:" + user.getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void sendPoints(User user) {
System.out.println("发送积分给指定用户:" + user);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void init() {
addIntegral.execute(() -> {
while (isRunning) {
try {
User user = (User) queue.take();
sendPoints(user);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
public static void main(String[] args) {
new BlockingQueueTest().register();
}
}
二.JUC中的原子操作类
多线程下可使用的线程安全的类
原子性这个概念,在多线程编程里是一个老生常谈的问题。 所谓的原子性表示一个或者多个操作,要么全部执行完,要么一个也不执行。不能出现成功一部分失败一部分的情况。
多线程里面,要实现原子性,有几 种方法,其中一种就是加 Synchronized 同步锁。而从 JDK1.5 开始,在 J.U.C 包中提供了 Atomic 包,提供了对于常用数据结构的原子操作。它提供了简单、高效、以及线程安全的更新一个变量的方式。
1. 原子更新基本类型
AtomicBoolean、AtomicInteger、AtomicLong
2. 原子更新数组
AtomicIntegerArray 、 AtomicLongArray 、AtomicReferenceArray
3. 原子更新引用
AtomicReference 、 AtomicReferenceFieldUpdater 、AtomicMarkableReference(更新带有标记位的引用类 型)
4. 原子更新字段
AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、 AtomicStampedReference
AtomicInteger 分析
贯彻大部分AtomicInteger 中方法的变量valueOffset,通过 unsafe.objectFieldOffset() 获取当前 Value 这个变量在内存中的偏移量。后续多个方法中都通过比较valueOffset 和expect 值,然后更新AtomicInteger 的值。
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
private AtomicInteger integer= new AtomicInteger(0);
public void atomicIntegerTest(){
while (true){
new Thread(()->{
System.out.println(integer.getAndDecrement());
}).start();
new Thread(()->{
System.out.println(integer.getAndDecrement());
}).start();
new Thread(()->{
System.out.println(integer.getAndDecrement());
}).start();
}
}
public static void main(String[] args) {
new BlockingQueueTest().atomicIntegerTest();;
}