Springboot与RabbitMQ的整合

首先需弄明白RabbitMq的 exchange、route、queue的关系
可参照:(http://blog.csdn.net/samxx8/article/details/47417133)

RabbitMQ 三种Exchange

  • Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
  • Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
  • Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。

以下6个实例展示大部分应用场景:

*1.单生产者和单消费者

新建springboot maven工程 springboot-rabbitmq-OneToOne

pom.xml配置如下:

其中spring-boot-starter-amqp 是 rabbitmq 核心包。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.dev</groupId>
    <artifactId>springboot-rabbitmq-OneToOne</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-rabbitmq-demo</name>
    <description>springboot-rabbitmq-demo</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.4.RELEASE</version>
        <relativePath/> 
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
</project>

application.yml配置如下:
spring: 
  application:
    name: rabbit-service
  rabbitmq:
    host: xxx.xxx.xxx.xxx    #rabbitmq服务器地址
    port: xxxx               #rabbitmq服务器端口
    username: xxx
    password: xxx
主类
package com.rabbit;

import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import com.rabbit.queue.QueueName;

@SpringBootApplication
public class SpringbootRabbitmqDemoApplication {
    
    @Bean
    public Queue helloQueue() {//划重点:自动创建Queue
        return new Queue(QueueName.OneToOneQueue);
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqDemoApplication.class, args);
    }
}
Queue名称定义
package com.rabbit.queue;
public class QueueName {    
//此处定义必须是final不然会报错
//The value for annotation attribute RabbitListener.queues must be a constant expression
public final static String OneToOneQueue="OneToOneQueue";
}
发送端
package com.rabbit.hello;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String sendMsg = "Sender: hello rabbitMQ ";
        System.out.println(sendMsg);
        //向指定Queue发送文本消息
        this.rabbitTemplate.convertAndSend(QueueName.OneToOneQueue, sendMsg);
    }

}
接收端
package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
@RabbitListener(queues = {QueueName.OneToOneQueue}) //监控指定的Queue
public class Receiver {

    @RabbitHandler
    public void process(String revmsg) {
        System.out.println("Receiver  : " + revmsg);
    }

}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.hello.Sender;

@RestController
public class RabbitTest {
    
    @Autowired
    private Sender sender;
    
    @GetMapping("/oneToOne")
    public void oneToOne() {
        sender.send();
    }
}

测试结果:

http://127.0.0.1:8080/oneToOne

Sender: hello rabbitMQ 
Receiver  : Sender: hello rabbitMQ 

至此,单生产者和单消费者整合结束。

*2.单生产者和多消费者

与单消费者类似,其中接收端为多个消费者

package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
@RabbitListener(queues = {QueueName.OneToManyQueue})//监控指定的Queue, 名称修改,以区分一对一的Queue
public class Receiver1 {

    @RabbitHandler
    public void process(String revmsg) {
        System.out.println("Receiver1  : " + revmsg);
    }
}
package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
@RabbitListener(queues = {QueueName.OneToManyQueue})//监控指定的Queue, 名称修改,以区分一对一的Queue
public class Receiver2 {

    @RabbitHandler
    public void process(String revmsg) {
        System.out.println("Receiver2  : " + revmsg);
    }
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbit.hello.Sender;

@RestController
public class RabbitTest {
    
    @Autowired
    private Sender sender;

    
    /**
     * 单生产者-多消费者
     */
    @GetMapping("/oneToMany")
    public void oneToMany() {
        for(int i=0;i<10;i++){
            sender.send("hellomsg:"+i);
        }
        
    }
}

测试结果:

http://127.0.0.1:8080/oneToMany
生产者发送的10条消息,分别被两个消费者接收了

Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Sender: hello rabbitMQ 
Receiver1  : Sender: hello rabbitMQ 
Receiver2  : Sender: hello rabbitMQ 
Receiver1  : Sender: hello rabbitMQ 
Receiver2  : Sender: hello rabbitMQ 
Receiver1  : Sender: hello rabbitMQ 
Receiver2  : Sender: hello rabbitMQ 
Receiver1  : Sender: hello rabbitMQ 
Receiver2  : Sender: hello rabbitMQ 
Receiver1  : Sender: hello rabbitMQ 
Receiver2  : Sender: hello rabbitMQ 

至此,单生产者和多消费者整合结束。

*3.多生产者和多消费者

跟以上类似,再加个生产者即可

package com.rabbit.hello;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
public class Sender1 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        String sendMsg = msg;
        System.out.println("Sender1 : " + sendMsg);
        this.rabbitTemplate.convertAndSend(QueueName.ManyToManyQueue, sendMsg);
    }

}
package com.rabbit.hello;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
public class Sender2 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        String sendMsg = msg;
        System.out.println("Sender2 : " + sendMsg);
        this.rabbitTemplate.convertAndSend(QueueName.ManyToManyQueue, sendMsg);
    }

}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbit.hello.Sender1;
import com.rabbit.hello.Sender2;

