SpringBoot RabbitMQ消息队列的重试、超时、延时、死信队列

今天介绍使用SpringBoot实现RabbitMQ消息队列的高级用法。

  • MQ安装
  • 自动创建
  • 消息重试
  • 消息超时
  • 死信队列
  • 延时队列

一、RabbitMQ的安装

众所周知,RabbitMQ的安装相对复杂,需要先安装Erlang,再按着对应版本的RabbitMQ的服务端,最后为了方便管理还需要安装rabbitmq_management管理端插件,偶尔还会出现一些安装配置问题,故十分复杂。
在开发测试环境下使用docker来安装就方便多了,省去了环境和配置的麻烦。

1. 拉取官方image

docker pull rabbitmq:management

2. 启动RabbitMQ

docker run -dit --name MyRabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management

rabbitmq:management: image:tag
--name:指定容器名;
-d:后台运行容器;
-t:在新容器内指定一个伪终端或终端;
-i:允许你对容器内的标准输入 (STDIN) 进行交互;
-p:指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
-e:指定环境变量;(RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码);

至此RabbitMQ就安装启动完成了,可以通过http://localhost:15672 登陆管理后台,用户名密码就是上面配置的admin/admin

二、使用SpringBoot自动创建队列

1. 引入amqp包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. MQ配置

bootstrap.yml 配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:
        concurrency: 5
      direct:
        prefetch: 10

concurrency:每个listener在初始化的时候设置的并发消费者的个数
prefetch:每次从一次性从broker里面取的待消费的消息的个数

rabbitmq-spring.xml配置

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!--接收消息的队列名-->
    <rabbit:queue name="login-user-logined" />
    <!--声明exchange的名称与类型-->
    <rabbit:topic-exchange name="login_barryhome_fun">
        <rabbit:bindings>
            <!--queue与exchange的绑定和匹配路由-->
            <rabbit:binding queue="login-user-logined" pattern="login.user.logined"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>
</beans>

rabbit:topic-exchange:声明为topic消息类型
pattern="login.user.logined":此处是一个表达式,可使用“*”表示一个词,“#”表示一个或多个词

3. 消息生产端

@Autowired
RabbitTemplate rabbitTemplate;

@GetMapping("/send")
public LoginUser SendLoginSucceedMessage(){
    LoginUser loginUser = getLoginUser("succeed");
    // 发送消息
    rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE,
            MessageConstant.LOGIN_ROUTING_KEY, loginUser);
    return loginUser;
}

@NoArgsConstructor
@AllArgsConstructor
public class LoginUser implements Serializable {
    String userName;
    String realName;
    String userToken;
    Date loginTime;
    String status;
}

这里需要注意的是默认情况下消息的转换器为SimpleMessageConverter只能解析stringbyte,故传递的消息对象必须是可序列化的,实现Serializable接口

SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: fun.barryhome.cloud.dto.LoginUser

4. 消息消费端

@Component
public class ReceiverMessage {

    @RabbitListener(queues = "login-user-logined")
    public void receiveLoginMessage(LoginUser loginUser) {
        System.err.println(loginUser);
    }
}

@RabbitListener(queues = "login-user-logined"):用于监听名为login-user-logined 队列中的消息

5. 自动创建Queue

@SpringBootApplication
@ImportResource(value = "classpath:rabbitmq-spring.xml")
public class MQApplication {
    public static void main(String[] args) {
        SpringApplication.run(MQApplication.class, args);
    }
}

在没有导入xml且MQ服务器上没有列队的情况下,会导致找不到相关queue的错误

channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'login-user-logined' in vhost '/', class-id=50, method-id=10)

而导入之后将自动创建 exchangequeue

三、消息重试

默认情况下如果有消息消费出错后会一直重试,造成消息堵塞


如图可观察unackedtotal一直是1,但deliver/get飙升

消息堵塞之后也影响到后续消息的消费,时间越长越来越多的消息将无法及时消费处理。
如果是单条或极少量的消息有问题可通过多开节点concurrency将正常的消息消息掉,但如果较多则全部节点都将堵塞。

如果想遇到消息消费报错重试几次就舍弃,从而不影响后续消息的消费,如何实现呢?

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:
        concurrency: 5
        prefetch: 10
        retry:
          enabled: true   # 允许消息消费失败的重试
          max-attempts: 3   # 消息最多消费次数3次
          initial-interval: 2000    # 消息多次消费的间隔2秒

