Springboot 技术整合--笔记7--集成rabbitmq

注意点

  • 消息的唯一id
  • 生产者发送消息之前需要先创建路由key,否则消息无法发送
  • 消费者手动签收时,依赖消息通道Channel
  • 通常情况下生产者与消费者是在不同的web app
  • 消费者的@RabbitListener 会自动帮助我们创建队列、路由key及绑定关系

rabbitmq常用地址

#管理后台
http://192.168.0.19:15672

最佳测试方式是启动2个服务

如8081端口发布消息(测试用例)
8080接收消息

websocke+rabbitmqt流程.jpg

image.png

pom.xml导入依赖

spring-boot-starter-test是为了后面写测试类用,
spring-boot-starter-amqp才是真正的使用rabbitmq的依赖

        <!--添加rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>1.5.2.RELEASE</version>
        </dependency>
        <!--添加rabbitmq  测试用例用 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
<!--单元测试系列:Mock工具之Mockito实战 https://www.cnblogs.com/zishi/p/6780719.html -->

        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>2.7.19</version>
            <scope>test</scope>
        </dependency>

application.properties配置文件当中引入RabbitMQ基本的配置信息

############################################################
#
# RabbitMQ 配置
#
############################################################
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#虚拟主机如virtual-task
spring.rabbitmq.virtual-host=/
#链接超时时间15秒
spring.rabbitmq.connection-timeout=15000s

配置jackson部分--非必须的

############################################################
#
#jackson 部分
#如data部分的时间格式化
# 不允许传空
############################################################
spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
spring.jackson.time-zone=GMT+8
spring.jackson.default-property-inclusion=NON_NULL

创建RabbitConfig

使用@Value注解获取application.properties配置信息

/**
 Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
 Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
 Queue:消息的载体,每个消息都会被投到一个或多个队列。
 Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
 Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
 vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
 Producer:消息生产者,就是投递消息的程序.
 Consumer:消息消费者,就是接受消息的程序.
 Channel:消息通道,在客户端的每个连接里,可建立多个channel.
 */

@Configuration
public class RabbitConfig {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.rabbitmq.host}")  //在application.properties中配置
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;


    public static final String EXCHANGE_TASK = "my-mq-exchange_task";
    public static final String EXCHANGE_B = "my-mq-exchange_B";
    public static final String EXCHANGE_C = "my-mq-exchange_C";


    public static final String QUEUE_A = "QUEUE_A";
    public static final String QUEUE_B = "QUEUE_B";
    public static final String QUEUE_C = "QUEUE_C";

    public static final String ROUTINGKEY_A = "spring-boot-routingKey_A";
    public static final String ROUTINGKEY_B = "spring-boot-routingKey_B";
    public static final String ROUTINGKEY_C = "spring-boot-routingKey_C";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    //必须是prototype类型
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     HeadersExchange :通过添加属性key-value匹配
     DirectExchange:按照routingkey分发到指定队列
     TopicExchange:多关键字匹配
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE_TASK);
    }
    /**
     * 获取队列A
     * @return
     */
    @Bean
    public Queue queueA() {
        return new Queue(QUEUE_A, true); //队列持久
    }
    @Bean
    public Binding binding() {

        return BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
    }

}

创建消息的生产者MsgProducer


/**
 * 消息的生产者
 */
