rabbitmq延时队列

延时队列
在实际业务场景中可能会用到延时消息发送,例如支付场景,准时支付、超过未支付将执行不同的方案,其中超时未支付可以看做一个延时消息。
RabbitMQ本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。利用RabbitMQ的这种特性,应该可以实现很多现实中的业务,我们可以发挥想象。
rabbitmq-delayed-message-exchange,我们也可以使用插件来实现延时队列。利用TTL、DLX实现的延时队列可以中断,使用插件实现的延时队列是否可以中断?.该功能是已插件的形式实现的。在做实验时确保有安装过该插件
查询插件列表:

rabbitmq-plugins list

image.png

如果没有安装插件
下载参考:https://blog.csdn.net/youjin/article/details/82586888

参考:https://blog.csdn.net/azhegps/article/details/53815117
https://www.cnblogs.com/haoxinyue/p/6613706.html

使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现上述需求

过期方式有两种:
1.队列过期时间,发送该队列(queue1)的消息都是统一过期时间,过期后会进入死信队列(queue2)
2.消息过期时间,如果依次发送了10s,5s,3s的三条消息到该队列(queue1)的消息,队列会在10s过期才将消息从
queue1转发到queue2死信队列中(在队列queue1没有消费的情况下),监听队列queue2的消费者,这个时候才会消费到信息,
除非发送的消息达到queue1的
消费者消费的时候,也会在10s后才会消费这个过期消息,只有当过期的消息到了队列的顶端(队首),
才会被真正的丢弃或者进入死信队列(queue2)
如果发送的是5s,10s,3s的消息,则5s先消费,再消费10s和3s消息

实例:
1.先创建exchange:test.message
并绑定队列:my.timeout.message
routing: my.routing.key

2.创建队列:my.timeout.message
x-dead-letter-exchange: timeout-exchange
x-dead-letter-routing-key: test.timeout.message
durable: true

image.png

3.创建消费过期转发exchange:timeout-exchange,类型fanout方式

并绑定队列:my.timeout.message
4.创建队列:my.timeout.message


image.png

5.发送者

import java.io.IOException;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import com.rabbitmq.client.AMQP.BasicProperties;  
import com.rabbitmq.client.AMQP.BasicProperties.Builder;  
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
  
/**
 * https://www.cnblogs.com/haoxinyue/p/6613706.html
 * 发送消息类 
 * @author
 * 
 */  
public class Send{
    private static final String EXCHANGE_NAME = "test.message";

    private final static String QUEUE_NAME = "my.timeout.message";

    private static final String ROUTKEY="my.routing.key";
    /** 
     * 在topic转发器的基础上练习延时转发,发送消息时指定消息过期时间 
     * 消息已发送到queue上,但未有consumer进行消费 
     * @param object 消息主体 
     * @throws IOException 
     */  
    public static void sendAToB(String object) throws Exception{
        Connection conn=MqManager.newConnection();  
        Channel channel=conn.createChannel();  
        //声明headers转发器  
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS,true);
        //定义headers存储的键值对  
        Map<String, Object> headers=new HashMap<String, Object>();  
        headers.put("key", "123456");  
        headers.put("token", "654321");
      Map<String, Object> args = new HashMap<String, Object>();
       // args.put("x-message-ttl",12000); //消息过去
        args.put("x-dead-letter-exchange", "timeout-exchange");//过期消息转向路由
        args.put("x-dead-letter-routing-key", "test.timeout.message");//过期消息转向路由相匹配routingkey
       channel.queueDeclare(QUEUE_NAME, true, false, false, args);
        //把键值对放在properties  
        Builder properties=new BasicProperties.Builder();  
        properties.headers(headers);  
        properties.deliveryMode(2);//持久化
        //指定消息过期时间为12秒,队列上也可以指定消息的过期时间,两者以较小时间为准  
        properties.expiration("12000");//(12000)延时12秒,不会及时删除(在consuemr消费时判定是否过期,因为每条消息的过期时间不一致,删除过期消息就需要扫描整个队列)
        channel.basicPublish(EXCHANGE_NAME,ROUTKEY ,properties.build(),object.getBytes());
        System.out.println("Send '"+object+"'"+",时间:"+DateUtils.dataToStr(new Date()));
        channel.close();  
        conn.close();  
    }
    public static void main(String[] args) throws Exception {  
        sendAToB("我开始测试延时消息了3!");
    }  
} 

6.延期消费者

import java.io.IOException;
import java.util.Date;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.Consumer;  
import com.rabbitmq.client.DefaultConsumer;  
import com.rabbitmq.client.Envelope;  
  
/** 
 * 延时消息处理类 
 * @author  
 * <p>
 *     即使一个消息比在同一队列中的其他消息提前过期,提前过期的也不会优先进入死信队列,它们还是按照入库的顺序让消费者消费。
 *     如果第一进去的消息过期时间是1小时,那么死信队列的消费者也许等1小时才能收到第一个消息。
 *     参考官方文档发现“Only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered).”只有当过期的消息到了队列的顶端(队首),
 *     才会被真正的丢弃或者进入死信队列。所以在考虑使用RabbitMQ来实现延迟任务队列的时候,需要确保业务上每个任务的延迟时间是一致的。
 *    如果遇到不同的任务类型需要不同的延时的话,需要为每一种不同延迟时间的消息建立单独的消息队列
 *
 *
 * </p>
 */

public class DelayRecv {
    private static final String EXCHANGE_NAME = "timeout-exchange";
    private final static String QUEUE_NAME = "my.timeout.message2";
    private static final String ROUTKEY="test.timeout.message";
    /** 
     * 创建队列并声明consumer用于处理转发过来的延时消息 
     * @throws Exception 
     */  
    public static void delayRecv() throws Exception{  
        Connection conn=MqManager.newConnection();  
        final  Channel channel=conn.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true);
        String queueName=channel.queueDeclare().getQueue();
        System.out.println("队列名称"+queueName);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTKEY);
        Consumer consumer=new DefaultConsumer(channel){  
            @Override  
            public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
                String mes= new String(body);
                System.out.println(envelope.getRoutingKey()+":delay Received :'"+mes+"' done,接受时间:"+DateUtils.dataToStr(new Date()));
               // channel.basicAck(envelope.getDeliveryTag(), true);
//                if(mes.indexOf("8")>0){
//                    System.out.println("测试回滚");
//                    channel.basicNack(envelope.getDeliveryTag(),true, true);
//                    System.out.println("测试回滚完成");
//                }
            }  
        };
        System.out.println("执行开始 ");
        //关闭自动应答机制,默认开启;这时候需要手动进行应该  
      channel.basicConsume(QUEUE_NAME, true, consumer);

        System.out.println("执行完了");
    }  
      
    public static void main(String[] args) throws Exception {  
        delayRecv();  
    }  
  
} 
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 */
public class MqManager {

    public static Connection newConnection()throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");
        // 获取到连接以及mq通道
        Connection connection = factory.newConnection();
        return connection;
    }
}

sprintboot实现方式请见:https://www.jianshu.com/p/e75dab831d95

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354