@RestController
public class RabbitTest {
    
    @Autowired
    private Sender1 sender1;
    @Autowired
    private Sender2 sender2;
    
    /**
     * 多生产者-多消费者
     */
    @GetMapping("/manyToMany")
    public void manyToMany() {
        for(int i=0;i<10;i++){
            sender1.send("msg-1 :"+i);
            sender2.send("msg-2 :"+i);
        }
        
    }
}
http://127.0.0.1:8080/manyToMany
接收端仍然会均匀接收到消息

Sender1 : msg-1 :0
Sender2 : msg-2 :0
Sender1 : msg-1 :1
Sender2 : msg-2 :1
Sender1 : msg-1 :2
Sender2 : msg-2 :2
Sender1 : msg-1 :3
Sender2 : msg-2 :3
Sender1 : msg-1 :4
Sender2 : msg-2 :4
Sender1 : msg-1 :5
Sender2 : msg-2 :5
Sender1 : msg-1 :6
Sender2 : msg-2 :6
Sender1 : msg-1 :7
Sender2 : msg-2 :7
Sender1 : msg-1 :8
Sender2 : msg-2 :8
Sender1 : msg-1 :9
Sender2 : msg-2 :9
Receiver1  : msg-1 :0
Receiver2  : msg-2 :0
Receiver1  : msg-1 :1
Receiver2  : msg-2 :1
Receiver1  : msg-1 :2
Receiver2  : msg-2 :2
Receiver1  : msg-1 :3
Receiver2  : msg-2 :3
Receiver1  : msg-1 :4
Receiver2  : msg-2 :4
Receiver1  : msg-1 :5
Receiver2  : msg-2 :5
Receiver2  : msg-2 :6
Receiver1  : msg-1 :6
Receiver2  : msg-1 :7
Receiver1  : msg-2 :7
Receiver2  : msg-1 :8
Receiver1  : msg-2 :8
Receiver1  : msg-2 :9
Receiver2  : msg-1 :9

*4. topic ExChange(DirectExchange 与之类似较简单,此处不作实例演示,可参照RPC消息实例。)

启动类修改:

package com.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import com.rabbit.queue.QueueName;


@SpringBootApplication
public class SpringbootRabbitmqTopicApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqTopicApplication.class, args);
    }

    // 创建队列
    @Bean
    public Queue queueMessage() {
        return new Queue(QueueName.message);//队列名称自己定义在QueueName类里
    }

    // 创建队列
    @Bean
    public Queue queueMessages() {
        return new Queue(QueueName.messages);
    }

    // 创建交换器
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {//划重点:队列名称与上述方法名一致
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");//同一exchange下的队列绑定不同的routingkey
    }
    
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//#代表一个或多个匹配,*代表一个字符匹配
    }

}
发送方

//3个不同方法均可用于测试,绑定3个不同的routingkey

package com.rabbit.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
public class TopicSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String context = "i am testTopic";
        System.out.println("Sender : " + context);
        String routingKey="topic.1";
        this.rabbitTemplate.convertAndSend("topicExchange", routingKey, context);
    }

    public void send1() {
        String context = "i am testTopic 1";
        System.out.println("Sender : " + context);
        String routingKey="topic.message";
        //检查routingKey "topic.message" 是否匹配QueueName.message(topic.message),QueueName.messages(topic.#)中的routingKey
        this.rabbitTemplate.convertAndSend("topicExchange", routingKey,context);
    }
    
    public void send2() {
        String context = "i am testTopic 2";
        System.out.println("Sender : " + context);
        String routingKey="topic.messages";
        this.rabbitTemplate.convertAndSend("topicExchange", routingKey,context);
    }
}
两个接收方

指定不同的队列,测试是单独收到还是同时收到消息

