简介
简单地说,RabbitMQ就是一个消息中间件,负责接收消息和异步处理消息。
一条消息从生产到被消费要经历了以下几个步骤:
- 生产者(Producer)产生一条消息(Message),也就是客户端调用一个发送消息函数。
- 消息通过交换机名称找到对应的交换机(Exchange)。
- 交换机对到来的消息进行分发,根据路由键(Routing Key)去匹配绑定到该交换机的队列的绑定键(Binding Key),将消息转发到所有匹配的队列。
- 队列(Queue)接收到消息,被监听该队列的消费者(Cunsumer)消费。
最简单的消息发送消费样例
pom导入
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
application.yml 配置
spring:
rabbitmq:
port: 5672 #端口号
host: localhost #ip地址
username: guest #用户名
password: guest #密码
创建一条队列,同时监听队列
/**
* 监听消息队列queue,异步消费。注意方法不能返回对象,否则会报错 Failed to send reply with payload ...
*/
@RabbitListener(queuesToDeclare = @Queue(name = "queue"))
public void receive(String msg) {
log.info("接收消息:{} ", msg);
}
创建队列需要以下几个参数:
参数 | 含义 | 默认值 |
---|---|---|
name | 队列名 | |
durable | 是否可持久化。可持久化的队列在RabbitMQ挂掉后,队列中未被消费的消息不会丢失。 | true |
exclusive | 是否排他的。只有创建这条队列的的连接有权访问,连接断开后,排他队列将自动删除。 | false |
autoDelete | 是否自动删除。当没有消费者监听这条队列时,队列会被自动删除,不管队列中是否存在未被消费的消息 。 | false |
arguments | 创建队列可以附带一些参数,如消息过期时间(x-message-ttl)、成为死信队列(x-dead-letter-exchange)等等 。 |
向队列queue发送一条消息
@Scheduled(fixedRate = 5000)
public void send() {
rabbitTemplate.convertAndSend("queue", "hello");
}
这里开启了一个定时调度任务,每隔5秒向queue发送一条hello消息。关于SpringBoot的定时调度任务,可以参考这篇文章。
有几点要说明的是:
- 当不传exchange时,是通过Default Exchange去找到对应的队列的,Default Exchange自动绑定所有的队列,并且Routing Key等于队列名。
- 如果没有消费者去消费的话,消息会堆积在队列中,为准备(ready)状态。
- 如果队列名没有找到,则消息无法成功送达,会直接丢失。
- 消费时报错异常,默认的策略是重新返回队列中。
控制台输出
2019-10-29 10:31:11.650 INFO 996 --- [cTaskExecutor-1] o.j.demo.rabbitmq.RabbitmqApplication : 接收消息:hello
2019-10-29 10:31:16.650 INFO 996 --- [cTaskExecutor-1] o.j.demo.rabbitmq.RabbitmqApplication : 接收消息:hello
2019-10-29 10:31:21.650 INFO 996 --- [cTaskExecutor-1] o.j.demo.rabbitmq.RabbitmqApplication : 接收消息:hello
接收对象
导入pom
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.61</version>
</dependency>
序列化以及反序列化
@RabbitListener(queuesToDeclare = @Queue(name = "queue.obj"))
public void receiveObj(String msg) {
User user = JSONObject.parseObject(msg, User.class);
log.info("接收对象:{} ", user);
}
@Scheduled(fixedRate = 5000)
public void sendObj() {
rabbitTemplate.convertAndSend("queue.obj", JSONObject.toJSONString(new User(1, "name")));
}
@Data
@AllArgsConstructor
public static class User {
private Integer id;
private String name;
}
直接用Fastjson进行序列化和反序列化即可。这种方法不需要修改rabbitTemplate默认的序列化配置,简单又快捷。
全部代码演示
package org.jiangzhe.demo.rabbitmq;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@Slf4j
@EnableScheduling
@SpringBootApplication
public class RabbitmqApplication {
@Autowired
RabbitTemplate rabbitTemplate;
public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class, args);
}
@RabbitListener(queuesToDeclare = @Queue(name = "queue"))
public void receiveString(String msg) {
log.info("接收消息:{} ", msg);
}
@RabbitListener(queuesToDeclare = @Queue(name = "queue.obj"))
public void receiveObj(String msg) {
User user = JSONObject.parseObject(msg, User.class);
log.info("接收对象:{} ", user);
}
@Scheduled(fixedRate = 5000)
public void send() {
rabbitTemplate.convertAndSend("queue", "hello");
rabbitTemplate.convertAndSend("queue.obj", JSONObject.toJSONString(new User(1, "name")));
}
@Data
@AllArgsConstructor
public static class User {
private Integer id;
private String name;
}
}