1. 方法论
RabbitMQ 是一个由 Erlang 语言开发的 并基于AMQP 的开源实现。
AMQP :Advanced Message Queue Protocol,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
-
下面权且盗用网络中一张RabbitMQ结构流程图:
2. 理解几个概念
Exchange
交换机,用来接收发布者的消息及消息的投递。
Binding
路由和队列的绑定关系,由它来决定消息最终被存放在哪个队列。
Queue
经路由消息最终被存放的地方,一个消息可以存放在多个队列中。
Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
3. AMQP的消息路由
如上所述,AMQP 中存在 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
Exchange的类型
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:
direct: 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。它是属于完全匹配,单播的模式。
fanout: 不会处理路由键,会把消息分发到所有绑定的队列中,类似于广播。
topic: 将消息的路由键跟某个模式进行匹配。
4. RabbitMQ集群
RabbitMQ 最优秀的功能之一就是内建集群,这个功能设计的目的是允许消费者和生产者在节点崩溃的情况下继续运行,以及通过添加更多的节点来线性扩展消息通信吞吐量。RabbitMQ 内部利用 Erlang 提供的分布式通信框架 OTP 来满足上述需求,使客户端在失去一个 RabbitMQ 节点连接的情况下,还是能够重新连接到集群中的任何其他节点继续生产、消费消息。
RabbitMQ 会始终记录以下四种类型的内部元数据:
- 队列元数据
包括队列名称和它们的属性,比如是否可持久化,是否自动删除 - 交换器元数据
交换器名称、类型、属性 - 绑定元数据
内部是一张表格记录如何将消息路由到队列 - vhost 元数据
为 vhost 内部的队列、交换器、绑定提供命名空间和安全属性
在单一节点中,RabbitMQ 会将所有这些信息存储在内存中,同时将标记为可持久化的队列、交换器、绑定存储到硬盘上。存到硬盘上可以确保队列和交换器在节点重启后能够重建。而在集群模式下同样也提供两种选择:存到硬盘上(独立节点的默认设置),存在内存中。
如果在集群中创建队列,集群只会在单个节点而不是所有节点上创建完整的队列信息(元数据、状态、内容)。结果是只有队列的所有者节点知道有关队列的所有信息,因此当集群节点崩溃时,该节点的队列和绑定就消失了,并且任何匹配该队列的绑定的新消息也丢失了。还好RabbitMQ 2.6.0之后提供了镜像队列以避免集群节点故障导致的队列内容不可用。
RabbitMQ 集群中可以共享 user、vhost、exchange等,所有的数据和状态都是必须在所有节点上复制的,例外就是上面所说的消息队列。RabbitMQ 节点可以动态的加入到集群中。
当在集群中声明队列、交换器、绑定的时候,这些操作会直到所有集群节点都成功提交元数据变更后才返回。集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,但是它的执行比磁盘节点要好。内存节点可以提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用,那集群中如何平衡这两者呢?
RabbitMQ 只要求集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或离开集群时,它们必须要将该变更通知到至少一个磁盘节点。如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但不能创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点。换句话说集群中的唯一磁盘节点崩溃的话,集群仍然可以运行,但直到该节点恢复,否则无法更改任何东西。
5. python实现
这里是调用pika的接口来实现的
## 公共部分
# 连接rabbitmq
conn = pika.BlockingConnect(pika.ConnectionParameters(host='localhost'))
channel = conn.channel()
# 声明队列
channel.queue_declare(queue='hello', durable=True)
## 发送部分
channel.basic_publish(exchange='', routing_key='hello', body='hello world')
conn.close()
## 接收部分
# 首先定义一个回调函数
def callback():
pass
channel.basic_consume(callback, queue='hello') # 默认对消息进行确认
channel.start_consuming() # 开始循环从queue中接收message并使用callback进行处理
## 公平分配
channel.basicPos(prefetchCount=1)
6. RabbitMQ RPC实现
RPC,远程过程调用,实现本地调用远程的服务或者函数,而RabbitMQ实现RPC其实质是消费者得到消息consume之后,把结果再通过队列的形式回传给生产者消费来获取期望的结果,这里一般要用到AMQP协议里消息队列的两个属性来实现:
- reply_to:指明回调的队列;
- correlation_id:在请求中关联处理RPC响应;
下面给出一些客户端和服务端的一些核心的python伪代码:
# 服务端
conn = pike.BlockingConnection()
channel = conn.channel()
channel.queue_declare(queue='rpc_queue')
def func(param):
return 'RPC SUCCESS: %s' % str(param)
def callback(ch, method, properties, body):
print(str(body))
response = func(body)
ch.basic_publish(exchange='', routing_key=properties.reply_to, properties=pike.BasicProperties(correlation_id= props.correlation_id), body=response)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='rpc_queue')
print("Awaiting RPC requests")
channel.start_consuming()
# 客户端
class RPCClient:
def __init__(self):
self.conn = pike.BlockingConnection()
self.channel = self.conn.channel()
#
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(on_response, queue=self.callback_queue, no_ack=True)
def on_response(self, ch, method, properties, body):
if self.correlation_id = properties.correlation_id
self.response = body
def call(self, param00):
self.correlation_id = str(uuid.uuid4())
self.response = None
self.channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pike.BasicProperties(reply_to=self.callback_queue, correlation_id=self.correlation_id))
while self.response is None:
self.connection.process_data_events()
return self.response
rpc_client = RpcClient()
response = rpc_client.call('test')
print(" [.] Got %s" % response)
7. 常见应用场景
异步处理、应用解耦、流量削峰等,就不一一举例说明了。