package com.rabbit.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
@RabbitListener(queues = QueueName.message)
public class TopicReceiver1 {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver1 : " + message);
    }
}
package com.rabbit.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;

@Component
@RabbitListener(queues = QueueName.messages)
public class TopicReceiver2 {
    @RabbitHandler
    public void process(String message) {
        System.out.println("Topic Receiver2 : " + message);
    }
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbit.topic.TopicSender;

@RestController
public class RabbitTest {
    
    @Autowired
    private TopicSender topicSender;
    
    /**
     * topic exchange类型rabbitmq测试
     */
    @GetMapping("/topicTest")
    public void topicTest() {
           topicSender.send();
    }
    @GetMapping("/topicTest1")
    public void topicTest1() {
           topicSender.send1();
    }
    @GetMapping("/topicTest2")
    public void topicTest2() {
           topicSender.send2();
    }
}
测试结果(routingKey-队列1:topic.message 队列2:topic.messages)
http://localhost:8080/topicTest
Sender : i am testTopic
Topic Receiver2 : i am testTopic
发送方的routingkey 是topic.1不匹配第一个队列绑定的routingkey
而第二个队列routingkey 是topic.# 模糊匹配,所以绑定第二个队列的接收端能收到消息
--------------------------------------------
http://localhost:8080/topicTest1
Sender : i am testTopic 1
Topic Receiver1 : i am testTopic 1
Topic Receiver2 : i am testTopic 1
发送方的routingkey是topic.message,绑定的两个队列的routingKey均匹配,则都能收到
---------------------------------------------
http://localhost:8080/topicTest2
Sender : i am testTopic 2
Topic Receiver2 : i am testTopic 2
发送方的routingkey是topic.messages ,不匹配第一个队列routingKey,而第二个模糊匹配,则能收到

至此topic相关结束。

*5. fanout ExChange

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定这个转发器的所有队列都收到这个消息。

启动类(3个队列绑定FanoutExchange ):
package com.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SpringbootRabbitmqFanoutApplication {
    
      @Bean
        public Queue AMessage() {
            return new Queue("fanout.A");
        }

        @Bean
        public Queue BMessage() {
            return new Queue("fanout.B");
        }

        @Bean
        public Queue CMessage() {
            return new Queue("fanout.C");
        }

        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }

        @Bean
        Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(AMessage).to(fanoutExchange);
        }

        @Bean
        Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(BMessage).to(fanoutExchange);
        }

        @Bean
        Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(CMessage).to(fanoutExchange);
        }


    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqFanoutApplication.class, args);
    }
}
发送方:
package com.rabbit.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutSender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String msgString="fanoutSender : I am fanoutSender";
        System.out.println(msgString);
        this.rabbitTemplate.convertAndSend("fanoutExchange","hehe" ,msgString);//routingkey任意,去除routingKey则不生效
    }

}
接收方
package com.rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverA  : " + msg);
    }

} 
package com.rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverB  : " + msg);
    }

} 
package com.rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutReceiverC  : " + msg);
    }

} 
Controller:
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbit.fanout.FanoutSender;

@RestController
public class RabbitTest {
    
    @Autowired
    private FanoutSender fanoutSender;
    @Autowired
    
    /**
     * fanout exchange类型rabbitmq测试
     */
    @GetMapping("/fanoutTest")
    public void fanoutTest() {
           fanoutSender.send();
    }
}
测试结果
http://localhost:8080/fanoutTest

fanoutSender : I am fanoutSender
FanoutReceiverB  : fanoutSender : I am fanoutSender
FanoutReceiverC  : fanoutSender : I am fanoutSender
FanoutReceiverA  : fanoutSender : I am fanoutSender

至此fanout exchange结束

*6. RPC(callback)

启动类
package com.rabbit;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import com.rabbit.queue.QueueName;

@SpringBootApplication
public class SpringbootRabbitmqCallbackApplication {
    
    /** 设置交换机类型 */
    @Bean
    public DirectExchange defaultExchange() {
        /**
         * DirectExchange:按照routingkey分发到指定队列 
         * TopicExchange:多关键字匹配
         * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念 
         * HeadersExchange:通过添加属性key-value匹配
         */
        return new DirectExchange(QueueName.FOO_EXCHANGE);
    }

    @Bean
    public Queue fooQueue() {
        return new Queue(QueueName.FOO_QUEUE);
    }

