基于Rabbitmq实现延迟队列
延迟队列的使用场景
- 淘宝订单业务:下单后30min之内没有付款,就自动取消订单。
- 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。
- 关闭空闲连接:服务器中有很多客户端的连接,空闲一段时间之后需要关闭之。
- 缓存:缓存中的对象,超过了空闲时间,从缓存中移出。
- 任务超时处理:在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
- 失败重试机制:业务操作失败后,间隔一定的时间进行失败重试。
这类业务的特点就是:延迟工作、失败重试。一种比较笨的方式是使用后台线程遍历所有对象,挨个检查。这种方法虽然简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,过小则存在效率问题,而且做不到按超时的时间顺序处理。
本地延迟队列 DelayQueue
DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。
DelayQueue = BlockingQueue + PriorityQueue + Delayed
DelayQueue的关键元素BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。(注意:不能将null元素放置到这种队列)
但是我们知道,利用DelayQueue实现的是一个单机的、JVM内存中的延迟队列,并没有集群的支持,而且无法满足在对业务系统泵机的时、消息消费异常的时候做相应的逻辑处理。
基于分布式消息队列RabbitMQ实现延迟队列
RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能:
Per-Queue Message TTL RabbitMQ可以对消息和队列设置TTL(过期时间)。
RabbitMQ针对队列中的消息过期时间(Time To Live, TTL)有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead message,消费者将无法再收到该消息。
Dead Letter Exchanges 死信消息
利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信有以下几种情况:
- 消息被拒绝(basic.reject or basic.nack)并且requeue=false
- 消息TTL过期
- 队列达到最大长度
DLX同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当队列中有死信消息时,RabbitMQ就会自动的将死信消息重新发布到设置的Exchange中去,进而被路由到另一个队列,publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。
结合以上两个特性,就可以模拟出延迟消息的功能.
流程图
源代码
package hbec.app.stock.rabbitmq.utils;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
/**
* @Description <strong>基于RabbitMQ实现的分布式延迟重试队列</strong>
*
* <ul>
* <li>delayExchangeName : 交换器名称</li>
* <li>delayQueueName : 延迟队列名称</li>
* <li>delayRoutingKeyName : 路由器名称</li>
* <li>perDelayQueueMessageTTL : 延迟队列中message的默认ttl</li>
* </ul>
* 通过{@link RabbitMQDelayQueue#put(byte[], long, TimeUnit)}首次进入延迟队列的消息,
* 其ttl = min(message ttl, per queue message ttl),
* 消息被Reject/nack之后变成死信消息,其自带message ttl失效,
* 以后将按照{@link #perDelayQueueMessageTTL}指定的延迟时间投递给经由{@link RabbitMQDelayQueue#consumerRegister}注册的消费者,直到消息被Ack.
*
* @author roc roc.fly@qq.com
* @date Dec 9, 2016 3:29:39 PM
*/
public class RabbitMQDelayQueue {
private static Logger LOGGER = LoggerFactory.getLogger(RabbitMQDelayQueue.class);
private static final String POSTFIX_TASK = "_task";
// direct类型 交换器
public static final String EXCHANGE_TYPE_DIRECT = "direct";
private Connection connection;
//注册消费者
private ConsumerRegister consumerRegister;
//任务队列配置
private String taskExchangeName;
private String taskQueueName;
private String taskRoutingKeyName;
//延迟队列配置
private String delayExchangeName;
private String delayQueueName;
private String delayRoutingKeyName;
//延迟队列中的消息ttl
private long perDelayQueueMessageTTL;
public RabbitMQDelayQueue(Connection connection, ConsumerRegister consumerRegister, String delayExchangeName, String delayQueueName, String delayRoutingKeyName, long perDelayQueueMessageTTL) throws IOException {
this.connection = connection;
this.consumerRegister = consumerRegister;
this.delayExchangeName = delayExchangeName;
this.delayQueueName = delayQueueName;
this.delayRoutingKeyName = delayRoutingKeyName;
this.perDelayQueueMessageTTL = perDelayQueueMessageTTL;
this.taskExchangeName = delayExchangeName + POSTFIX_TASK;
this.taskQueueName = delayQueueName + POSTFIX_TASK;
this.taskRoutingKeyName = delayRoutingKeyName + POSTFIX_TASK;
init();
registerConsumer();
}
/**
*
* @Description 注册消费者
* @author roc roc.fly@qq.com
* @date Dec 29, 2016 1:36:25 PM
*/
public interface ConsumerRegister {
public Consumer register(Channel channel) throws IOException;
}
/**
* 注册带有ttl的queue和对应的任务队列
*
* @throws IOException
* @author roc
*/
private void init() throws IOException {
Channel channel = connection.createChannel();
channel.exchangeDeclare(taskExchangeName, EXCHANGE_TYPE_DIRECT, true);
channel.exchangeDeclare(delayExchangeName, EXCHANGE_TYPE_DIRECT, true);
// 任务队列 B
HashMap<String, Object> argumentsTask = Maps.newHashMap();
argumentsTask.put("x-dead-letter-exchange", delayExchangeName);
argumentsTask.put("x-dead-letter-routing-key", delayRoutingKeyName);
channel.queueDeclare(taskQueueName, true, false, false, argumentsTask);
channel.queueBind(taskQueueName, taskExchangeName, taskRoutingKeyName);
// 延迟队列 A
HashMap<String, Object> argumentsDelay = Maps.newHashMap();
argumentsDelay.put("x-dead-letter-exchange", taskExchangeName);
argumentsDelay.put("x-dead-letter-routing-key", taskRoutingKeyName);
argumentsDelay.put("x-message-ttl", perDelayQueueMessageTTL);
channel.queueDeclare(delayQueueName, true, false, false, argumentsDelay);
channel.queueBind(delayQueueName, delayExchangeName, delayRoutingKeyName);
channel.close();
}
/**
* 注册消费者
* @throws IOException
* @author roc
*/
private void registerConsumer() throws IOException {
LOGGER.info("register consumer ->{}", this);
Channel channel = connection.createChannel();
Consumer consumer = consumerRegister.register(channel);
channel.basicConsume(taskQueueName, false, consumer);
LOGGER.info("register consumer ->{} success", this);
}
/**
* 消息入队
*
* @param body 消息内容
* @param timeout 超时时间
* @param unit 超时时间单位
* @throws IOException
* @author roc
*/
public void put(byte[] body, long timeout, TimeUnit unit) throws IOException {
Preconditions.checkNotNull(body);
Preconditions.checkArgument(timeout >= 0);
Preconditions.checkNotNull(unit);
LOGGER.info("put element to delay queue ->{}", body.hashCode());
Channel channel = null;
try {
channel = connection.createChannel();
// deliveryMode=2 标识任务的持久性
long millis = unit.toMillis(timeout);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(millis)).deliveryMode(2).build();
channel.basicPublish(delayExchangeName, delayRoutingKeyName, properties, body);
LOGGER.info("put element to delay queue success");
} finally {
if (null != channel)
channel.close();
}
}
public static class Builder {
private Connection connection;
private ConsumerRegister consumerRegister;
private String delayExchangeName;
private String delayQueueName;
private String delayRoutingKeyName;
private long perDelayQueueMessageTTL;
public Builder setConnection(Connection connection) {
this.connection = connection;
return this;
}
public Builder setDelayExchangeName(String delayExchangeName) {
this.delayExchangeName = delayExchangeName;
return this;
}
public Builder setDelayQueueName(String delayQueueName) {
this.delayQueueName = delayQueueName;
return this;
}
public Builder setDelayRoutingKeyName(String delayRoutingKeyName) {
this.delayRoutingKeyName = delayRoutingKeyName;
return this;
}
public Builder setConsumerRegister(ConsumerRegister consumerRegister) {
this.consumerRegister = consumerRegister;
return this;
}
public Builder setPerDelayQueueMessageTTL(long timeout, TimeUnit unit) {
this.perDelayQueueMessageTTL = unit.toMillis(timeout);;
return this;
}
public RabbitMQDelayQueue build() throws IOException {
Preconditions.checkNotNull(connection);
Preconditions.checkNotNull(delayExchangeName);
Preconditions.checkNotNull(delayQueueName);
Preconditions.checkNotNull(delayRoutingKeyName);
Preconditions.checkNotNull(consumerRegister);
return new RabbitMQDelayQueue(connection, consumerRegister, delayExchangeName, delayQueueName, delayRoutingKeyName, perDelayQueueMessageTTL);
}
}
}
测试代码
package hbec.app.stock.rabbitmq.utils;
import hbec.app.stock.rabbitmq.utils.RabbitMQDelayQueue.ConsumerRegister;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
* 测试demo
*
*/
public class RabbitMQDelayQueueTest {
public static void main(String[] args) throws IOException {
delayQueue();
}
public static void delayQueue() throws IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
Address address = new Address("10.0.30.67", 56720);
Connection connection = factory.newConnection(new Address[] { address });
RabbitMQDelayQueue delayQueue = new RabbitMQDelayQueue.Builder().setConnection(connection).setPerDelayQueueMessageTTL(15, TimeUnit.SECONDS).setDelayExchangeName("delay_exchange_roc").setDelayQueueName("delay_queue_roc").setDelayRoutingKeyName("delay_routing_key_roc").setConsumerRegister(new ConsumerRegister() {
@Override
public Consumer register(Channel channel) throws IOException {
return new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String exchange = envelope.getExchange();
String routingKey = envelope.getRoutingKey();
// TODO do something
String content = new String(body, Charset.forName("utf-8"));
System.out.println("receive message --- > " + content);
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
System.out.println("xDeath--- > " + xDeath);
if (xDeath != null && !xDeath.isEmpty()) {
Map<String, Object> entrys = xDeath.get(0);
}
}
// 消息拒收
// if(do something) 消息重新入队
getChannel().basicReject(deliveryTag, false);
// else 消息应答
// getChannel().basicAck(deliveryTag, false);
}
};
}
}).build();
delayQueue.put("{\"name\" : \"i am roc!!\"}\"".getBytes("UTF-8"), 3, TimeUnit.SECONDS);
}
}