基于Rabbitmq实现延迟队列

转自 基于Rabbitmq实现延迟队列

基于Rabbitmq实现延迟队列

延迟队列的使用场景

  1. 淘宝订单业务:下单后30min之内没有付款,就自动取消订单。
  2. 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。
  3. 关闭空闲连接:服务器中有很多客户端的连接,空闲一段时间之后需要关闭之。
  4. 缓存:缓存中的对象,超过了空闲时间,从缓存中移出。
  5. 任务超时处理:在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。
  6. 失败重试机制:业务操作失败后,间隔一定的时间进行失败重试。

这类业务的特点就是:延迟工作、失败重试。一种比较笨的方式是使用后台线程遍历所有对象,挨个检查。这种方法虽然简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,过小则存在效率问题,而且做不到按超时的时间顺序处理。

本地延迟队列 DelayQueue

DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。

DelayQueue = BlockingQueue + PriorityQueue + Delayed

DelayQueue的关键元素BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。(注意:不能将null元素放置到这种队列)

但是我们知道,利用DelayQueue实现的是一个单机的、JVM内存中的延迟队列,并没有集群的支持,而且无法满足在对业务系统泵机的时、消息消费异常的时候做相应的逻辑处理。

基于分布式消息队列RabbitMQ实现延迟队列

RabbitMQ本身没有直接支持延迟队列功能,但是可以通过以下特性模拟出延迟队列的功能:

Per-Queue Message TTL RabbitMQ可以对消息和队列设置TTL(过期时间)。

RabbitMQ针对队列中的消息过期时间(Time To Live, TTL)有两种方法可以设置。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息进行单独设置,每条消息TTL可以不同。如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead message,消费者将无法再收到该消息。

Dead Letter Exchanges 死信消息

利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信有以下几种情况:

  1. 消息被拒绝(basic.reject or basic.nack)并且requeue=false
  2. 消息TTL过期
  3. 队列达到最大长度

DLX同一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当队列中有死信消息时,RabbitMQ就会自动的将死信消息重新发布到设置的Exchange中去,进而被路由到另一个队列,publish可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ 3.0.0以前支持的immediate参数中的向publish确认的功能。

结合以上两个特性,就可以模拟出延迟消息的功能.

流程图

image.png

源代码

package hbec.app.stock.rabbitmq.utils;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;

/**
 * @Description <strong>基于RabbitMQ实现的分布式延迟重试队列</strong>
 *
 * <ul>
 *      <li>delayExchangeName : 交换器名称</li>
 *      <li>delayQueueName : 延迟队列名称</li>
 *      <li>delayRoutingKeyName : 路由器名称</li>
 *      <li>perDelayQueueMessageTTL : 延迟队列中message的默认ttl</li>
 * </ul>
 * 通过{@link RabbitMQDelayQueue#put(byte[], long, TimeUnit)}首次进入延迟队列的消息,
 * 其ttl = min(message ttl, per queue message ttl),
 * 消息被Reject/nack之后变成死信消息,其自带message ttl失效,
 * 以后将按照{@link #perDelayQueueMessageTTL}指定的延迟时间投递给经由{@link RabbitMQDelayQueue#consumerRegister}注册的消费者,直到消息被Ack.
 *
 * @author roc roc.fly@qq.com
 * @date Dec 9, 2016 3:29:39 PM
 */
public class RabbitMQDelayQueue {

    private static Logger LOGGER = LoggerFactory.getLogger(RabbitMQDelayQueue.class);

    private static final String POSTFIX_TASK = "_task";
    // direct类型 交换器
    public static final String EXCHANGE_TYPE_DIRECT = "direct";

    private Connection connection;
    //注册消费者
    private ConsumerRegister consumerRegister;

    //任务队列配置
    private String taskExchangeName;
    private String taskQueueName;
    private String taskRoutingKeyName;

    //延迟队列配置
    private String delayExchangeName;
    private String delayQueueName;
    private String delayRoutingKeyName;

    //延迟队列中的消息ttl
    private long perDelayQueueMessageTTL;

    public RabbitMQDelayQueue(Connection connection, ConsumerRegister consumerRegister, String delayExchangeName, String delayQueueName, String delayRoutingKeyName, long perDelayQueueMessageTTL) throws IOException {
        this.connection = connection;
        this.consumerRegister = consumerRegister;
        this.delayExchangeName = delayExchangeName;
        this.delayQueueName = delayQueueName;
        this.delayRoutingKeyName = delayRoutingKeyName;
        this.perDelayQueueMessageTTL = perDelayQueueMessageTTL;
        this.taskExchangeName = delayExchangeName + POSTFIX_TASK;
        this.taskQueueName = delayQueueName + POSTFIX_TASK;
        this.taskRoutingKeyName = delayRoutingKeyName + POSTFIX_TASK;
        init();
        registerConsumer();
    }


    /**
     *
     * @Description 注册消费者
     * @author roc roc.fly@qq.com
     * @date Dec 29, 2016 1:36:25 PM
     */
    public interface ConsumerRegister {
        public Consumer register(Channel channel) throws IOException;
    }

