DelayQueue

DelayQueue是BlockingQueue的一种,所以它是线程安全的,DelayQueue的特点就是插入Queue中的数据可以按照自定义的delay时间进行排序。只有delay时间小于0的元素才能够被取出。

DelayQueue是一个没有边界BlockingQueue实现,加入其中的元素必需实现Delayed接口。当生产者线程调用put之类的方法加入元素时,会触发Delayed接口中的compareTo方法进行排序,也就是说队列中元素的顺序是按到期时间排序的,而非它们进入队列的顺序。排在队列头部的元素是最早到期的,越往后到期时间赿晚。

package com.conrrentcy.juc;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedQueneTest {
    public static void main(String[] args) throws InterruptedException {
        Item item1 = new Item("item1", 5, TimeUnit.SECONDS);
        Item item2 = new Item("item2", 10, TimeUnit.SECONDS);
        Item item3 = new Item("item3", 15, TimeUnit.SECONDS);
        DelayQueue<Item> queue = new DelayQueue<>();
        queue.put(item1);
        queue.put(item2);
        queue.put(item3);
        System.out.println("begin time:"
                + LocalDateTime.now().format(
                        DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        for (int i = 0; i < 3; i++) {
            Item take = queue.take();
            System.out
                    .format("name:{%s}, time:{%s}\n", take.name, LocalDateTime
                            .now().format(DateTimeFormatter.ISO_DATE_TIME));
        }
    }

}

class Item implements Delayed {
    /* 触发时间 */
    private long time;
    String name;

    public Item(String name, long time, TimeUnit unit) {
        this.name = name;
        this.time = System.currentTimeMillis()
                + (time > 0 ? unit.toMillis(time) : 0);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return time - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        Item item = (Item) o;
        long diff = this.time - item.time;
        if (diff <= 0) {// 改成>=会造成问题
            return -1;
        } else {
            return 1;
        }
    }

    @Override
    public String toString() {
        return "Item{" + "time=" + time + ", name='" + name + '\'' + '}';
    }
}

先看一下DelayQueue的定义:

public interface Delayed extends Comparable<Delayed> {
 
    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

由Delayed定义可以得知,队列元素需要实现getDelay(TimeUnit unit)方法和compareTo(Delayed o)方法, getDelay定义了剩余到期时间,compareTo方法定义了元素排序规则,注意,元素的排序规则影响了元素的获取顺序,为什么这样设计呢?

因为DelayQueue的底层存储是一个PriorityQueue,PriorityQueue是一个可排序的Queue,其中的元素必须实现Comparable方法。而getDelay方法则用来判断排序后的元素是否可以从Queue中取出。
内部存储结构  
DelayedQuene的元素存储交由优先级队列存放。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();//元素存放

DelayedQuene的优先级队列使用的排序方式是队列元素的compareTo方法,优先级队列存放顺序是从小到大的,所以队列元素的compareTo方法影响了队列的出队顺序。
若compareTo方法定义不当,会造成延时高的元素在队头,延时低的元素无法出队。

获取队列元素

    public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();
        } finally {
            lock.unlock();
        }
    }

PriorityQueue队列peek()方法。
public E peek() { return (size == 0) ? null : (E) queue[0];}

由代码我们可以看出,获取元素时,总是判断PriorityQueue队列的队首元素是否到期,若未到期,返回null,所以compareTo()的方法实现不当的话,会造成队首元素未到期,当队列中有到期元素却获取不到的情况。因此,队列元素的compareTo方法实现需要注意。

public E take() throws InterruptedException {
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
          for (;;) {
              E first = q.peek();
              if (first == null) //没有元素,让出线程,等待java.lang.Thread.State#WAITING
                  available.await();
              else {
                  long delay = first.getDelay(NANOSECONDS);
                  if (delay <= 0) // 已到期,元素出队
                      return q.poll();
                  first = null; // don't retain ref while waiting
                  if (leader != null) 
                      available.await();// 其它线程在leader线程TIMED_WAITING期间,会进入等待状态,这样可以只有一个线程去等待到时唤醒,避免大量唤醒操作
                 else { 
                      Thread thisThread = Thread.currentThread(); 
                       leader = thisThread; 
                       try { 
                       available.awaitNanos(delay);// 等待剩余时间后,再尝试获取元素,他在等待期间,由于leader是当前线程,所以其它线程会等待。 
                    } finally { 
                     if (leader == thisThread) leader = null; 
                    } 
              } 
         }
     } 
  } finally { 
    if (leader == null && q.peek() != null) 
             available.signal();
           lock.unlock(); 
  } 
}

使用场景:

DQueue非常有用的。我们利用DQueue的延时特性,可以讲DQueue应用于以下场景:

1:缓存的设计。可以利用Dqueue保存缓存元素的有效期。使用一个线程循环的从队列中获取数据。一旦获取到数据,就说明缓存有效期到了。

2:定时任务调度。可以使用Dqueue保存需要执行的任务和任务执行的时间,一旦从DQueue中获取到了任务,就开始执行任务了。比如TimerQueue就是使用了DelayQueue来实现的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容