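First, make sure you understand how RabbitMQ's exchanges, routing keys, and queues relate to one another.
For background, see: http://blog.csdn.net/samxx8/article/details/47417133
Three commonly used RabbitMQ exchange types
- Direct Exchange – Routes on the routing key. A queue is bound to the exchange with a binding key, and a message is delivered only when its routing key matches that key exactly.
- Fanout Exchange – Ignores the routing key. You simply bind queues to the exchange, and every message sent to the exchange is forwarded to all queues bound to it.
- Topic Exchange – Matches the routing key against a pattern. The queue is bound to the exchange with a pattern of dot-separated words, in which "#" matches zero or more words and "*" matches exactly one word.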
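To make the wildcard rules concrete, here is a minimal sketch of topic pattern matching in plain Java. It is only an illustration of the rules described above, not RabbitMQ's actual implementation; the class name TopicPatternSketch and its methods are made up for this example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative matcher for topic binding patterns (hypothetical helper, not broker code). */
final class TopicPatternSketch {

    /** Returns true if the dot-separated routing key matches the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        List<String> patternWords = Arrays.asList(pattern.split("\\."));
        List<String> keyWords = routingKey.isEmpty()
                ? Collections.<String>emptyList()
                : Arrays.asList(routingKey.split("\\."));
        return match(patternWords, 0, keyWords, 0);
    }

    private static boolean match(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            return j == k.size(); // pattern exhausted: the key must be exhausted too
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // "#" may absorb zero, one, or many words of the routing key
            for (int next = j; next <= k.size(); next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false; // pattern still expects a word, but the key has none left
        }
        // "*" matches exactly one word; any other pattern word must be equal to the key word
        if (word.equals("*") || word.equals(k.get(j))) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("topic.message", "topic.messages"));
        System.out.println(matches("topic.#", "topic.messages"));
        System.out.println(matches("topic.*", "topic.a.b"));
    }
}
```

Under these rules a queue bound with topic.# receives every message whose routing key starts with the word topic, while a queue bound with topic.message receives only that exact key.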
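The following six examples cover most common use cases:
*1. Single producer, single consumer
Create a Spring Boot Maven project named springboot-rabbitmq-OneToOne.
Configure pom.xml as follows; spring-boot-starter-amqp is the starter that brings in RabbitMQ support: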
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dev</groupId>
<artifactId>springboot-rabbitmq-OneToOne</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springboot-rabbitmq-demo</name>
<description>springboot-rabbitmq-demo</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.4.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
</project>
Configure application.yml as follows:
spring:
  application:
    name: rabbit-service
  rabbitmq:
    host: xxx.xxx.xxx.xxx # RabbitMQ server address
    port: xxxx # RabbitMQ server port
    username: xxx
    password: xxx
Main class
package com.rabbit;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.rabbit.queue.QueueName;
@SpringBootApplication
public class SpringbootRabbitmqDemoApplication {
@Bean
public Queue helloQueue() { // Key point: declaring a Queue bean makes Spring AMQP create the queue on the broker automatically
return new Queue(QueueName.OneToOneQueue);
}
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqDemoApplication.class, args);
}
}
Queue name definitions
package com.rabbit.queue;
public class QueueName {
// The constant must be declared final; otherwise compilation fails with:
//The value for annotation attribute RabbitListener.queues must be a constant expression
public final static String OneToOneQueue="OneToOneQueue";
}
Sender
package com.rabbit.hello;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String sendMsg = "Sender: hello rabbitMQ ";
System.out.println(sendMsg);
// Send a text message to the named queue (via the default exchange, using the queue name as the routing key)
this.rabbitTemplate.convertAndSend(QueueName.OneToOneQueue, sendMsg);
}
}
Receiver
package com.rabbit.hello;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
@RabbitListener(queues = {QueueName.OneToOneQueue}) // Listen on the specified queue
public class Receiver {
@RabbitHandler
public void process(String revmsg) {
System.out.println("Receiver : " + revmsg);
}
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.hello.Sender;
@RestController
public class RabbitTest {
@Autowired
private Sender sender;
@GetMapping("/oneToOne")
public void oneToOne() {
sender.send();
}
}
Test result:
http://127.0.0.1:8080/oneToOne
Sender: hello rabbitMQ
Receiver : Sender: hello rabbitMQ
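That completes the single-producer, single-consumer integration.
*2. Single producer, multiple consumers
This is similar to the single-consumer case, except that several consumers listen on the same queue.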
package com.rabbit.hello;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
@RabbitListener(queues = {QueueName.OneToManyQueue}) // Listen on the specified queue; the name differs from the one-to-one queue to keep the two examples apart
public class Receiver1 {
@RabbitHandler
public void process(String revmsg) {
System.out.println("Receiver1 : " + revmsg);
}
}
package com.rabbit.hello;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
@RabbitListener(queues = {QueueName.OneToManyQueue}) // Listen on the specified queue; the name differs from the one-to-one queue to keep the two examples apart
public class Receiver2 {
@RabbitHandler
public void process(String revmsg) {
System.out.println("Receiver2 : " + revmsg);
}
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.hello.Sender;
@RestController
public class RabbitTest {
@Autowired
private Sender sender;
/**
* Single producer, multiple consumers
*/
@GetMapping("/oneToMany")
public void oneToMany() {
for(int i=0;i<10;i++){
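sender.send(); // Sender.send() from example 1 takes no argument; it must publish to QueueName.OneToManyQueue for these receivers to get the messages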
}
}
}
Test result:
http://127.0.0.1:8080/oneToMany
The 10 messages sent by the producer are split between the two consumers:
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Sender: hello rabbitMQ
Receiver1 : Sender: hello rabbitMQ
Receiver2 : Sender: hello rabbitMQ
Receiver1 : Sender: hello rabbitMQ
Receiver2 : Sender: hello rabbitMQ
Receiver1 : Sender: hello rabbitMQ
Receiver2 : Sender: hello rabbitMQ
Receiver1 : Sender: hello rabbitMQ
Receiver2 : Sender: hello rabbitMQ
Receiver1 : Sender: hello rabbitMQ
Receiver2 : Sender: hello rabbitMQ
That completes the single-producer, multiple-consumer integration.
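The alternating Receiver1/Receiver2 output above is consistent with round-robin dispatch, where a queue hands each message to the next consumer in turn. The sketch below models that idea in plain Java; it is a simplified illustration (class and method names are made up here), and a real broker's distribution also depends on settings such as prefetch and on how quickly each consumer acknowledges.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of round-robin dispatch among competing consumers (illustrative only). */
final class RoundRobinSketch {

    /** Hands message i to consumer (i mod consumerCount) and returns what each consumer received. */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<String>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("hellomsg:" + i);
        }
        List<List<String>> received = dispatch(messages, 2);
        System.out.println("Receiver1 got " + received.get(0));
        System.out.println("Receiver2 got " + received.get(1));
    }
}
```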
*3. Multiple producers, multiple consumers
This is similar to the previous example; just add a second producer.
package com.rabbit.hello;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
public class Sender1 {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
String sendMsg = msg;
System.out.println("Sender1 : " + sendMsg);
this.rabbitTemplate.convertAndSend(QueueName.ManyToManyQueue, sendMsg);
}
}
package com.rabbit.hello;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
public class Sender2 {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send(String msg) {
String sendMsg = msg;
System.out.println("Sender2 : " + sendMsg);
this.rabbitTemplate.convertAndSend(QueueName.ManyToManyQueue, sendMsg);
}
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.hello.Sender1;
import com.rabbit.hello.Sender2;
@RestController
public class RabbitTest {
@Autowired
private Sender1 sender1;
@Autowired
private Sender2 sender2;
/**
* Multiple producers, multiple consumers
*/
@GetMapping("/manyToMany")
public void manyToMany() {
for(int i=0;i<10;i++){
sender1.send("msg-1 :"+i);
sender2.send("msg-2 :"+i);
}
}
}
http://127.0.0.1:8080/manyToMany
The receivers still share the messages evenly:
Sender1 : msg-1 :0
Sender2 : msg-2 :0
Sender1 : msg-1 :1
Sender2 : msg-2 :1
Sender1 : msg-1 :2
Sender2 : msg-2 :2
Sender1 : msg-1 :3
Sender2 : msg-2 :3
Sender1 : msg-1 :4
Sender2 : msg-2 :4
Sender1 : msg-1 :5
Sender2 : msg-2 :5
Sender1 : msg-1 :6
Sender2 : msg-2 :6
Sender1 : msg-1 :7
Sender2 : msg-2 :7
Sender1 : msg-1 :8
Sender2 : msg-2 :8
Sender1 : msg-1 :9
Sender2 : msg-2 :9
Receiver1 : msg-1 :0
Receiver2 : msg-2 :0
Receiver1 : msg-1 :1
Receiver2 : msg-2 :1
Receiver1 : msg-1 :2
Receiver2 : msg-2 :2
Receiver1 : msg-1 :3
Receiver2 : msg-2 :3
Receiver1 : msg-1 :4
Receiver2 : msg-2 :4
Receiver1 : msg-1 :5
Receiver2 : msg-2 :5
Receiver2 : msg-2 :6
Receiver1 : msg-1 :6
Receiver2 : msg-1 :7
Receiver1 : msg-2 :7
Receiver2 : msg-1 :8
Receiver1 : msg-2 :8
Receiver1 : msg-2 :9
Receiver2 : msg-1 :9
*4. Topic exchange (a direct exchange works similarly and is simpler, so no separate example is given here; see the RPC callback example.)
Changes to the main class:
package com.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.rabbit.queue.QueueName;
@SpringBootApplication
public class SpringbootRabbitmqTopicApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqTopicApplication.class, args);
}
// Declare a queue
@Bean
public Queue queueMessage() {
return new Queue(QueueName.message); // Queue names are defined in the QueueName class
}
// Declare a second queue
@Bean
public Queue queueMessages() {
return new Queue(QueueName.messages);
}
// Declare the exchange
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { // Key point: the parameter name must match the Queue bean method name above so that the right queue is injected
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); // Queues bound to the same exchange use different binding keys
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); // "#" matches zero or more words; "*" matches exactly one word
}
}
Sender
// Any of the three methods can be used for testing; each one publishes with a different routing key
package com.rabbit.topic;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "i am testTopic";
System.out.println("Sender : " + context);
String routingKey="topic.1";
this.rabbitTemplate.convertAndSend("topicExchange", routingKey, context);
}
public void send1() {
String context = "i am testTopic 1";
System.out.println("Sender : " + context);
String routingKey="topic.message";
// Check whether routing key "topic.message" matches the binding keys of QueueName.message ("topic.message") and QueueName.messages ("topic.#")
this.rabbitTemplate.convertAndSend("topicExchange", routingKey,context);
}
public void send2() {
String context = "i am testTopic 2";
System.out.println("Sender : " + context);
String routingKey="topic.messages";
this.rabbitTemplate.convertAndSend("topicExchange", routingKey,context);
}
}
Two receivers
Each listens on a different queue, so you can see whether a message reaches one of them or both.
package com.rabbit.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
@RabbitListener(queues = QueueName.message)
public class TopicReceiver1 {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver1 : " + message);
}
}
package com.rabbit.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
@RabbitListener(queues = QueueName.messages)
public class TopicReceiver2 {
@RabbitHandler
public void process(String message) {
System.out.println("Topic Receiver2 : " + message);
}
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.topic.TopicSender;
@RestController
public class RabbitTest {
@Autowired
private TopicSender topicSender;
/**
* Test endpoints for the topic exchange
*/
@GetMapping("/topicTest")
public void topicTest() {
topicSender.send();
}
@GetMapping("/topicTest1")
public void topicTest1() {
topicSender.send1();
}
@GetMapping("/topicTest2")
public void topicTest2() {
topicSender.send2();
}
}
Test results (binding keys: queue 1 uses topic.message, queue 2 uses topic.#)
http://localhost:8080/topicTest
Sender : i am testTopic
Topic Receiver2 : i am testTopic
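The sender's routing key is topic.1, which does not match the first queue's binding key (topic.message).
The second queue is bound with the wildcard pattern topic.#, so the receiver on the second queue gets the message.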
--------------------------------------------
http://localhost:8080/topicTest1
Sender : i am testTopic 1
Topic Receiver1 : i am testTopic 1
Topic Receiver2 : i am testTopic 1
The sender's routing key is topic.message, which matches the binding keys of both queues, so both receivers get the message.
---------------------------------------------
http://localhost:8080/topicTest2
Sender : i am testTopic 2
Topic Receiver2 : i am testTopic 2
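The sender's routing key is topic.messages, which does not match the first queue's binding key but does match the second queue's wildcard pattern, so only the second receiver gets the message.
That completes the topic exchange example.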
*5. Fanout exchange
Fanout is the familiar broadcast (publish/subscribe) pattern: a message sent to a fanout exchange is delivered to every queue bound to that exchange.
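As a rough mental model of the broadcast behavior just described, the following plain-Java sketch keeps a set of bound queues and copies every published message into all of them, ignoring the routing key. The FanoutSketch class and its methods are hypothetical and exist only for illustration; they are not part of Spring AMQP or RabbitMQ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of a fanout exchange (illustrative only, not a broker). */
final class FanoutSketch {

    // Queue name -> messages currently held by that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to this exchange; no binding key is needed for fanout. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<String>());
    }

    /** Copies the message into every bound queue; the routing key is accepted but ignored. */
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    /** Returns the messages held by the given queue, or null if it was never bound. */
    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanout.A");
        exchange.bind("fanout.B");
        exchange.bind("fanout.C");
        exchange.publish("hehe", "I am fanoutSender");
        System.out.println(exchange.messagesIn("fanout.A"));
        System.out.println(exchange.messagesIn("fanout.B"));
        System.out.println(exchange.messagesIn("fanout.C"));
    }
}
```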
Main class (three queues bound to a FanoutExchange):
package com.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
@SpringBootApplication
public class SpringbootRabbitmqFanoutApplication {
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}
@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}
@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqFanoutApplication.class, args);
}
}
Sender:
package com.rabbit.fanout;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class FanoutSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String msgString="fanoutSender : I am fanoutSender";
System.out.println(msgString);
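this.rabbitTemplate.convertAndSend("fanoutExchange", "hehe", msgString); // A fanout exchange ignores the routing key, so any value works; it must still be passed, because the two-argument convertAndSend(routingKey, message) would treat the first argument as a routing key on the default exchange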
}
}
Receivers
package com.rabbit.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverA : " + msg);
}
}
package com.rabbit.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverB : " + msg);
}
}
package com.rabbit.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {
@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverC : " + msg);
}
}
Controller:
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.fanout.FanoutSender;
@RestController
public class RabbitTest {
@Autowired
private FanoutSender fanoutSender;
/**
* Test endpoint for the fanout exchange
*/
@GetMapping("/fanoutTest")
public void fanoutTest() {
fanoutSender.send();
}
}
Test result
http://localhost:8080/fanoutTest
fanoutSender : I am fanoutSender
FanoutReceiverB : fanoutSender : I am fanoutSender
FanoutReceiverC : fanoutSender : I am fanoutSender
FanoutReceiverA : fanoutSender : I am fanoutSender
That completes the fanout exchange example.
*6. RPC (callback)
Main class
package com.rabbit;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import com.rabbit.queue.QueueName;
@SpringBootApplication
public class SpringbootRabbitmqCallbackApplication {
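/** Declare the exchange and its type */
@Bean
public DirectExchange defaultExchange() {
/**
* DirectExchange: routes to the queue whose binding key equals the routing key
* TopicExchange: matches the routing key against binding patterns
* FanoutExchange: delivers the message to every bound queue; the routing key is ignored
* HeadersExchange: matches on key-value message header attributes
*/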
return new DirectExchange(QueueName.FOO_EXCHANGE);
}
@Bean
public Queue fooQueue() {
return new Queue(QueueName.FOO_QUEUE);
}
@Bean
public Binding binding() {
/** Bind the queue to the exchange */
return BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(QueueName.FOO_ROUTINGKEY);
}
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqCallbackApplication.class, args);
}
}
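Next, add callback handling; the callback here is the publisher confirm that the broker sends back to the producer. Instead of relying on the default auto-configuration driven by application.properties, this example reads the settings from that file and builds the connection factory and template explicitly in code.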
package com.rabbit.config;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
@Configuration
public class AmqpConfig {
@Value("${spring.rabbitmq.addresses}")
private String addresses;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.publisher-confirms}")
private boolean publisherConfirms;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
/** Must be set to true for publisher confirm callbacks to fire */
connectionFactory.setPublisherConfirms(publisherConfirms);
return connectionFactory;
}
@Bean
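/** Prototype scope gives each sender its own template; a RabbitTemplate supports only one ConfirmCallback, so a shared singleton could not serve several senders */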
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
}
Sender
package com.rabbit.callback;
import java.util.UUID;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
public class Sender implements RabbitTemplate.ConfirmCallback { // Key point: the sender must implement this interface to receive confirms
private RabbitTemplate rabbitTemplate;
@Autowired
public Sender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback(this);
}
public void send(String msg) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
System.out.println("send: " + correlationData.getId());
this.rabbitTemplate.convertAndSend(QueueName.FOO_EXCHANGE, QueueName.FOO_ROUTINGKEY, msg, correlationData);
}
/** Confirm callback: invoked when the broker acknowledges (or rejects) a published message */
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm: " + correlationData.getId());
}
}
Receiver
package com.rabbit.callback;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbit.queue.QueueName;
@Component
@RabbitListener(queues = QueueName.FOO_QUEUE)
public class Receiver {
@RabbitHandler
public void process(String foo) {
System.out.println("Receiver: " + foo);
}
}
Name definitions
package com.rabbit.queue;
public class QueueName {
public static final String FOO_EXCHANGE = "callback.exchange.foo";
public static final String FOO_ROUTINGKEY = "callback.routingkey.foo";
public static final String FOO_QUEUE = "callback.queue.foo";
}
Controller
package com.rabbit.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.rabbit.callback.Sender;
@RestController
public class RabbitTest {
@Autowired
private Sender sender;
@GetMapping("/callback")
public void send() {
sender.send("this is callback message");
}
}
Test result
http://localhost:8080/callback
send: dc9243c5-b524-4f8c-a0c5-248018ecf66b
confirm: dc9243c5-b524-4f8c-a0c5-248018ecf66b
Receiver: this is callback message
That completes the callback (RPC) example.
Note: RabbitMQ messages can also carry objects; the object's class only needs to implement the Serializable interface.
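To illustrate what the Serializable requirement means in practice, here is a small plain-Java sketch that turns an object into a byte array and back using standard Java serialization, which is the mechanism Spring AMQP's default SimpleMessageConverter relies on for Serializable payloads. The OrderMessage class and the helper methods are hypothetical names invented for this example, and the sketch does not talk to a broker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Round-trips a Serializable payload through a byte array, as a message body would be. */
final class SerializableSketch {

    /** Hypothetical payload type; any class sent as a message body would look similar. */
    static final class OrderMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final int quantity;

        OrderMessage(String id, int quantity) {
            this.id = id;
            this.quantity = quantity;
        }
    }

    /** Serializes the payload into the bytes that would travel as the message body. */
    static byte[] toBytes(Serializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new IllegalStateException("Could not serialize payload", e);
        }
        return buffer.toByteArray();
    }

    /** Rebuilds the object on the receiving side from the message body bytes. */
    static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize payload", e);
        }
    }

    public static void main(String[] args) {
        OrderMessage sent = new OrderMessage("order-1", 3);
        OrderMessage received = (OrderMessage) fromBytes(toBytes(sent));
        System.out.println(received.id + " x" + received.quantity);
    }
}
```

With a payload like this, the sender would pass the object to convertAndSend in place of the String, and the receiver's handler method would take the same type as its parameter; both sides need the class on their classpath.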
That concludes this walkthrough of integrating Spring Boot with RabbitMQ. Thanks for reading.
Parts of this post draw on:
http://www.cnblogs.com/boshen-hzb/p/6841982.html
http://blog.csdn.net/zl18310999566/article/details/54341057
https://www.rabbitmq.com/getstarted.html