RabbitMQ与SpringBoot2.0整合

application.properties:

spring.rabbitmq.addresses=192.
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

RabbitMQ与SpringBoot整合配置详解:

1. 生产端核心配置


  • publisher-confirms,实现一个监听器用于监听Broker端为我们返回的确认请求:
    RabbitTemplate.ConfirmCallback

  • publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功:
    RabbitTemplate.ReturnCallback

  • 注意一点,在发送消息时候对template进行设置mandatory=true保证监听有效

  • 生产端还可以配置其他属性,比如发送重试、超时时间、次数、间隔等。

RabbitSender:

package com.pyy.springboot.producer;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class RabbitSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData:" + correlationData);
            System.err.println("ack:" + ack);
            if(!ack) {
                System.err.println("异常处理...");
            }else {
                // 更新数据库对应的消息状态:已发送
            }
        }
    };


    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
            System.err.println("return exchange:" + exchange + " , routingKey:" + routingKey + ", replyCode:" + replyCode + ", replyText:" + replyText);
        }
    };

    public void send(Object message, Map<String, Object> headerProperties) throws Exception {
        MessageHeaders messageHeaders = new MessageHeaders(headerProperties);
        Message msg = MessageBuilder.createMessage(message, messageHeaders);

        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId("userid" + System.currentTimeMillis());// id + 时间戳 全局唯一 实际消息的id
        //rabbitTemplate.convertAndSend("pyy.exchange", "springboot.hello", msg, correlationData);

        rabbitTemplate.convertAndSend("pyy.exchange", "fasdfsf.hello", msg, correlationData);

    }
}

2. 消费端核心配置


spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
spring.rabbitmq.listener.simple.concurrency=1
spring.rabbitmq.listener.simple.max-concurrency=5
  • 首先配置ACK手工确认模式,用于ACK的手工处理,这样我可以保证消息的可靠性送达,或者在消费失败时候可以做到重回队列、根据业务记录日志等处理。

  • 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况

@RabbitListener注解使用

  • 消费端监听@RabbitMQListener注解,这个对于在实际工作中非常的好用

  • @RabbitListener只一个组合的注解,里面可以注解配置@QueueBinding@Queue@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等

package com.pyy.mq.service;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * 消息接收者
 * @RabbitListener bindings:绑定队列
 *   @QueueBinding  value:绑定队列的名称
 *                exchange:配置交换器
 * 
 *     @Queue value:配置队列名称
 *        autoDelete:是否是一个可删除的临时队列
 * 
 *     @Exchange value:为交换器起个名称
 *           type:指定具体的交换器类型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.queue.info.routing.key}"
        )
)
public class InfoReceiver {

    /**
     * 接收消息方法,采用消息队列监听机制
     * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Info receiver:" + msg);
    }
}

@RabbitListener注解如果没有存在exchange和queue会自动创建

案例详细代码:https://github.com/pyygithub/springboot-rabbitmq

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 16,187评论 2 11
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,273评论 19 139
  • 前言 在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都连接上来...
    Chandler_珏瑜阅读 11,671评论 2 39
  • Spring整合rabbitmq实践(一):基础Spring整合rabbitmq实践(三):源码 3. 扩展实践 ...
    jinchaolv阅读 13,416评论 1 7
  • 时针指向了一点,钟表继续着滴答的作响。夜黑漆漆且静悄悄的。我独自疲乏的躺在沙发上,房间里的灯光有些昏暗了。电视机里...
    伶仃sir阅读 1,571评论 0 2

友情链接更多精彩内容