SpringBoot整合RabbitMQ

简介:主要讲述RabbitMQ的使用

RabbitMQ消息中间件

  • 使用Erlang语言编写的一个实现分布式消息队列的开软软件

主要有四种交换机策略,四种策略都有一个共同点,他们都是由消息的生产者(Producer)和消息的消费者(Consumer)所构成。

1. 安装RabbitMQ

这里是在Docker里面进行运行RabbitMQ,详情参考下面链接

Docker安装RabbitMQ

2. DirectExchange

  • 2.1 说明:该策略交换机绑定一个队列,生产者推送消息到交换机,交换机只会推送到绑定的队列中

  • 2.2 图解

    DiectExchange

SpringBoot实现

  • 2.3 依赖安装
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.Ryan</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- 实现对 RabbitMQ 的自动化配置 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  • 2.4 application.yml配置RabbitMQ
spring:
  # RabbitMQ 配置项,对应 RabbitProperties 配置类
  rabbitmq:
    host: 175.24.16.47            # RabbitMQ 服务的地址(腾讯云上地址)
    port: 5672                    # RabbitMQ 服务的端口
    username: guest               # RabbitMQ 服务的账号
    password: guest               # RabbitMQ 服务的密码  
  • 2.5 RabbitMQ的DirectExchange策略的Java配置类
@Configuration
public class RabbitConfig {

   /**
     * Direct Exchange 示例的配置类
     */
    public static class DirectExchangeDemoConfiguration {

        // 创建 Queue
        @Bean
        public Queue demo01Queue() {
            return new Queue(DirectMessage .QUEUE, // Queue 名字
                    true, // durable: 是否持久化
                    false, // exclusive: 是否排它
                    false); // autoDelete: 是否自动删除
        }

        // 创建 Direct Exchange
        @Bean
        public DirectExchange demo01Exchange() {
            return new DirectExchange(DirectMessage .EXCHANGE,
                    true,  // durable: 是否持久化
                    false);  // exclusive: 是否排它
        }

        // 创建 Binding
        // Exchange:DirectMessage .EXCHANGE
        // Routing key:DirectMessage .ROUTING_KEY
        // Queue:DirectMessage .QUEUE
        @Bean
        public Binding demo01Binding() {
            return BindingBuilder.bind(demo01Queue()).to(demo01Exchange()).with(DemoMessage.ROUTING_KEY);
        }

    }

}
  • 2.6 定义消息队列配置信息DirectMessage
package com.ryan.rabbitmq.message;

import java.io.Serializable;

public class DirectMessage implements Serializable {

    public static final String QUEUE = "QUEUE_DEMO_01";

    public static final String EXCHANGE = "EXCHANGE_DEMO_01";

    public static final String ROUTING_KEY = "ROUTING_KEY_01";

    /**
     * 编号
     */
    private Integer id;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }
}

  • 2.7 定义生产者
package com.ryan.rabbitmq.producer;

import com.ryan.rabbitmq.message.DirectMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;


@Component
public class DirectProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void syncSend(Integer id) {
        // 创建 Demo01Message 消息
        DirectMessage message = new DirectMessage();
        message.setId(id);
        // 同步发送消息
        rabbitTemplate.convertAndSend(DirectMessage.EXCHANGE, DirectMessage.ROUTING_KEY, message);
    }

    public void syncSendDefault(Integer id) {
        // 创建 Demo01Message 消息
        DirectMessage message = new DirectMessage();
        message.setId(id);
        // 同步发送消息
        rabbitTemplate.convertAndSend(DirectMessage.QUEUE, message);
    }

    @Async
    public ListenableFuture<Void> asyncSend(Integer id) {
        try {
            // 发送消息
            this.syncSend(id);
            // 返回成功的 Future
            return AsyncResult.forValue(null);
        } catch (Throwable ex) {
            // 返回异常的 Future
            return AsyncResult.forExecutionException(ex);
        }
    }
}
  • 2.8 定义消费者
package com.ryan.rabbitmq.consumer;

import com.ryan.rabbitmq.message.DirectMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = DirectMessage.QUEUE)
public class Direct {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @RabbitHandler
    public void onMessage(DirectMessage message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }
}

通过@RabbitListener注解进行绑定队列,@RabbitHandler进行注明消息的处理逻辑代码。

  • 2.9 测试运行
    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private DirectProducer directProducer;

    /**
     * 测试同步发送
     *
     * @throws InterruptedException
     */
    @Test
    void testDirectSyncSend() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        directProducer.syncSend(id);
        logger.info("[testSyncSend][发送编号:[{}] 发送成功]", id);

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }

    @Test
    public void tesDirectSyncSendDefault() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        directProducer.syncSendDefault(id);
        logger.info("[tesSyncSendDefault][发送编号:[{}] 发送成功]", id);

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }

    @Test
    public void testDirectAsyncSend() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        directProducer.asyncSend(id).addCallback(new ListenableFutureCallback<Void>() {

            @Override
            public void onFailure(Throwable e) {
                logger.info("[testASyncSend][发送编号:[{}] 发送异常]]", id, e);
            }

            @Override
            public void onSuccess(Void aVoid) {
                logger.info("[testASyncSend][发送编号:[{}] 发送成功,发送成功]", id);
            }

        });
        logger.info("[testASyncSend][发送编号:[{}] 调用完成]", id);

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }
  • 2.10 运行结果
2020-11-12 09:28:04.178  INFO 6576 --- [           main] c.r.rabbitmq.RabbitmqApplicationTests    : [testSyncSend][发送编号:[1605144484] 发送成功]
2020-11-12 09:28:04.238  INFO 6576 --- [ntContainer#0-1] com.ryan.rabbitmq.consumer.Direct        : [onMessage][线程编号:17 消息内容:com.ryan.rabbitmq.message.DirectMessage@7ca1598c]
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容