阿里大牛带你轻松实现RabbitMQ 延时消息

RabbitMQ 延时消息的实现(上)

我们在实际业务中有一些需要延时发送消息的场景,例如:

  1. 家里有一台智能热水器,需要在30分钟后启动
  2. 未付款的订单,15分钟后关闭

注意这里的场景是延时,不是定时。当然,解决了延时,定时就很简单了(定时=当前时刻+间隔时间)。

由于RabbitMQ本身不支持延时队列(延时消息),所以要通过其他方式来实现。总的来说有三种:

  1. 先存储到数据库,用定时任务扫描,登记时刻+延时时间,就是需要投递的时刻
  2. 利用RabbitMQ的死信队列(Dead Letter Queue)实现
  3. 利用rabbitmq-delayed-message-exchange插件

定时任务实现比较简单,此处略过。我们来看一下后两种方案分别怎么实现。

前提知识:我们可以在发送消息时指定单条消息的存活时间(Time To Live,TTL)。也可以设置一个队列的消息过期时间。

这两种方式,当队列中的消息到达过期时间(比如30分钟)仍未被消费,就会被发送到队列的死信交换机(Dead Letter Exchange,DLX),被再次路由,此时再次路由到的队列就被称为死信队列(Dead Letter Queue)。需要注意,死信交换机和死信交换机都是基于其用途来描述的,它们实际上也是普通的交换机和普通的队列。如果队列没有指定DLX或者无法被路由到一个DLQ,则队列中过期的消息会被直接丢弃。

因此,我们可以利用消息TTL的特性,实现消息的延时投递。

1、设置单条消息的过期时间的方法:

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()

   .deliveryMode(2) // 持久化消息

   .contentEncoding("UTF-8")

   .expiration("10000") // TTL,10秒后没有被消费则被发送到DLX

   .build();

channel.basicPublish("", "TEST_TTL_QUEUE", properties, msg.getBytes()); //此处发送到 AMQP Default 这个默认的Direct类型的交换机,并路由到TEST_TTL_QUEUE队列

2、设置队列的消息过期时间的方法:

Map<String, Object> argss = new HashMap<String, Object>();

argss.put("x-message-ttl",6000); // TTL,6秒后没有被消费则被发送到DLX

channel.queueDeclare("TEST_TTL_QUEUE", false, false, false, argss);
1

注意:如果同时设置了消息的过期时间和队列的消息过期时间,则会取其中一个较小的值。比如消息设置5秒过期,队列设置消息10秒过期,则实际过期时间是5秒。

基于消息TTL,我们来看一下如何利用死信队列(DLQ)实现延时队列:

总体步骤:

1)创建一个交换机

2018已经过去过去,2019还想一成不变吗?拥抱变化,突破瓶颈,想要学习Java架构技术的朋友可以加我的群:725219329,群内每晚都会有阿里技术大牛讲解的最新Java架构技术。并会录制录播视频分享在群公告中,作为给广大朋友的加群的福利——分布式(Dubbo、Redis、RabbitMQ、Netty、RPC、Zookeeper、高并发、高可用架构)/微服务(Spring Boot、Spring Cloud)/源码(Spring、Mybatis)/性能优化(JVM、TomCat、MySQL)

2)创建一个队列,与上述交换机绑定,并且通过属性指定队列的死信交换机。

2

3)创建一个死信交换机

4)创建一个死信队列

4)将死信交换机绑定到死信队列

5)消费者监听死信队列

代码如下:

消费者:

因为此处使用默认的AMQP Default的Exchange,所以省略了第1)步,没有创建交换机。

这里用指定消息的TTL实现,所以设置队列TTL属性的代码注释了。

// 指定队列的死信交换机
Map<String,Object> arguments = ****new ****HashMap<String,Object>();
arguments.put(****"x-dead-letter-exchange"****,****"DLX_EXCHANGE"****);
// arguments.put("x-expires","9000"); // 设置队列的TTL

// 声明队列(默认交换机AMQP default,Direct)
channel.queueDeclare(****"TEST_DLX_QUEUE"****, ****false****, ****false****, ****false****, arguments);

// 声明死信交换机
channel.exchangeDeclare(****"DLX_EXCHANGE"****,****"topic"****, ****false****, ****false****, ****false****, ****null****);
// 声明死信队列
channel.queueDeclare(****"DLX_QUEUE"****, ****false****, ****false****, ****false****, ****null****);
// 绑定,此处 Dead letter routing key 设置为 #,****代表路由所有消息
channel.queueBind(****"DLX_QUEUE"****,****"DLX_EXCHANGE"****,****"#"****);

生产者:

String msg = ****"Hello world, Rabbit MQ, DLX MSG"****;

