一、RabbitMQ整体架构:
组成部分说明如下:
-
Broker
:标识消息队列服务器实体【消息队列服务进程】,此进程包括两个部分Exchange
和Queue
。 -
Exchange
:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。 -
Queue
:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 -
Banding
:绑定,用于消息队列和交换机之间的关联。一个绑定就是基于路由键将交换机和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 -
Channel
:信道,多路复用连接中的一条独立的双向数据流通道。新到是建立在真实的TCP连接内地虚拟链接,AMQP命令都是通过新到发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说,建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。
类似概念:TCP是电缆,信道就是里面的光纤,每个光纤都是独立的,互不影响。 -
Connection
:网络连接,比如一个TCP连接。 -
Producer
:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。 -
Consumer
:消息消费者,即消费方客户端,接收MQ转发的消息。 -
Message
:消息,消息是不具名的,它是由消息头和消息体组成。消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。
二、消息发布接收流程:
1. 生产者发送消息流程:
1、生产者连接到RabbitMQ Broker,建立一个连接connection,开启一个信道channel
2、生产者声明一个交换器,并设置相关属性,如交换机类型、是否持久化等
3、生产者声明一个队列并设置相关属性,如是否排他、是否持久化、是否自动删除等
4、生产者通过路由键将交换器和队列绑定起来
5、生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息
6、相应的交换器根据接收到的路由键查找相匹配的队列
7、如果找到,将消息存入相应的队列中
8、如果没有找到,根据生产者配置的属性选择丢弃或者退回给生产者
9、关闭信道
10、关闭连接
2. 消费者接收消息的过程
1、消费者连接到RabbitMQ Broker,建立一个连接connection,开启一个信道channel
2、消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作
3、等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接收消息
4、消费者确认ack接收到的消息
5、RabbitMQ从队列中删除相应已经被确认的消息
6、关闭信道
7、关闭连接
三、几个概念
1. exchange 交换机和绑定routing key
exchange的作用就是类似路由器,routing key 就是路由键,服务器会根据路由键将消息从交换器路由到队列上去。
2. vhost 是什么?起什么作用?
vhost 可以理解为虚拟 broker
,即 mini-RabbitMQ server。其内部均含有独立的queue
、exchange
和 binding
等,但最最重要的是,其拥有独立的权限系统
,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离
的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
3. RabbitMQ 上的一个 queue 中存放的 message 是否有数量限制?
可以认为是无限制
,因为限制取决于机器的内存,但是消息过多会导致处理效率的下降。
4. channel、exchange 和 queue
queue 具有自己的 erlang 进程;exchange 内部实现为保存 binding 关系的查找表;
channel 是实际进行路由工作的实体,即负责按照 routing_key 将 message 投递给queue 。由 AMQP 协议描述可知,channel 是真实 TCP 连接之上的虚拟连接,所有
AMQP 命令都是通过 channel 发送的,且每一个 channel 有唯一的 ID。一个 channel 只能被单独一个操作系统线程使用,故投递到特定 channel 上的 message 是有顺序的。
5. RabbitMQ 允许发送的 message 最大可达多大?
RabbitMQ是基于amqp协议的实现,amqp0-9-1协议中规定了消息的大小是无限制
的。原文:
To handle messages of any size without significant limit.
客户端与RabbitMQ服务端的最大帧是128K,但消息大小却可支持数MB,这是因为底层做了拆包组包的.效率不是太高,频繁组包容易引起rabbitmq服务down掉。
四、工作模式
exchange有多个种类:
direct
,fanout
,topic
,header
(非路由键匹配,功能和direct类似,很少用)。
- Direct Exchange
1:1
直连型交换机
,Direct Exchange 是 RabbitMQ 默认的 Exchange,完全根据 RoutingKey 来路由消息。设置 Exchange 和 Queue 的 Binding 时需指定 RoutingKey(一般为 Queue Name),发消息时也指定一样的 RoutingKey,消息就会被路由到对应的Queue。- Fanout Exchange
1:N
扇型交换机
,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。- Topic Exchange
N:1
主题交换机
,Topic Exchange 和 Direct Exchange 类似,也需要通过 RoutingKey 来路由消息,区别在于Direct Exchange 对 RoutingKey 是精确匹配,而 Topic Exchange 支持模糊匹配。分别支持 * 和 # 通配符,* 表示匹配一个单词, # 则表示匹配没有或者多个单词。
-
简单模式
:一个生产者,一个消费者。【不用定义交换机】
1)消息产生者将消息放入队列
2)消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除
3)应用场景:聊天 -
work工作模式(资源的竞争)
:一个生产者,多个消费者,每个消费者获取到的消息唯一。【不用定义交换机】
1)消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)
2)应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢) -
publish&subscribe/发布订阅(共享资源)
:一个生产者发送的消息会被多个消费者获取。【扇形交换机】
1)X代表交换机,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费
2)相关场景:邮件群发,群聊天,广播(广告) -
routing路由模式
:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key【直连交换机】
1)消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
2)根据业务功能定义路由字符串
3)从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误; -
topic 主题模式(路由模式的一种)
:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,* 号代表一个单词,#号代表零个或多个单词。【主题交换机】
1)星号井号代表通配符
2)* 号代表一个单词,#号代表零个或多个单词
3)路由功能添加模糊匹配
4)消息产生者产生消息,把消息交给交换机
5)交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
五、SpringBoot整合RibbitMQ
使用spring-boot-starter-amqp会自动添加spring-rabbit依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐amqp</artifactId>
</dependency>
配置application.yml
server:
port: 8021
spring:
application:
name: rabbitmq-provider
#配置rabbitMq 服务器
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#虚拟host 可以不设置,使用server默认host
virtual-host: JCcccHost
1. 简单使用
1)队列配置
@Configuration
public class QueueConfig {
@Bean
public Queue simpleQueue() {
return new Queue("simple");
}
}
2)消息提供者
@Component
public class SimpleSend {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private AmqpTemplate amqpTemplate;
public void send() {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String message = "Hello Spring Boot " + simpleDateFormat.format(new Date());
amqpTemplate.convertAndSend("simple", message);
logger.info("消息推送成功!");
}
}
3)消息消费者
@Component
@RabbitListener(queues = "simple")
public class SimpleReceive {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RabbitHandler
public void process(String message) {
logger.info("Receive :{}", message);
}
}
2. Topic Exchange
@Configuration
public class TopicConfig {
private final String message = "topic.message";
private final String messages = "topic.messages";
@Bean
public Queue queueMessage() {
return new Queue(this.message);
}
@Bean
public Queue queueMessages() {
return new Queue(this.messages);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("topicExchange");
}
/**
*这里队列 queueMessages 可以同时匹配两个 route_key ,而队列 queueMessage 只能匹配 topic.message 。
**/
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
消息的生产者
@Component
public class TopicSend {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private AmqpTemplate rabbitTemplate;
/**
*调用 send1() 消息会由 Exchange 同时转发到两个队列, 而调用 send2() 则只会转发至 receive2 。
**/
public void send1() {
String message = "message 1";
logger.info("send:{}", message);
rabbitTemplate.convertAndSend("topicExchange", "topic.message", message);
}
public void send2() {
String message = "message 2";
logger.info("send:{}", message);
rabbitTemplate.convertAndSend("topicExchange", "topic.messages", message);
}
}
3. Fanout Exchange
@Configuration
public class FanoutConfig {
@Bean
public Queue MessageA() {
return new Queue("fanout.A");
}
@Bean
public Queue MessageB() {
return new Queue("fanout.B");
}
@Bean
public Queue MessageC() {
return new Queue("fanout.C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(MessageA).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(MessageB).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue MessageC, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(MessageC).to(fanoutExchange);
}
}
消息生产者
@Component
public class FanoutSend {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String message = "Hello FanoutSend.";
logger.info("send:{}", message);
this.rabbitTemplate.convertAndSend("fanoutExchange","", message);
}
}