简介
DelayQueue 是java自带的一个支持延时获取元素的无界阻塞队列,队列内部使用PriorityQueue来实现。在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素,如果元素没有达到延时时间,就阻塞当前线程。
从源码可以看出DelayQueue是一个泛型队列,它接受的类型是继承Delayed的,需要重写getDelay和compareTo方法。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
public int compareTo(T o);
该方法主要在往DelayQueue里面加入数据会执行,根据此方法的返回值判断数据应该排在哪个位置。排得越前,越先被消费
long getDelay(TimeUnit unit);
该方法主要是判断消息是否到期(是否可以被读取出来)的依据。当返回负数,说明消息已到期,此时消息就可以被读取出来了。
优点:java自带,轻量级,使用简单
缺点:存储内存中,服务器重启会造成数据丢失,可配合redis使用。当然了,如果数量庞大的,推荐使用mq消息中间件实现
代码实现
1.定义一个订单类,实现Delayed接口
/**
* 订单类,用于存放订单头信息
*
* @author lanjerry
* @date 2019/2/14 10:52
*/
public class Order implements Delayed {
/**
* 单号
*/
private String orderNo;
/**
* 状态
*/
private String status;
/**
* 创建时间
*/
private LocalDateTime createdTime;
public String getOrderNo() {
return orderNo;
}
public void setOrderNo(String orderNo) {
this.orderNo = orderNo;
}
public LocalDateTime getCreatedTime() {
return createdTime;
}
public void setCreatedTime(LocalDateTime createdTime) {
this.createdTime = createdTime;
}
public String getStatus() {
return status;
}
public void setStatus(String status) {
this.status = status;
}
/**
* 过期时间(单位为毫秒,这里表示10秒)
*/
private static final long expireTime = 10000;
@Override
public long getDelay(TimeUnit unit) {
//消息是否到期(是否可以被读取出来)判断的依据。当返回负数,说明消息已到期,此时消息就可以被读取出来了
long result = unit.convert(this.createdTime.toInstant(ZoneOffset.of("+8")).toEpochMilli() + expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return result;
}
@Override
public int compareTo(Delayed o) {
//这里根据取消时间来比较,如果取消时间小的,就会优先被队列提取出来
int result = this.getCreatedTime().compareTo(((Order) o).getCreatedTime());
return result;
}
}
2.测试compareTo
public static void main(String[] args) {
//初始化队列
DelayQueue<Order> queue = new DelayQueue<>();
LocalDateTime now = LocalDateTime.now();
Order order01 = new Order();
order01.setOrderNo("DD2019021401");
order01.setStatus("待付款");
order01.setCreatedTime(now);
queue.add(order01);
Order order02 = new Order();
order02.setOrderNo("DD2019021402");
order02.setStatus("待付款");
order02.setCreatedTime(now.minusHours(1));//时间-1小时
queue.add(order02);
}
测试结果:
3.完整代码
/**
* 模拟一个使用DelayQueue的场景
* 这里模拟的是订单下达之后,如果一直都还没支付,也就是停留在创建状态的话,就将其改成取消状态。
*
* @author lanjerry
* @date 2019/2/14 15:28
*/
public class TestDelayQueue {
/**
* 初始化延迟队列
*/
static DelayQueue<Order> queue = new DelayQueue<>();
public static void main(String[] args) throws InterruptedException {
//加入订单DD2019021401
producer("DD2019021401");
//停顿5秒,方便测试效果
Thread.sleep(5000);
//加入订单DD2019021402
producer("DD2019021402");
//执行消费
consumer();
}
/**
* 生产者
*
* @param orderNo 订单编号
* @author lanjerry
* @date 2019/2/14 15:35
*/
private static void producer(String orderNo) {
Order order = new Order();
order.setOrderNo(orderNo);
order.setStatus("待付款");
order.setCreatedTime(LocalDateTime.now());
queue.add(order);
System.out.println(String.format("时间:%s,订单:%s加入队列", order.getCreatedTime().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), order.getOrderNo()));
}
/**
* 消费者
*
* @author lanjerry
* @date 2019/2/14 15:36
*/
private static void consumer() {
try {
while (true) {
Order order = queue.take();
order.setStatus("已取消");
System.out.println(String.format("时间:%s,订单:%s已过期", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")), order.getOrderNo()));
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果:
修改为多线程版:
public static void main(String[] args) throws InterruptedException {
//创建生产者线程
Thread producerThread = new Thread(() -> {
for (int i = 1; i <= 20; i++) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producer("DD20190214" + i);
}
});
producerThread.start();
//创建消费者线程
Thread consumerThread = new Thread(() -> {
consumer();
});
consumerThread.start();
}
运行结果:
总结:
现功能时的选择很重要,如果你的系统所处理的数据量不是很大,我觉得队列和缓存很适合你,这样你可以对消息的传递更加了解,但你使用MQ,kafka的中间件时,你会发现使用起来更加轻松,但对于数据量大的系统来说,中间件是最好的选择,在这个大数据的时代,高并发,多线程,分布式会越来越重要
数据量小推荐使用:DelayQueue+redis
数据量大推荐使用:RabbitMQ