概述
本指南概述了AMQP 0-9-1协议,它是RabbitMQ支持的协议之一。
AMQP 0-9-1和AMQP模型的高层面概述
什么是AMQP 0-9-1?
AMQP 0-9-1(高级消息队列协议)是一种消息传递协议,使符合要求的客户端应用程序可以与符合要求的消息传递中间件消息代理进行通信。
消息代理及其充当的角色
消息代理从发布者(发布消息的应用程序,也称为生产者)接收消息,并将其路由到消费者(处理它们的应用程序)。
由于这是一个网络协议,因此发布者,消费者和消息代理都可以位于不同的计算机上。
AMQP 0-9-1模型简介
AMQP 0-9-1模型具有以下的类似于现实世界中的情景:消息发布到类似于邮局或邮箱的交换机(exchanges)。 然后,交换机使用称为绑定(bindings)的规则将消息副本分发到队列(queues)。 然后,消息代理要么将消息传递给订阅了相关队列的消费者,要么消费者按需从队列中获取/拉取消息。
发布消息时,发布者可以指定各种消息属性(消息元数据)。 这些元数据中的某些可能由消息代理使用,但是其余部分对消息代理完全不透明,并且仅由接收消息的应用程序使用。
网络不可靠并且应用程序可能在处理消息失败,因此AMQP 0-9-1模型具有消息确认(message acknowledgements)的概念:将消息传递给消费者时,消费者会通知消息代理,要么自动地,要么直到应用程序开发人员选择这样做之后。 使用消息确认时,消息代理只有在收到该消息(或消息组)的通知时,才会从队列中完全删除该消息。
在某些情况下,例如,当无法路由消息时,消息可能会返回给发布者,被丢弃,或者,如果消息代理实施了扩展,则可能被放入所谓的“死信队列”中。 发布者通过使用某些参数发布消息来选择如何处理这种情况。
队列,交换和绑定统称为AMQP实体(AMQP entities)。
AMQP 0-9-1是一个可编程协议
从AMQP 0-9-1实体和路由方案主要由应用程序本身而不是由消息代理管理员定义的意义上讲,AMQP 0-9-1是一种可编程协议。 因此,为声明队列和交换,定义它们之间的绑定,订阅队列等等的协议操作作了规定。
这为应用程序开发人员提供了很多自由,但同时也要求他们意识到潜在的定义冲突。 在实践中,定义冲突很少发生,并且通常表明配置错误。
应用程序声明所需的AMQP 0-9-1实体,定义必要的路由方案,并且在不再使用AMQP 0-9-1实体时可以选择删除它们。
交换机和交换机类型
交换机是发送消息的AMQP 0-9-1实体。 交换机接收一条消息并将其路由到零个或多个队列中。 使用的路由算法取决于交换机类型和称为绑定的规则。 AMQP 0-9-1消息代理提供四种交换类型:
交换机类型 | 默认的预定义名称 |
---|---|
Direct | (空字符串) 和 amq.direct |
Fanout | amq.fanout |
Topic | amq.topic |
Headers | amq.match (在RabbitMQ是amq.headers) |
除了交换机的类型之外,还使用许多属性用于声明交换机,其中最重要的是:
- 名称(Name)
- 持久性(Durability) (交换机是否在消息代理重启后依然存在)
- 自动删除(Auto-delete)(交换机在最后一个队列解除绑定时被删除)
- 参数(Arguments) (可选,由消息代理的插件和特定于消息代理的功能使用)
交换机可以是持久的,也可以是一次性的。 持久的交换可在消息代理重新启动后继续存在,而一次性的交换机则不能(它们必须在消息代理重启上线时被重新声明才能继续工作)。 并非所有方案和用例都要求交换机具有持久性。
Default交换机
默认交换机是消息代理预先声明的不带名称(空字符串)的Direct类型的交换机。 它具有一个特殊的属性,使其对于简单的应用程序非常有用:每个创建的队列都使用与该队列名称相同的路由键(routing key)自动绑定到该交换机。
例如,当您声明一个名称为“ search-indexing-online”的队列时,AMQP 0-9-1消息代理将使用“ search-indexing-online”作为路由键将其绑定到默认交换机(在此上下文中,有时又称为绑定键(binding key))。 因此,使用路由关键字“ search-indexing-online”发布到默认交换机的消息将被路由到队列“ search-indexing-online”。 换句话说,默认交换机使得看起来有可能将消息直接传递到队列,即使这不是技术上发生的事情。
Direct交换机
Direct交换机基于消息路由密钥将消息传递到队列。 直接交换是消息单播路由的理想选择(尽管它们也可以用于多播路由)。 下面是它的工作原理:
- 队列使用路由键K绑定到交换机
- 当一条带着路由键R的消息抵达Direct交换机时,如果K = R, 交换机会将它路由到刚刚的队列
Direct交换机通常用于以轮循方式在多个工作程序(同一应用程序的实例)之间分配任务。 这样做时,重要的是要了解,在AMQP 0-9-1中,消息在使用者之间而不是队列之间进行负载平衡。
Direct交换机可以用以下图形表示:
Fanout交换机
Fanout交换机将消息路由到与其绑定的所有队列,并且路由键将被忽略。 如果将N个队列绑定到Fanout交换机,则将新消息发布到该交换机时,该消息的副本将传递到所有N个队列。 Fanout交换机是消息广播路由的理想选择。
因为Fanout交换机将消息的副本发送到绑定到它的每个队列,所以它的用例非常相似:
- 大型多人在线(MMO)游戏可以将其用于排行榜更新或其他全球性事件
- 体育新闻网站可以使用扇出交换以近乎实时的方式向移动客户端分发得分更新
- 分布式系统可以广播各种状态和配置更新
- 群组聊天可以使用扇出交换在参与者之间分发消息(尽管AMQP没有内置的在线状态概念,因此XMPP可能是更好的选择)
Fanout交换机可以用以下图形表示:
Topic交换机
Topic交换机根据消息路由键和用于将队列绑定到交换机的模式匹配将消息路由到一个或多个队列。 Topic交换机类型通常用于实现各种发布/订阅模式变体。 Topic交换机通常用于消息的多播路由。
Topic交换机有非常广泛的用例。 每当问题涉及多个使用者/应用程序,这些使用者/应用程序有选择地选择他们希望接收的消息类型时,应考虑使用Topic交换机。
使用示例:
- 分发与特定地理位置有关的数据,例如销售点
- 由多个工作程序完成的后台任务处理,每个工作人员都可以处理特定的任务集
- 股票价格更新(以及其他种类的财务数据的更新)
- 涉及分类或标记的新闻更新(例如,仅针对特定运动或团队)
- 云中各种服务的编排
- 分布式体系结构/特定于操作系统的软件构建或打包,其中每个构建器只能处理一个体系结构或OS
Headers交换机
Headers交换机被设计成用于在多个属性上进行路由,而这些属性比一个路由键更容易表示为消息标头的场景。 Headers交换机忽略路由键属性。 取而代之的是,用于路由的属性取自headers属性。 如果标头的值等于绑定时指定的值,则认为消息匹配。
可以使用多个用于匹配的标头将队列绑定到Headers交换机。 在这种情况下,消息代理需要从应用程序开发人员那里获得另一条信息,即,它是否应考虑具有任何匹配的标头或全部匹配的消息? 这就是“ x-match”绑定参数的作用。 当“ x-match”参数设置为“ any”时,仅一个匹配的标头值就足够了。 或者,将“ x-match”设置为“ all”要求所有值必须匹配。
Headers交换机可以看作是“类固醇的Direct交换机”。 由于它们基于标头值进行路由,因此可以将它们用作Direct交换机,而路由密钥不必是字符串。 例如,它可以是整数或哈希(字典)。
请注意,以字符串x-
开头的标头将不用于评估匹配项。
队列
AMQP 0-9-1模型中的队列与其他消息和任务排队系统中的队列非常相似:它们存储应用程序使用的消息。 队列与交换共享一些属性,但也具有一些其他属性:
- 名称(Name)
- 持久(Durable)(队列将在消息代理重启后幸存)
- 独占(Exclusive)(仅由一个连接使用,并且该连接关闭时,队列将被删除)
- 自动删除(Auto-delete)(至少有一个消费者的队列在最后一个消费者取消订阅时被删除)
- 参数(Arguments)(可选;由插件和特定于消息代理的功能使用,例如消息TTL,队列长度限制等)
必须先声明队列,然后才能使用它。 声明队列将导致它创建(如果尚不存在)。 如果队列已经存在并且其属性与声明中的属性相同,则该声明将无效。 当现有队列属性与声明中的属性不同时,将引发代码为406(PRECONDITION_FAILED)
的信道(channel)级异常。
队列名称
应用程序可以选择给定队列的名称,也可以要求消息代理为它们生成名称。 队列名称最多可以包含255个字节的UTF-8字符。 AMQP 0-9-1消息代理可以代表应用程序生成唯一的队列名称。 要使用此功能,请传递一个空字符串作为队列名称参数。 生成的名称将与队列声明响应一起返回给客户端。
以“ amq”开头的队列名称保留为消息代理内部使用。 尝试用违反此规则的名称声明队列将导致信道级异常,其应答代码为403(ACCESS_REFUSED)
。
队列的持久性
在AMQP 0-9-1协议中,可以将队列声明为持久的或临时性的。 持久的队列的元数据存储在磁盘上,而临时的队列的元数据在可能的情况下存储在内存中。
发布时的消息也有相同的区别。
在持久性很重要的环境和用例中,应用程序必须使用持久队列,并确保发布者将已发布消息标记为持久。
队列指南中更详细地介绍了此主题。
绑定
绑定是交换机使用(除其他事项外)将消息路由到队列的规则。 为了指示交换机E将消息路由到队列Q,必须将Q绑定到E。绑定可能具有某些交换机类型使用的可选的路由键属性。 路由键的目的是选择某些发布到交换机的消息路由到绑定队列。 换句话说,路由键的作用就像过滤器。
打个比方:
- 队列就像您在纽约市的目的地
- 交换机就像肯尼迪国际机场
- 绑定是从肯尼迪国际机场到目的地的路线。 可能有零种或多种方式达到目标
有了这一间接层,就可以直接使用发布到队列来实现不可能或很难实现的路由方案,并且还消除了应用程序开发人员必须做的一定数量的重复工作。
如果消息无法路由到任何队列(例如,由于该消息所发布到的交换机没有绑定),则消息将被丢弃或返回给发布者,,具体取决于发布者设置的消息属性。
消费者
除非应用程序可以使用它们,否则将消息存储在队列中是没有用的。 在AMQP 0-9-1协议模型中,应用程序可以通过两种方式执行此操作:
- 订阅以将消息传递给他们(“推送API”):这是推荐的选项
- 轮询(“拉动API”):这种方式效率很低,在大多数情况下应避免使用
使用“推送API”,应用程序必须表明有兴趣使用来自特定队列的消息。 当他们这样做时,我们说他们注册了一个消费者,或者简单地说,订阅了一个队列。 每个队列可能有一个以上的消费者,或者注册一个排他的消费者(在使用队列时,从队列中排除所有其他使用者)。
每个消费者(订阅)都有一个称为消费者标签(consumer tag)的标识符。 它可以用于取消订阅消息。 消费者标签只是字符串。
消息确认
消费者应用程序(即接收和处理消息的应用程序)有时可能无法处理单个消息,有时甚至会崩溃。 网络问题也有可能引起问题。 这就提出了一个问题:代理何时应从队列中删除消息? AMQP 0-9-1协议让消费者可以对此进行控制。 有两种确认模式:
- 消息代理将消息发送到消费者应用程序后(使用basic.deliver或basic.get-ok方法)。
- 消费者应用程序发送回确认后(使用basic.ack方法)。
前者称为自动确认模型,而后者称为显式确认模型。 使用显式模型,应用程序选择何时发送确认。 可以在收到消息之后,或者在将消息持久保存到数据存储之前(在处理之前)或在完全处理消息之后(例如,成功获取Web页,将其处理并将其存储到某些持久数据存储中)。
如果某个消费者在没有发送确认的情况下死亡,则消息代理会将其重新交付给另一位消费者,或者,如果当时没有人可用,则该消息代理将等到至少一个消费者为同一队列注册后再尝试重新交付。
拒绝消息
当消费者应用程序接收到一条消息时,对该消息的处理可能会成功也可能不会成功。 消费者应用程序可以通过拒绝消息来向消息代理表示消息处理已失败(或当时无法完成)。 拒绝消息时,消费者应用程序可以要求代理放弃或重新排队。 当队列中只有一个消费者时,请您确保不要一次又一次地通过拒绝并重新将消息放入队列来创建无限的来自同一消费者的消息。
负面确认
使用basic.reject
方法拒绝消息。 basic.reject方法有一个限制:没有办法像消息确认那样拒绝多个消息。 但是,如果您使用RabbitMQ,那么有解决方案。 RabbitMQ提供了AMQP 0-9-1协议扩展,称为negative acknowledgements或nacks。 有关更多信息,请参阅确认和basic.nack拓展扩展指南。
预取消息
对于多个消费者共享一个队列的情况,在发送下一个确认之前,能够指定每个消费者可以一次发送多少消息是很有用的。 如果消息倾向于批量发布,则可以将其用作简单的负载平衡技术或提高吞吐量。 例如,如果生产者应用程序由于其工作性质而每分钟发送一次消息,
请注意,RabbitMQ仅支持信道级别的预取计数,不支持基于连接或基于大小的预取。
消息属性和有效载荷
AMQP 0-9-1协议模型中的消息具有属性。 一些属性非常常见,以至于AMQP 0-9-1协议规范定义了它们,应用程序开发人员不必考虑确切的属性名称。 一些例子是:
- Content type
- Content encoding
- Routing key
- Delivery mode (persistent or not)
- Message priority
- Message publishing timestamp
- Expiration period
- Publisher application id
某些属性由AMQP代理使用,但大多数属性都可以由接收它们的应用程序进行解释。 一些属性是可选的,称为标头。 它们类似于HTTP中的X-Header。 消息发布时设置消息属性。
消息还具有有效负载(它们携带的数据),AMQP代理将其视为不透明字节数组。 代理将不会检查或修改有效负载。 消息可能仅包含属性而没有有效载荷。 通常使用JSON,Thrift,协议缓冲区和MessagePack等序列化格式来序列化结构化数据,以将其发布为消息有效负载。 协议对等方通常使用“内容类型”和“内容编码”字段来传达此信息,但这仅是约定。
消息可能被发布为持久消息,这使得代理将它们持久保存到磁盘。 如果重新启动服务器,系统将确保接收的持久消息不会丢失。 仅仅将消息发布到持久性交换或路由到持久性队列的事实并不能使消息持久化:这全都取决于消息本身的持久性模式。 将消息持久发布会影响性能(就像数据存储一样,持久性会以一定的性能成本为代价)。
在发布者指南中了解更多信息。
消息应答
由于网络不可靠且应用程序可能处理失败,因此进行某些处理应答通常是必要的。 有时仅需要应答已收到消息这一事实。 有时,应答表示消息已由消费者验证和处理,例如,被验证为具有必需数据并持久保存到数据存储或建立索引。
这种情况非常普遍,因此AMQP 0-9-1协议具有称为消息应答(有时称为ack)的内置功能,供消费者用来应答消息的传递和/或处理。 如果消费者应用程序崩溃(关闭连接时AMQP消息代理会注意到此情况),如果希望收到消息确认但AMQP消息代理未收到该消息,则会对该消息重新排队(并可能立即传递给另一个消费者,如果有的话)。
协议中内置了确认,有助于开发人员构建更强大的软件。
AMQP 0-9-1方法
AMQP 0-9-1被构造为多种方法。 方法是操作(如HTTP方法),与面向对象编程语言中的方法没有任何共同之处。 AMQP 0-9-1中的协议方法分为几类。 类只是AMQP方法的逻辑分组。 AMQP 0-9-1参考包含所有AMQP方法的完整详细信息。
让我们看一下交换机类,这是一组与交换机操作有关的方法。 它包括以下操作:
- exchange.declare
- exchange.declare-ok
- exchange.delete
- exchange.delete-ok
(请注意,RabbitMQ站点参考还包括对Exchange类的RabbitMQ特定的扩展,我们将不在本指南中讨论)。
上面的操作形成逻辑对:exchange.declare和exchange.declare-ok,exchange.delete和exchange.delete-ok。 这些操作是“请求”(由客户发送)和“响应”(作为消息代理对前面提到的“请求”的响应发送)。
例如,客户端使用exchange.declare方法要求消息代理声明新的交换机:
如上图所示,exchange.declare带有几个参数。 它们使客户端能够指定交换名称,类型,持久性标记等。
如果操作成功,则消息代理将使用exchange.declare-ok方法进行响应:
exchange.declare-ok方法除信道号外不携带任何参数(信道将在本指南后面介绍)。
对于AMQP 0-9-1队列方法类上的另一个方法对,事件的顺序非常相似:queue.declare和queue.declare-ok:
并非所有的AMQP 0-9-1协议的方法都有对应的方法。 一些(basic.publish是使用最广泛的方法)没有相应的“响应”方法,而另一些(例如basic.get)则具有多个可能的“ response”方法。
连接
AMQP 0-9-1协议的连接通常是长期的。 AMQP 0-9-1是使用TCP进行可靠传递的应用程序级别协议。 连接使用身份验证,并且可以使用TLS进行保护。 当不再需要将应用程序连接到服务器时,它应优雅地关闭其AMQP 0-9-1协议的连接,而不是突然关闭基础TCP连接。
信道
某些应用程序需要与消息代理有多个连接。 但是,不希望同时打开许多TCP连接,因为这样做会消耗系统资源,并使配置防火墙更加困难。 AMQP 0-9-1连接与可以被认为是“共享单个TCP连接的轻型连接”的信道 复用。
客户端执行的每个协议操作都在信道上发生。 某个特定信道上的通信与另一个信道上的通信是完全分开的,因此每种协议方法还带有一个信道ID(channel ID)(又称通道号(channel number)),该整数是代理和客户端都用来确定该方法用于哪个通道的整数。
信道仅存在于连接的上下文中,而不是单独存在的。 关闭连接后,其上的所有信道也将关闭。
对于使用多个线程/进程进行处理的应用程序,很常见的是为每个线程/进程打开一个新信道,而不在它们之间共享信道。
虚拟主机
为了使单个消息代理可以承载多个隔离的“环境”(用户组,交换,队列等),AMQP 0-9-1协议包括了虚拟主机(vhost)的概念。 它们类似于许多流行的Web服务器使用的虚拟主机,并提供AMQP实体所在的完全隔离的环境。 协议客户端指定在连接协商期间要使用的虚拟主机。
AMQP是可扩展的
AMQP 0-9-1协议具有几个扩展点:
- 定制交换机类型使开发人员可以实现路由方案,这些路由方案提供的现成交换机类型不能很好地覆盖,例如基于地理数据的路由。
- 交换机和队列的声明可以包含消息代理可以使用的其他属性。 例如,RabbitMQ中的 per-queue message TTL就是通过这种方式实现的。
- 协议的特定于消息代理的扩展。 例如,请参阅RabbitMQ实现的扩展。
- 可以引入新的AMQP 0-9-1方法类。
- 可以使用其他插件来扩展代理,例如,RabbitMQ管理前端和HTTP API被实现为插件。
这些功能使AMQP 0-9-1模型更加灵活,可适用于非常广泛的问题。
AMQP 0-9-1客户端生态系统
有许多用于许多流行编程语言和平台的AMQP 0-9-1客户端。 其中一些紧密遵循AMQP术语,仅提供AMQP方法的实现。 其他一些具有附加功能,便捷方法和抽象。 有些客户端是异步的(非阻塞),有些是同步的(阻塞),有些则支持这两种模型。 一些客户端支持特定于供应商的扩展(例如,特定于RabbitMQ的扩展)。
因为AMQP的主要目标之一是互操作性,所以对于开发人员来说,理解协议操作而不是将自己局限于特定客户端库的术语是一个好主意。 这种方式将大大简化与使用不同库的开发人员进行的通信。