java.util.concurrent.DelayQueue采用了Leader-Follower模式,结合源码理解下这种模式的编码实现。
先总结一下Leader-Follower模式
举个例子来理解这些情况,拿饭店员工来对照理解。
- 单Reactor多线程模式
饭店的员工一般都是分角色的,比如接待员、服务员、厨师等等。- 假如有一个叫做A的人,固定他作为饭店接待员,来客人了就分给客人一个座位号,然后交给其他服务员,比如B进行后续处理。
- B会根据座位号为客人引路,为客人点菜等等。
- 如果把A、B比作两个线程,客人比作任务,任务由A处理,到交接给B处理,有一次线程上下文切换。
- Leader-Follower模式
这次饭店不分角色了,每个人都是接待员和服务员,统称为员工。- 每次只能有一个员工在门口等待,比如A先在门口等待,其他员工在屋里歇着。
- 来客人了的话,A会叫一个其他员工,比如B来门口接替自己。
- 然后A开始为客人服务,比如分配座位号,引路,点菜等全流程服务。
- 拿线程来说的话,就是接受任务,处理任务都是由线程A负责,没有线程上下文切换。
- DelayQueue的Leader-Follower模式
这次饭店也不分角色,都是员工,但是改变了经营策略,每个客人必须预约吃饭时间,预约采用APP预约。因为加入了延时,逻辑变得复杂了一些。- 每次还是只能有一个员工在门口等待,比如A先在门口等待,A看了眼预约登记表,发现离预约最早到店的时间还有30分钟,A就什么都不干了,先休息30分钟。
- 其他员工还是先在屋里歇着,但是因为采用APP预约,客人约几点都有可能,如果此时有客人约的是10分钟后到店,因为A要30分钟后才能醒来干活,所以如果这位客人来了,门口就没有人接待了。
- 对于这个问题,饭店的软件系统在监听到最早到店时间变了的话,会再叫一个员工来门口等待,此员工可能是新员工B,也可能是叫醒了之前在门口休息的员工A。我们叫这位新员工X。
- 如果新员工X发现最早到店时间是现在,或者客人已经来了,就会叫一个员工C来门口接替自己,并立即开始为客人提供全流程服务。
- 如果新员工X发现最早到店时间是10分钟后,新员工X就像A之前一样,什么都不干了,先休息10分钟。
- 如果最早到店时间没有变化,还是30分钟后,软件系统不会叫人,其他员工看到A在门口等待,自己可以安心的在屋里歇着,等着A叫人替换他。
- 员工A在30分钟后醒来,客人也到了,A会叫一个同事比如B接替自己,而A为客人提供全流程服务。
DelayQueue相关源码
-
属性定义
//锁 private final transient ReentrantLock lock = new ReentrantLock(); //优先级队列,最小堆 private final PriorityQueue<E> q = new PriorityQueue<E>(); //leader线程 private Thread leader = null; //监视器 private final Condition available = lock.newCondition();
offer
相当于饭店的软件系统,提供预约服务,并监听最早到店时间的变化。
```java
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//当前线程执行q.offer,向最小堆中添加延迟任务
q.offer(e);
//如果这个延迟任务e,刚好因为set的延迟时间最短,
//成为了堆顶元素,也就是即将最先出队的元素
if (q.peek() == e) {
//清空leader标识,唤醒一个follower,使其能够成为leader。
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
```
-
take
相当于饭店的员工,为客人提供全流程服务。public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; // don't retain ref while waiting //从这里开始才是Leader-Follower模式 if (leader != null) //有leader了,所以follwer无限等待 available.await(); else { //还没有leader Thread thisThread = Thread.currentThread(); //当前线程成为leader leader = thisThread; try { //leader只等待一个过期周期就醒来 available.awaitNanos(delay); } finally { if (leader == thisThread) //释放leader身份 leader = null; } } } } } finally { if (leader == null && q.peek() != null) //没有leader 并且 队列还有元素的话 唤醒一个做leader available.signal(); lock.unlock(); } }