SpringBoot 集成 RabbitMQ 从基础到进阶

SpringBoot 集成 RabbitMQ 分为 5 个部分:增加 pom 依赖,增加 yml 配置,新建 MQ 配置,生产者开发,消费者开发。

基础版

1. 新增 pom 依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2. 新增 yml 依赖

注意!yml 文件的依赖,生产者和消费者的配置有些不一样。

生产者

spring.rabbitmq.addresses=192.168.11.76:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

消费者

spring.rabbitmq.addresses=192.168.11.76:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

3. MQ 初始化配置

注意!MQ 初始化有很多种实现方式,不止这一种。

  1. 注意!微服务下,MQ 初始化配置如果只在生产者或者消费者一端实现,则该端需要先启动。如果可以,放到公共配置服务下,启动的时候最先启动配置服务。然后在启动生产者和消费者。
  2. 在 springboot 启动类上加上 @ComponentScan 注解,并扫描到初始化配置的类。
  3. 编写初始化配置的类。代码作用请看注释,示例如下:
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MainConfig {

    // 队列名称
    public final static String Q_NAME = "spring-boot-queue-1";
    // 交换机名称
    public final static String EXCHANG_NAME = "spring-boot-exchange-1";
    // 绑定的值
    public static final String BIND_KEY = "spring-boot-bind-key-1";

    // 定义队列,第二个参数是表示持久化
    @Bean
    Queue queue() {
        return new Queue(Q_NAME, true);
    }

    // 定义交换机,第二个参数是表示持久化,第三个参数表示当最后一个绑定被解绑的时候,不会自动删除该 exchange
    @Bean
    TopicExchange exchange() {
        return new TopicExchange(EXCHANG_NAME, true, false);
    }

    // 绑定队列和交换机,因为是 TopicExchange
    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(BIND_KEY);
    }
}
  1. 关于什么是 TopicExchange,为什么队列和交换机要绑定,这是基础。如果看不同可以看看我写的入门的文章。
  2. MQ 初始化好之后,就可以编写生产者和开发者的时候使用这些。
  3. 这只是一个配置实例,实际开发中可能需要将 exchange 和 Q 的信息从配置文件中获取,而不是在代码中写死。

4. 生产者

@Component
public class SendMsg1 {
    // 此接口的默认实现是RabbitTemplate,目前只有一个实现,
    @Autowired
    private AmqpTemplate amqpTemplate;

    // 发送消息
    public void send_1(String msgContent) {
        amqpTemplate.convertAndSend(MainConfig.EXCHANG_NAME, MainConfig.BIND_KEY, msgContent);
    }
}

5. 消费者

@Component
public class ReceiveMsg1 {

    /**
     * 获取信息: queue也可以支持RabbitMQ中对队列的模糊匹配
     * @param content
     */
    @RabbitListener(queues = MainConfig.Q_NAME)
    public void receive_1(String content) {
        // ...
        System.out.println("[ReceiveMsg-1] receive msg: " + content);
    }
}

6. 使用

public class SimpleTest {

    @Autowired
    private SendMsg1 sendMsg1;

