[TOC]
摘要:一种用Erlang语言开发的符合AMQP的高性能消息中间件。
1 概念和原理
1.1概念
RabbitMQ是使用Erlang语言开发的符合AMQP的高性能消息中间件。它遵循AMQP,即Advanced Message Queuing Protocol。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。 总体来说RabbitMQ是位于生产者和消费者之间的缓冲队列,其内部组成包括以下几种组件:
Broker:即队列服务主体,即RabbitMQ Server本身;
vhost:一个broker里可以划分多个vhost,用作不同用户的权限分离;
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息的载体,每个消息都会被投到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
Producer:消息生产者,就是投递消息的程序.
Consumer:消息消费者,就是接受消息的程序.
Channel:消息通道,在客户端的每个连接里,可建立多个channel.
1.1.1Exchange分类
RabbitMQ中的交换机主要包括如下几种类型:
-
Direct Exchange(直连交换机)
所有发送到DE的消息被转发到RouteKey中指定的Queue。如果消息的routigKey和binding的routigKey直接匹配的话,消息将会路由到该队列。 -
Fanout Exchange(扇型交换机)
扇形交换机,转发消息到所有绑定队列(速度最快),不管消息的routigKey息和binding的参数表头部信息和值是什么,消息将会路由到所有的队列。 -
Topic Exchange(主题交换机)
主题交换机,按规则转发消息(最灵活),如果消息的routigKey和binding的routigKey符合通配符匹配的话,消息将会路由到该队列。 -
Headers Exchange(头交换机)
首部交换机 ,如果消息的头部信息和binding的参数表中匹配的话,消息将会路由到该队列。headers类型的交换机分发消息不依赖routingKey,是使用发送消息时basicProperties对象中的headers来匹配的。headers是一个键值对类型,发送者发送消息时将这些键值对放basicProperties对象中的headers字段中,队列绑定交换机时绑定一些键值对,当两者匹配时,队列就可以收到息。匹配模式有两种,在队列绑定到交换机时用x-match来指定,all代表定义的多个键值对都要满足,而any则代码只要满足一个就可以了。 -
死信队列(Dead-Letter-Exchange)
利用DLX, 当消息在一个队列中变成死信(dead message)之后, 它能被重新publish到另一个Exchange, 这个Exchange就是DLX。DLX也是一个正常的Exchange, 和一般的Exchange没有区别, 它能在任何队列上被指定, 实际上就是设置某个队列的属性为死信队列。当这个队列中有死信时, RabbitMQ就会自动将这个消息重新发布到设置的Exchange上去, 进而被路由到另一个队列。消息变成死信有以下几种情况 :- 消息被拒绝(basic.reject/basic.nack) 并且requeue重回队列设置成false
- 消息TTL过期
- 队列达到最大长度
1.2工作流程
- 消息发送流程
- Producer与 Borker建立 Connection开启 channel;
- Producer声明 Exchange,Queue以及 binding key ;
- Producer生产 msg,包括:palyload(载体)、label(标签),然后发送消息;
- Boker根据 msg中 label中的 Exchange名称 和 Routing key进行消息路由;
- 关闭 channel和 Connection。
- 消息接收流程
- Consumer与 Borker建立 Connection开启 channel;
- Consumer监听某个 Queue;
- Consumer主动拉取,或者被动接收推送的信息进行消费,并推送 Ack 确认消费到Borker;
- Boker收到 Ack 确认,将消息从 Queue中删除;
- 关闭 channel和 Connection。
2应用场景
2.1 跨系统间的通信
如:存在一个旧有的跑了很久的老代码。此时,搭建一个新的 Saas 平台,作为服务提供商,旧有的老系统需要作为其中一个服务纳入到新系统平台中,且业务需求上他们之间的用户是存在关联关系的。这两个系统除了业务上需要关联,其他都是分离,包括数据库等。这时候,两个平台的用户信息变更,如:创建,更新等,都需要进行信息同步。这时候消息中间件就派上了用场。在旧有的系统中加入 RabbitMQ 组件,在用户信息发生变更时,将变更信息可靠的投递到 RabbitMQ Borker(RabbitMQ 服务节点)中,由新的系统监听消息 Queue(队列),然后确保消息的消费,同步更新用户信息,以此达到跨系统间的通信。
2.2 保持消息的最终一致性
解决分布式事务的方案之一,Base 理论的提出,通过数据的最终一致性解决分布式事务问题,而事务的最终一致性可以通过 RabbitMQ 来完成。分布式系统间的事务处理,通过消息的发送和订阅来实现,当出现事务需要回滚的场景,同样通过消息的发送和订阅来实现业务的补偿,最终实现数据的最终一致性问题。
友链:谈谈分布式事务
2.3 基于 Pub/Sub 模型实现事件驱动
在例如:电商,金融交易等平台中,通常都存在 “订单”。如:电商中,用户下单之后并不一定会马上付款,但是此时商品资源被锁定,这时候就需要在规定时间内对未支付的订单进行取消的操作,同时释放商品资源。如:金融交易平台。平台中的金融产品需要提前预约,预约时会进行资源锁定,锁定产品份额等资源。在客户在规定时间内未进行交付,需要自动取消预约订单,同时释放额度。这时候可以通过 RabbitMQ 实现延迟队列的方式来处理。通过设定如:30分钟,未交付或者未付款的订单 进行取消订单,释放额度处理。