    /**
     * 注册带有ttl的queue和对应的任务队列
     *
     * @throws IOException
     * @author roc
     */
    private void init() throws IOException {
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(taskExchangeName, EXCHANGE_TYPE_DIRECT, true);
        channel.exchangeDeclare(delayExchangeName, EXCHANGE_TYPE_DIRECT, true);

        // 任务队列 B
        HashMap<String, Object> argumentsTask = Maps.newHashMap();
        argumentsTask.put("x-dead-letter-exchange", delayExchangeName);
        argumentsTask.put("x-dead-letter-routing-key", delayRoutingKeyName);
        channel.queueDeclare(taskQueueName, true, false, false, argumentsTask);
        channel.queueBind(taskQueueName, taskExchangeName, taskRoutingKeyName);

        // 延迟队列 A
        HashMap<String, Object> argumentsDelay = Maps.newHashMap();
        argumentsDelay.put("x-dead-letter-exchange", taskExchangeName);
        argumentsDelay.put("x-dead-letter-routing-key", taskRoutingKeyName);
        argumentsDelay.put("x-message-ttl", perDelayQueueMessageTTL);
        channel.queueDeclare(delayQueueName, true, false, false, argumentsDelay);
        channel.queueBind(delayQueueName, delayExchangeName, delayRoutingKeyName);

        channel.close();
    }

    /**
     * 注册消费者
     * @throws IOException
     * @author roc
     */
    private void registerConsumer() throws IOException {
        LOGGER.info("register consumer ->{}", this);
        Channel channel = connection.createChannel();
        Consumer consumer = consumerRegister.register(channel);
        channel.basicConsume(taskQueueName, false, consumer);
        LOGGER.info("register consumer ->{} success", this);
    }

    /**
     * 消息入队
     *
     * @param body 消息内容
     * @param timeout 超时时间
     * @param unit 超时时间单位
     * @throws IOException
     * @author roc
     */
    public void put(byte[] body, long timeout, TimeUnit unit) throws IOException {

        Preconditions.checkNotNull(body);
        Preconditions.checkArgument(timeout >= 0);
        Preconditions.checkNotNull(unit);

        LOGGER.info("put element to delay queue ->{}", body.hashCode());
        Channel channel = null;
        try {
            channel = connection.createChannel();
            // deliveryMode=2 标识任务的持久性
            long millis = unit.toMillis(timeout);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(millis)).deliveryMode(2).build();
            channel.basicPublish(delayExchangeName, delayRoutingKeyName, properties, body);
            LOGGER.info("put element to delay queue success");
        } finally {
            if (null != channel)
                channel.close();
        }
    }

    public static class Builder {

        private Connection connection;
        private ConsumerRegister consumerRegister;

        private String delayExchangeName;
        private String delayQueueName;
        private String delayRoutingKeyName;

        private long perDelayQueueMessageTTL;

        public Builder setConnection(Connection connection) {
            this.connection = connection;
            return this;
        }

        public Builder setDelayExchangeName(String delayExchangeName) {
            this.delayExchangeName = delayExchangeName;
            return this;
        }

        public Builder setDelayQueueName(String delayQueueName) {
            this.delayQueueName = delayQueueName;
            return this;
        }

        public Builder setDelayRoutingKeyName(String delayRoutingKeyName) {
            this.delayRoutingKeyName = delayRoutingKeyName;
            return this;
        }

        public Builder setConsumerRegister(ConsumerRegister consumerRegister) {
            this.consumerRegister = consumerRegister;
            return this;
        }

        public Builder setPerDelayQueueMessageTTL(long timeout, TimeUnit unit) {
            this.perDelayQueueMessageTTL = unit.toMillis(timeout);;
            return this;
        }

        public RabbitMQDelayQueue build() throws IOException {
            Preconditions.checkNotNull(connection);
            Preconditions.checkNotNull(delayExchangeName);
            Preconditions.checkNotNull(delayQueueName);
            Preconditions.checkNotNull(delayRoutingKeyName);
            Preconditions.checkNotNull(consumerRegister);
            return new RabbitMQDelayQueue(connection, consumerRegister, delayExchangeName, delayQueueName, delayRoutingKeyName, perDelayQueueMessageTTL);
        }

    }

}

测试代码

package hbec.app.stock.rabbitmq.utils;

import hbec.app.stock.rabbitmq.utils.RabbitMQDelayQueue.ConsumerRegister;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 测试demo
 *
 */
public class RabbitMQDelayQueueTest {

    public static void main(String[] args) throws IOException {
        delayQueue();
    }

    public static void delayQueue() throws IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        Address address = new Address("10.0.30.67", 56720);
        Connection connection = factory.newConnection(new Address[] { address });

        RabbitMQDelayQueue delayQueue = new RabbitMQDelayQueue.Builder().setConnection(connection).setPerDelayQueueMessageTTL(15, TimeUnit.SECONDS).setDelayExchangeName("delay_exchange_roc").setDelayQueueName("delay_queue_roc").setDelayRoutingKeyName("delay_routing_key_roc").setConsumerRegister(new ConsumerRegister() {
            @Override
            public Consumer register(Channel channel) throws IOException {
                return new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                        long deliveryTag = envelope.getDeliveryTag();
                        String exchange = envelope.getExchange();
                        String routingKey = envelope.getRoutingKey();
                        // TODO do something
                        String content = new String(body, Charset.forName("utf-8"));
                        System.out.println("receive message --- > " + content);

                        Map<String, Object> headers = properties.getHeaders();
                        if (headers != null) {
                            List<Map<String, Object>> xDeath = (List<Map<String, Object>>) headers.get("x-death");
                            System.out.println("xDeath--- > " + xDeath);
                            if (xDeath != null && !xDeath.isEmpty()) {
                                Map<String, Object> entrys = xDeath.get(0);
                            }
                        }
                        // 消息拒收
                        // if(do something) 消息重新入队
                            getChannel().basicReject(deliveryTag, false);
                        // else 消息应答
                        // getChannel().basicAck(deliveryTag, false);
                    }
                };
            }
        }).build();

        delayQueue.put("{\"name\" : \"i am roc!!\"}\"".getBytes("UTF-8"), 3, TimeUnit.SECONDS);

    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容