以上配置允许消息消费失败后重试3次,每次间隔2秒,如果还是失败则直接舍弃掉本条消息。
重试可解决因非消息体本身处理问题产生的临时性的故障,而将处理失败的消息直接舍弃掉只是为其它消息正常处理的权益之计而以,将业务操作降到相对低的影响。

四、消息超时

消息重试可解决因消息处理报错引起的问题。如果是消息处理过慢导致错过时效,除了可在处理逻辑中进行处理外,也可以通过消息的超时机制来处理,设定超时时间后将消息直接舍弃。

修改rabbitmq-spring.xml

<rabbit:queue name="login-user-logined">
    <rabbit:queue-arguments>
    <entry key="x-message-ttl" value="10000" value-type="java.lang.Long" />
    </rabbit:queue-arguments>
</rabbit:queue>

x-message-ttl:在消息服务器停留的时间(ms)


如果配置前已存在queue将不能被修改,需要删除原有queue后自动创建
创建成功后会在Features中有TTL标识

五、死信队列

死信队列就是当业务队列处理失败后,将消息根据routingKey转投到另一队列,这样的情况有:

  • 消息被拒绝 (basic.reject or basic.nack) 且带 requeue=false不重新入队参数或达到的retry重新入队的上限次数
  • 消息的TTL(Time To Live)-存活时间已经过期
  • 队列长度限制被超越(队列满,queue的"x-max-length"参数)

1. 修改rabbitmq-spring.xml

<!--接收消息的队列名-->
<rabbit:queue name="login-user-logined">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value="10000" value-type="java.lang.Long"/>
        <!--死信的交换机-->
        <entry key="x-dead-letter-exchange" value="login_barryhome_fun"/>
        <!--死信发送的路由-->
        <entry key="x-dead-letter-routing-key" value="login.user.login.dlq"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<rabbit:queue name="login-user-logined-dlq"/>

<!--申明exchange的名称与类型-->
<rabbit:topic-exchange name="login_barryhome_fun">
    <rabbit:bindings>
        <!--queue与exchange的绑定和匹配路由-->
        <rabbit:binding queue="login-user-logined" pattern="login.user.logined"/>
        <rabbit:binding queue="login-user-logined-dlq" pattern="login.user.login.dlq"/>
    </rabbit:bindings>
</rabbit:topic-exchange>

通过对死信发送的交换机和路由的的设置,可将消息转向具体的queue中。这里交换机可以和原业务队列不是一个。
login-user-logined中的消息处理失败后将直接转投向login-user-logined-dlq队列中。
当程序逻辑修复后可再将消息再移回业务队列中move messages

2. 安装插件


如图提示需要先安装插件

3. 移动消息


安装成功后就可以输入业务队列名再转投

六、延时队列

延时队列除了可以做一般的延时处理外,还可以当作单个job的定时任务处理,比起一般通过定时器去轮询的方式更优雅。

1. 修改rabbitmq-spring.xml

<rabbit:topic-exchange name="login_barryhome_fun" delayed="true">

初次配置时,如果报以下错误,则是服务器不支持此命令,需要安装插件

Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)

2. 安装插件

  1. 下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0

  2. 上传插件到docker容器中/plugins
    docker ps 查询rabbitmq的 CONTAINER ID

docker cp rabbitmq_delayed_message_exchange-3.8.0.ez 2c248563a2b0:/plugins
  1. 进入docker容器内部
docker exec -it 2c248563a2b0 /bin/bash
  1. 安装插件
cd /plugins
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

具体安装教程可参考:https://blog.csdn.net/magic_1024/article/details/103840681

安装成功后重启程序,观察mq管理端的exchange可发现

3. 发送延时消息

@GetMapping("/sendDelay")
public LoginUser SendDelayLoginSucceedMessage() {
    LoginUser loginUser = getLoginUser("succeed");

    MessagePostProcessor messagePostProcessor = message -> {
        // 延时10s
        message.getMessageProperties().setHeader("x-delay", 10000);
        return message;
    };

    // 发送消息
    rabbitTemplate.convertAndSend(MessageConstant.MESSAGE_EXCHANGE,
            MessageConstant.LOGIN_ROUTING_KEY, loginUser, messagePostProcessor);
    return loginUser;
}

需要注意的是消息的发送是实时的,消息服务器接收到消息待延时时间后再投到对应的queue中

七、完整代码

https://gitee.com/hypier/barry-cloud/tree/master/cloud-mq

八、请关注我的公众号

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343