    @Test
    public void sendAndReceive_1(){
        String testContent = "send msg via spring boot - 1";
        sendMsg1.send_1(testContent);
        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

进阶版

基础版代码接近一个 demo,在开发中还要考虑很多其他的东西。进阶版就是在基础版的基础上进行完善,保证消息 100% 投递。

进阶版的 pom,yml,MQ 初始化都和基础版一样,无需更改。

知识背景

  1. 首先要知道 confirmCallbackreturnCallback 分别是什么。rabbitmq 整个消息投递的路径为:
    生产者->rabbitmq broker cluster->exchange->Q->消费者。这个路径在另一篇《RabbitMQ 核心概念与命令》中有图片清晰展示出来。

    消息从生产者到 rabbitmq broker cluster 成功则会返回一个 confirmCallback 。
    消息从 exchange 到 queue 投递失败则会返回一个 returnCallback 。

第一次升级,解决消息可靠性投递的问题。

生产者升级

生产者在投递消息的时候,能够知道消息是否成功投递,成功投递怎么处理,投递失败怎么补偿就要看具体的业务逻辑了。因为业务不同,实现不同。可以参考另一篇《如何保障消息 100% 投递成功?》的文章。

@Component
public class RabbitSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;  
    
    // ack 为 true 表示正常,false 表示异常
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if(!ack){
                System.err.println("异常处理....");
            }
        }
    };
    
    // 消息没有正确送到 Q 里,需要做额外的处理,这里的实现只是打印。
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: " 
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };
    
    //发送消息方法调用: 构建Message消息
    public void send(Object message, Map<String, Object> properties) throws Exception {
        MessageHeaders mhs = new MessageHeaders(properties);
        Message msg = MessageBuilder.createMessage(message, mhs);

        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        
        //id + 时间戳 全局唯一。 没有投递成功的时候可以通过这个值判断是哪个消息。
        CorrelationData correlationData = new CorrelationData("1234567890");

        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
    }
}

消费者升级

消费者拿到消息具体的业务处理,各有不同。

import com.bfxy.springboot.MainConfig;
import com.rabbitmq.client.Channel;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class RabbitReceiver {

    /**
     * 这是消费者监听消息的方法
     * @RabbitListener 是监听哪一个队列
     * @param message
     * @param channel
     * @throws Exception
     */
    @RabbitListener(queues=MainConfig.Q_NAME)
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + message.getPayload());
        
        //手工ACK,这一步尤为重要,因为在配置文件中配置了手动 ACK,所有需要有 ACK 的动作。
        Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }
}

第二次升级,解决 MQ 传递 java 对象的问题。

如果 MQ 中传递 java 对象,则需要生产者和消费者都定义好这个对象。尤其是在 SpringCloud 中,生产者和消费者很可能不在一个服务中。

以下是对象示例:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order implements Serializable {

    private String id;
    private String name;
}

一定要实现序列化接口,因为 MQ 只可以投递 String 和 byte[] 类型。

生产者修改

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.bfxy.springboot.entity.Order;

@Component
public class RabbitSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;  
    
    // ack 为 true 表示正常,false 表示异常
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if(!ack){
                System.err.println("异常处理....");
            }
        }
    };
    
    // 消息没有正确送到 Q 里,需要做额外的处理,这里的实现只是打印。
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: " 
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    //发送消息方法调用: 构建Message消息
    public void sendOrder(Order order) throws Exception {
        // 如果不需要设置 Headers 则没有必要加。MQ 有默认的 Headers 设置。
        // MessageHeaders mhs = new MessageHeaders(properties);
        // Message msg = MessageBuilder.createMessage(order, mhs);

        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);

        //id + 时间戳 全局唯一。 没有投递成功的时候可以通过这个值判断是哪个消息。
        CorrelationData correlationData = new CorrelationData("1234567890");
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", order, correlationData);
    }
}

消费者修改

import com.bfxy.springboot.MainConfig;
import com.bfxy.springboot.entity.Order;
import com.rabbitmq.client.Channel;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class RabbitReceiver {
    /**
     * 将 Message 拆成 Payload 和 Headers
     * @param order Q 里传递的 java 对象
     * @param channel
     * @param headers
     * @throws Exception
     */
    @RabbitListener(queues = MainConfig.Q_NAME)
    @RabbitHandler
    public void onOrderMessage(@Payload Order order, Channel channel, @Headers Map<String, Object> headers) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端 orderID: " + order.getId());

        //手工ACK,这一步尤为重要,因为在配置文件中配置了手动 ACK,所有需要有 ACK 的动作。
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }
}

使用

public class ApplicationTests {
    @Autowired
    private RabbitSender rabbitSender;

    public void testSender2() throws Exception {
         Order order = new Order("001", "第一个订单");
         rabbitSender.sendOrder(order);
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,922评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,591评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,546评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,467评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,553评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,580评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,588评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,334评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,780评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,092评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,270评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,925评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,573评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,194评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,437评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,154评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352