@Component
public class MsgProducer implements RabbitTemplate.ConfirmCallback {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    //由于rabbitTemplate的scope属性设置为ConfigurableBeanFactory.SCOPE_PROTOTYPE,所以不能自动注入
    private RabbitTemplate rabbitTemplate;
    /**
     * 构造方法注入rabbitTemplate
     */
    @Autowired
    public MsgProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容
    }

    public void sendMsg(String content) {
        //消息ID
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); //
        //把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
        logger.info("rabbitMq 生产者消息  内容content = {} CorrelationData= {}", content,correlationId);
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_TASK, RabbitConfig.ROUTINGKEY_A, content, correlationId);
    }
    /**
     * 回调
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        logger.info(" 回调id:" + correlationData);
        if (ack) {
            logger.info("消息成功消费");
        } else {
            logger.info("消息消费失败:" + cause);
        }
    }
}

创建测试类RabbitMQTest

@SpringBootTest(classes= WechatTaskApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
public class RabbitMQTest {
    @Autowired
    private MsgProducer msgProducer;

    @Test
    public void testRabbit_Producer() {
        String mq = "{\n" +
                "    \"task_id\": 1462,\n" +
                "    \"wechat_id\": \"wxid_on8oksh88zo22\"\n" +
                "}";
        msgProducer.sendMsg(mq);
    }


}
运行testRabbit_Producer测试方法

运行测试类RabbitMQTestd的testRabbit_Producer


image.png

rabbitmq--消费端配置


############################################################
#
# Springboot RabbitMQ 基本配置 192.168.0.19
#192.168.2.34 ubuntu的ip被自动变为192.168.0.19
############################################################
spring.rabbitmq.host=192.168.0.19
spring.rabbitmq.port=5672
spring.rabbitmq.username=czg-admin
spring.rabbitmq.password=czg_pass
#虚拟主机如virtual-task
spring.rabbitmq.virtual-host=/
#链接超时时间15秒
spring.rabbitmq.connection-timeout=15000s



############################################################
#
# Springboot RabbitMQ 消费者配置(依赖RabbitMQ的基本配置)
#
############################################################
#最大并发数
spring.rabbitmq.listener.simple.concurrency=5
#最大并发支持
spring.rabbitmq.listener.simple.max-concurrency=10
#签收模式--可以手工或自动
spring.rabbitmq.listener.simple.acknowledge-mode=auto
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
#限流(一条条过来,每一个线程即并发数一次消费一条)---在大流量时控制一次消费消息个数
spring.rabbitmq.listener.simple.prefetch=1
############################################################

添加消息的消费者MsgReceiver

**
 * 消费方
 * 默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual
 */
@Component
@RabbitListener(queues = RabbitConfig.QUEUE_A)
public class MsgReceiver {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

   /* //方式1=不处理消费确认
   @RabbitHandler
    public void process(String content) {
        logger.info("接收处理队列A当中的消息: " + content);
    }
*/

    /**
     * 方式2
     * Channel
     * @param message
     * @param channel com.rabbitmq.client.Channel https://blog.csdn.net/asdfsadfasdfsa/article/details/79671097
     * @param tag  好像都是1,2之类 deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
     *
     * RabbitMQ(4)SpringBoot+RabbitMQ发送确认和消费手动确认机制
     * https://www.jianshu.com/p/fae8fca98522
     */
    @RabbitHandler
    public void processMessage2(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        logger.info("接收处理队列A当中的message = {},消息tag ={}" + message,tag);

        try {
            // 模拟执行任务
            Thread.sleep(1000);
            channel.basicAck(tag,false);            // 确认收到消息(需要在application.properties消费消息时指定非自动确认),消息将被队列移除,false只确认当前consumer一个消息收到,true确认所有consumer获得的消息。
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

再运行刚才的测试方法

image.png

关于rabbitmq的文章收录

rabbitmq消息队列的简单入门-csdn
Spring Boot整合RabbitMQ详细教程-csdn
springboot学习笔记-6 springboot整合RabbitMQ

RabbitMQ:消息发送确认 与 消息接收确认(ACK)
RabbitMQ(4)SpringBoot+RabbitMQ发送确认和消费手动确认机制
RabbitMQ消息中间件极速入门与实战--慕课

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 221,695评论 6 515
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,569评论 3 399
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 168,130评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,648评论 1 297
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,655评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,268评论 1 309
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,835评论 3 421
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,740评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,286评论 1 318
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,375评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,505评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,185评论 5 350
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,873评论 3 333
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,357评论 0 24
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,466评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,921评论 3 376
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,515评论 2 359

推荐阅读更多精彩内容