// 设置属性,消息10秒钟过期
AMQP.BasicProperties properties = ****new ****AMQP.BasicProperties.Builder()
.deliveryMode(2) // 持久化消息
.contentEncoding(****"UTF-8"****)
.expiration(****"10000"****) // TTL
.build();

// 发送消息
channel.basicPublish(****""****, ****"TEST_DLX_QUEUE"****, properties, msg.getBytes());

消息的流转流程

生产者——原交换机——原队列——(超过TTL之后)——死信交换机——死信队列——最终消费者

如图:

3

使用死信队列实现延时消息的缺点:

  1. 如果统一用队列来设置消息的TTL,当梯度非常多的情况下,比如1分钟,2分钟,5分钟,10分钟,20分钟,30分钟……需要创建很多交换机和队列来路由消息。
  2. 2018已经过去过去,2019还想一成不变吗?拥抱变化,突破瓶颈,想要学习Java架构技术的朋友可以加我的群:725219329,群内每晚都会有阿里技术大牛讲解的最新Java架构技术。并会录制录播视频分享在群公告中,作为给广大朋友的加群的福利——分布式(Dubbo、Redis、RabbitMQ、Netty、RPC、Zookeeper、高并发、高可用架构)/微服务(Spring Boot、Spring Cloud)/源码(Spring、Mybatis)/性能优化(JVM、TomCat、MySQL)
  3. 如果单独设置消息的TTL,则可能会造成队列中的消息阻塞——前一条消息没有出队(没有被消费),后面的消息无法投递。
  4. 可能存在一定的时间误差。

RabbitMQ 延时消息的实现(下)
在RabbitMQ 3.5.7及以后的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延时队列功能。同时插件依赖Erlang/OPT 18.0及以上。

插件源码地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

插件下载地址:

https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

1、进入插件目录

whereis rabbitmq

cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins

2、下载插件

wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

如果下载的文件名带问号则需要改名,例如:

4

mv download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez rabbitmq_delayed_message_exchange-0.0.1.ez

5

3、启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4、停用插件

rabbitmq-plugins disable rabbitmq_delayed_message_exchange

5、插件使用

通过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性。x-delayed-message是插件提供的类型,并不是rabbitmq本身的(区别于direct、topic、fanout、headers)。

6

代码:

消费者(先启动):

**// 声明x-delayed-message类型的exchange**
Map<String, Object> argss = ****new ****HashMap<String, Object>();
argss.put(****"x-delayed-type"****, ****"direct"****);
channel.exchangeDeclare(****"DELAY_EXCHANGE"****, ****"x-delayed-message"****, ****false****,
        ****false****, argss);

**// 声明队列**
channel.queueDeclare(****"DELAY_QUEUE"****, ****false****,****false****,****false****,****null****);

**// 绑定交换机与队列**
channel.queueBind(****"DELAY_QUEUE"****, ****"DELAY_EXCHANGE"****, ****"DELAY_KEY"****);

**// 创建消费者**
Consumer consumer = ****new ****DefaultConsumer(channel) {
    [@Override](https://my.oschina.net/u/1162528)
    ****public void ****handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                               ****byte****[] body) ****throws ****IOException {
        String msg = ****new ****String(body, ****"UTF-8"****);
        SimpleDateFormat sf=****new ****SimpleDateFormat(****"yyyy-MM-dd HH:mm:ss.SSS"****);
        System.******out******.println(****"收到消息:[" ****+ msg + ****"]********\n********接收时间:" ****+sf.format(****new ****Date()));
    }
};

**// 开始获取消息**
channel.basicConsume(****"DELAY_QUEUE"****, ****true****, consumer);

生产者(后启动):

**// 延时投递,比如延时1分钟**
Date now = ****new ****Date();
Calendar calendar = Calendar.**getInstance**();
calendar.add(Calendar.******MINUTE******, +1);**// 1分钟后投递**
Date delayTime = calendar.getTime();

SimpleDateFormat sf = ****new ****SimpleDateFormat(****"yyyy-MM-dd HH:mm:ss.SSS"****);
String msg = ****"发送时间:" ****+ sf.format(now) + ****",投递时间:" ****+ sf.format(delayTime);

**// 延迟的间隔时间,目标时刻减去当前时刻**
Map<String, Object> headers = ****new ****HashMap<String, Object>();
headers.put(****"x-delay"****, delayTime.getTime() - now.getTime());

AMQP.BasicProperties.Builder props = ****new ****AMQP.BasicProperties.Builder()
        .headers(headers);
channel.basicPublish(****"DELAY_EXCHANGE"****, ****"DELAY_KEY"****, props.build(),
        msg.getBytes());

channel.close();
conn.close();

控制台输出:

收到消息:[发送时间:2019-01-15 20:44:41.000,投递时间:2019-01-15 20:45:41.003]

接收时间:2019-01-15 20:45:41.064

7
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容