0. 背景
消息队列(Message Queue)提供一个异步通信机制,消息的发送者不必苦苦等待着消息被处理完成,转而继续自己的工作。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候再用。消息队列在企业中应用很广泛,可选择的有ActiveMQ、RabbitMQ,Kafka,阿里巴巴自主开发RocketMQ等。本文讨论 RabbitMQ 。
1.介绍
欲了解 RabbitMQ 先要了解 MQ。 RabbitMQ 是 MQ 的一种实现。
1.1 MQ 介绍
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。
它由这些组成:
- 生产者:生产者产生消息,并把消息放入队列。
- 队列:把要传输的数据(消息)放在队列中,用队列机制来实现消息传递。生产者产生消息并把消息放入队列,然后由消费者去处理。
- 消费者:消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。
作用:
- 解耦:对应用/模块进行分离,引入了消息代理作为中间消息服务
- 异步:主业务执行中将消息放入MQ后不等待,从业务异步执行返回结果。
- 削峰:高并发情况下,业务异步处理,提供高峰期业务处理能力,避免系统瘫痪
RabbitMQ 是 MQ 的一种实现,下面介绍下 RabitMQ。
1.2 RabbitMQ 介绍
RabBMQ是一个广泛部署的开源消息代理。
特点:
- 异步消息传递
- 易于部署
- 支持集群,用于高可用性和吞吐量。支持分布式部署。
- 开发者友好,支持各种流行的开发语言。比如Java,Ruby,GO。
- 方便的 管理与监控 工具
2. AMQP(高级消息队列协议)概述
RabbitMQ 是一个实现了 AMQP协议 的工具软件,所以 AMQP 中的概念和准则也适用于 RabbitMQ。下面重点介绍AMQP,它能帮助我们深刻的理解。
AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用 和消息中间件代理之间进行通信。
为什么会有 AMQP?
软件系统中存在不同厂商的不兼容产品的问题,异构系统的集成是非常昂贵和复杂的。早期的消息传递解决方案也非常昂贵,往往专门用于大公司负担得起。
AMQP 设计目标:
- 为消息中间件创建开放标准
- 启用各种技术和平台之间的互操作性
AMQP 不仅使这些不同的系统能够相互通信,而且能够实现不同的产品。
消息中间件(经纪人)及其角色
“消息代理” 收到 “消息生产者”并将它们 “路由” 到 “消费者”。
(发布它们的应用程序) --> 消息代理 ---> (处理它们的应用程序)
2. AMQP 模型简介
2.1 工作过程
它工作过程如下图:
- 消息(Message)被发布者(Publisher)发送给交换机(Exchange)
- 交换机(Exchange)可以理解成邮局,交换机将收到的消息根据路由规则分发给绑定的队列(Queue)
- 最后,AMQP代理会将消息投递给订阅了此队列的消费者(Consumer),或者消费者按照需求自行获取。
一些其他情况:
消息属性:
发布消息时可以给消息指定各种消息属性(message meta-data)。有些属性会被消息代理(brokers)使用,有些只能被接收消息的应用所使用。
回执(acknowledgement):
网络是不可靠的,有可能在处理消息的时候失败。AMQP 包含了一个消息确认的概念:当一个消息成功到达消费者后(consumer),消费者会通知一下消息代理(broker),这个可以是自动的也可以由开发者执行。
当“消息确认”被启用的时候,消息代理不会完全将消息从队列中删除,直到它收到来自消费者的确认回执(acknowledgement)。
无法到达
当一个消息无法被成功路由时,消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。
2.2 AMQP 内部模型
3. 交换机(Exchange)
交换机是要掌握的重点,这一章节重点来讲。
交换机 用来传输消息的,交换机拿到一个消息之后将它路由给一个队列。
它的传输策略是由交换机类型和被称作绑定(bindings)的规则所决定的。
四种交换机:
Name(交换机类型) | 默认名称 |
---|---|
Direct exchange(直连交换机) | (空字符串) , amq.direct |
Fanout exchange(扇型交换机) | amq.fanout |
Topic exchange(主题交换机) | amq.topic |
Headers exchange(头交换机) | amq.match (and amq.headers in RabbitMQ) |
交换机状态
交换机可以有两个状态:持久(durable)、暂存(transient)。持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会。并不是所有的应用场景都需要持久化的交换机。
下面分别说明四种交换机类型
3.1 直连型交换机( Direct Exchange)
消息可以携带一个属性 “路由键(routing key)”,以辅助标识被路由的方式。直连型交换机(direct exchange)根据消息携带的路由键将消息投递给对应队列的。
它如何工作:
- 将一个队列绑定到某个交换机上,同时赋予该绑定(Binding)一个路由键(routing key)
- 当一个携带着路由键为 “key1” 的消息被发送给直连交换机时,交换机会把它路由给 “Binding名称等于 key1” 的队列。
总结:
Binding 的 Routing Key 要和 消息的 Routing Key 完全匹配
3.2 扇型交换机 ( Fanout Exchange)
扇型交换机将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。
如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
案例:
- MMO游戏可以使用它来处理排行榜更新等全局事件
- 体育新闻网站可以用它来实时地将比分更新分发给多端
- 在群聊的时候,它被用来分发消息给参与群聊的用户。
总结
不管 消息的Routing Key,广播给这个交换机下的所有绑定队列。
3.3 主题交换机( Topic Exchanges)
主题交换机通过对消息的路由键
和 “绑定的主题名称” 进行模式匹配,将消息路由给匹配成功的队列。
它的工作方式:
- 为绑定的 Routing Key 指定一个 “主题”。模式匹配用用 *, # 等字符进行模糊匹配。比如 usa.# 表示 以 usa.开头的多个消息 到这里来。
- 交换机将按消息的 Routing Key 的值的不同路由到 匹配的主题队列。
主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
使用案例:
- 由多个人完成的后台任务,每个人负责处理某些特定的任务
- 股票价格更新涉及到分类或者标签的新闻更新(
总结:
绑定 的 Routing Key 和 消息的 Routing Key 进行字符串的模糊匹配。
3.4 头交换机 (Headers exchange)
头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
在实际中并不常用。
4. 相关实体概念
4.1 队列( Queue )
队列 存储着即将被应用消费掉的消息。
名称
可以为队列指定一个名称。
队列持久化
- 持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它可以被重新恢复。
- 没有被持久化的队列称作暂存队列(Transient queues)
4.2 绑定(Binding)
绑定是交换机(exchange)将消息(message)路由给队列(queue)所需遵循的规则。
如果要指示交换机“E”将消息路由给队列“Q”,那么“Q”就需要与“E”进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。
路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
4.3 消费者 ( Consumer )
消费者即使用消息的客户。
消费者标识
每个消费者(订阅者)都有一个叫做消费者标签的标识符。它可以被用来退订消息。
一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。
4.4 消息确认 (acknowledgement)
什么时候删除消息才是正确的?有两种情况
- 自动确认模式:当消息代理(broker)将消息发送给应用后立即删除。
- 显式确认模式:待应用(application)发送一个确认回执(acknowledgement)后再删除消息。
在显式模式下,由消费者来选择什么时候发送确认回执(acknowledgement)。
- 应用可以在收到消息后立即发送
- 或将未处理的消息存储后发送
- 或等到消息被处理完毕后再发送确认回执。
如果一个消费者在尚未发送确认回执的情况下挂掉了,那代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
拒绝消息
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。
应用可以向消息代理表明,本条消息由于“拒绝消息(Rejecting Messages)”的原因处理失败了(或者未能在此时完成)。当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。
4.5 消息 ( Message )
消息的组成:
- 消息属性
- 消息主体(有效载荷)
消息属性(Attributes)常见的有:
- Content type(内容类型)
- Content encoding(内容编码)
- Routing key(路由键)
- Delivery mode (persistent or not) 投递模式(持久化 或 非持久化)
- Message priority(消息优先权)
- Message publishing timestamp(消息发布的时间戳)
- Expiration period(消息有效期)
- Publisher application id(发布应用的ID)
消息体:
- 消息体即消息实际携带的数据,消息代理不会检查或者修改有效载荷。
- 消息可以只包含属性而不携带有效载荷。
- 它通常会使用类似JSON这种序列化的格式数据。
- 常常约定使用"content-type" 和 "content-encoding" 这两个字段分辨消息。
4.5 连接 (Connection)
AMQP 连接通常是长连接。AMQP是一个使用TCP提供可靠投递的应用层协议。AMQP使用认证机制并且提供TLS(SSL)保护。
当一个应用不再需要连接到AMQP代理的时候,需要优雅的释放掉AMQP连接,而不是直接将TCP连接关闭。
4.6 通道 (channels)
AMQP 提供了通道(channels)来处理多连接,可以把通道理解成共享一个TCP连接的多个轻量化连接。
这可以应对有些应用需要建立多个连接的情形,开启多个TCP连接会消耗掉过多的系统资源。
在多线程/进程的应用中,为每个线程/进程开启一个通道(channel)是很常见的,并且这些通道不能被线程/进程共享。
通道号
通道之间是完全隔离的,因此每个AMQP方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。
4.7 虚拟主机 (vhost)
为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP提供了一个虚拟主机(virtual hosts - vhosts)的概念。
这跟Web servers虚拟主机概念非常相似,这为AMQP实体提供了完全隔离的环境。当连接被建立的时候,AMQP客户端来指定使用哪个虚拟主机。
5. 参考:
https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html
http://rabbitmq.mr-ping.com/AMQP/AMQP_0-9-1_Model_Explained.html
https://rubydoc.info/github/ruby-amqp/amqp/master/file/docs/AMQP091ModelExplained.textile
https://www.rabbitmq.com/blog/tag/amqp-10/
https://github.com/rabbitmq/rabbitmq-amqp1.0
https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html
https://www.cnblogs.com/sgh1023/p/11217017.html
END