DelayQueue是什么
DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。
DelayQueue能做什么
交易结束后,需要通知业务方交易结果,如果第一次通知失败,则需要延迟3分钟发起第二次通知
具体实现
定义元素类,作为队列的元素
DelayQueue只能添加(offer/put/add)实现了Delayed接口的对象,意思是说我们不能想往DelayQueue里添加什么就添加什么,不能添加int、也不能添加String进去,必须添加我们自己的实现了Delayed接口的类的对象,来代码:
@Data
public class DelaySendInfo implements Delayed {
private String name;
// 延迟时间
private long delayTime;
// 过期时间
private long expireTime;
public DelaySendInfo(String name, long delayTime) {
this.name = name;
this.expireTime = System.currentTimeMillis() + delayTime;
}
/**
* 用于取DelayQueue里面的元素时判断是否到了延时时间,否则不予获取,是则获取
*当返回值小于等于0,则表示时间过期,该元素会被队列取出
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
long diff = unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return diff;
}
/**
* 该方法用于确认delayQueue中的元素顺序,值越大越靠后
* 返回值等于0,表示该元素和比较元素相等
* 返回值小于0,表示该元素小于比较元素
* 返回值大于0,表示该元素大于比较元素
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.expireTime - ((DelaySendInfo) o).getDelayTime());
}
}
写一个线程模拟延迟发送通知
public class ProdecerThread extends Thread{
DelayQueue<DelaySendInfo>delayQueue;
public ProdecerThread(DelayQueue<DelaySendInfo>delayQueue){
this.delayQueue=delayQueue;
}
@Override
public void run() {
// 模拟延迟发送通知,第一次延迟1秒发送,第二次延迟2秒发送,第三次延迟3秒发送
for(int i=1;i<4;i++){
DelaySendInfo delaySendInfo=new DelaySendInfo(i+"",i*1000,"SUCCESS");
delayQueue.offer(delaySendInfo);
}
}
}
写一个线程模拟不停从队列中读取要发送的消息
@Slf4j
public class ConsumerThread extends Thread{
private DelayQueue<DelaySendInfo>delayQueue;
public ConsumerThread(DelayQueue<DelaySendInfo>delayQueue){
this.delayQueue=delayQueue;
}
// 模拟生产中的线程,不停去delayQueue中取定时发送的元素
@Override
public void run() {
while(true){
try {
DelaySendInfo delaySendInfo= delayQueue.take();
log.info("{}取出元素:{}",new Date(),delaySendInfo);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
执行
public static void main(String[] args) {
DelayQueue<DelaySendInfo>delayQueue=new DelayQueue<>();
new ConsumerThread(delayQueue).start();
new ProdecerThread(delayQueue).start();
}
结果打印
23:04:46.451 [Thread-0] INFO com.epay.send.ConsumerThread - Sun Mar 27 23:04:46 CST 2022取出元素:DelaySendInfo(name=1, delayTime=0, expireTime=1648393486435, msg=SUCCESS)
23:04:47.450 [Thread-0] INFO com.epay.send.ConsumerThread - Sun Mar 27 23:04:47 CST 2022取出元素:DelaySendInfo(name=2, delayTime=0, expireTime=1648393487435, msg=SUCCESS)
23:04:48.450 [Thread-0] INFO com.epay.send.ConsumerThread - Sun Mar 27 23:04:48 CST 2022取出元素:DelaySendInfo(name=3, delayTime=0, expireTime=1648393488435, msg=SUCCESS)