    @Bean
    public Binding binding() {
        /** 将队列绑定到交换机 */
        return BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(QueueName.FOO_ROUTINGKEY);
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringbootRabbitmqCallbackApplication.class, args);
    }
}

增加回调处理,这里不再使用application.properties默认配置的方式,会在程序中显示的使用文件中的配置信息。

package com.rabbitmq.config;  
  
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.beans.factory.annotation.Value;  
import org.springframework.beans.factory.config.ConfigurableBeanFactory;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.context.annotation.Scope;  
  
@Configuration  
public class AmqpConfig {  
  
    @Value("${spring.rabbitmq.addresses}")  
    private String addresses;  
    @Value("${spring.rabbitmq.username}")  
    private String username;  
    @Value("${spring.rabbitmq.password}")  
    private String password;  
    @Value("${spring.rabbitmq.virtual-host}")  
    private String virtualHost;  
    @Value("${spring.rabbitmq.publisher-confirms}")  
    private boolean publisherConfirms;  
  
    @Bean  
    public ConnectionFactory connectionFactory() {  
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
        connectionFactory.setAddresses(addresses);  
        connectionFactory.setUsername(username);  
        connectionFactory.setPassword(password);  
        connectionFactory.setVirtualHost(virtualHost);  
        /** 如果要进行消息回调,则这里必须要设置为true */  
        connectionFactory.setPublisherConfirms(publisherConfirms);  
        return connectionFactory;  
    }  
  
    @Bean  
    /** 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 */  
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
    public RabbitTemplate rabbitTemplate() {  
        RabbitTemplate template = new RabbitTemplate(connectionFactory());  
        return template;  
    }  
  
}  
发送方
package com.rabbit.callback;  
  
import java.util.UUID;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.rabbit.queue.QueueName;
  
@Component  
public class Sender implements RabbitTemplate.ConfirmCallback {  //划重点 必须实现该接口
  
    private RabbitTemplate rabbitTemplate;  
  
    @Autowired  
    public Sender(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
        this.rabbitTemplate.setConfirmCallback(this);  
    }  
  
    public void send(String msg) {  
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());  
        System.out.println("send: " + correlationData.getId());  
        this.rabbitTemplate.convertAndSend(QueueName.FOO_EXCHANGE, QueueName.FOO_ROUTINGKEY, msg, correlationData);  
    }  
  
    /** 回调方法 */  
    @Override  
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
        System.out.println("confirm: " + correlationData.getId());  
    }  
}  
接收方
package com.rabbit.callback;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;

import com.rabbit.queue.QueueName;

@Configuration
@RabbitListener(queues = QueueName.FOO_QUEUE)
public class Receiver {

    @RabbitHandler
    public void process(String foo) {
        System.out.println("Receiver: " + foo);
    }
}
相关名称定义
package com.rabbit.queue;

public class QueueName {
    
    public static final String FOO_EXCHANGE   = "callback.exchange.foo";  
    public static final String FOO_ROUTINGKEY = "callback.routingkey.foo";  
    public static final String FOO_QUEUE      = "callback.queue.foo";  

}

Controller
package com.rabbit.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.rabbit.callback.Sender;
  
@RestController  
public class RabbitTest {  
  
    @Autowired  
    private Sender sender;  
  
    @GetMapping("/callback")  
    public void send() {  
        sender.send("this is callback message");  
    }  
}  
测试结果
http://localhost:8080/callback

send: dc9243c5-b524-4f8c-a0c5-248018ecf66b
confirm: dc9243c5-b524-4f8c-a0c5-248018ecf66b
Receiver: this is callback message

至此Callback(RPC)结束

说明:rabbitmq中消息的发送也支持对象的传递,只需对象实现Serializable接口即可。

Springboot与RabbitMQ的整合部分结束,感谢。

部分内容参照:
http://www.cnblogs.com/boshen-hzb/p/6841982.html
http://blog.csdn.net/zl18310999566/article/details/54341057
https://www.rabbitmq.com/getstarted.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,928评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,748评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,282评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,065评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,101评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,855评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,521评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,414评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,931评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,053评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,191评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,873评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,529评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,074评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,188评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,491评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,173评论 2 357

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,673评论 18 139
  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,365评论 2 34
  • RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。 消息...
    彩虹之梦阅读 1,086评论 2 1
  • 什么叫消息队列 消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂...
    lijun_m阅读 1,350评论 0 1
  • 小老郑阅读 272评论 0 0