上节介绍了通过Topic exchange精确的匹配接收消息。本节介绍,通过 Remote Procedure Call (RPC)实现:在远程机器执行一个方法并且等待返回执行结果
- Client interface
创建一个客户端类(MyRpcClient),该类有一个 call 方法,用来发送RPC请求并且阻塞等待响应结果:
length_rpc = MyRpcClient()
response = length_rpc.call(data)
print('[.] Got %r' % response)
- Callback queue
通过RabbitMQ实现RPC很简单。客户端发送请求并且服务端回复响应结果。客户端为了接收响应,需要在发送request的同时发送一个"callback"队列地址,以便服务端通过此队列回复消息。
result = self.channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body = data
)
AMQP 0-9-1 协议为消息预定义了14个属性,常用属性:
delivery_mode : 消息持久化(值为2);消息短暂的(任意其他值)
content_type : 描述编码的mime-type,例如使用JSON编码格式,需要设置为application/json
reply_to : callback queue 名称
correlation_id : 关联RPC响应和请求
- Correlation id
为了调高效率,需要为每个客户端单独创建一个callback queue。客户端为了能够从队列中接收自己的消息,需要使用"correlation_id"。当从 callback queue 中接收响应时,去判断此属性,如果匹配则接受,否则忽略。
-
Summary
整个流程如下:
- 启动客户端,它会创建一个匿名的callback queue,并且在关闭连接后删除此队列(exclusive=True)
- 客户端发送消息,附带两个属性:reply_to (指定callback queue)和 correlation_id (对于每个请求都是唯一的)
- 请求发送到 rpc_queue 队列
- RPC server等待队列中的请求,请求到来时,执行任务并且将结果通过
reply_to
字段的值发送回客户端。 - 客户端在callback queue等待数据。当响应到来时,首先检测 correlation_id 属性,如果匹配则返回给应用。
完整代码
rpc_server.py 服务器端
#!/usr/bin/env python3
# coding=utf-8
import pika
# 链接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# 声明队列'rpc_queue'
channel.queue_declare(queue='rpc_queue')
# 定义要执行的方法:统计消息的字符长度并返回
def length(data):
if data:
data_len = len(data)
else:
data_len = 0
return data_len
# 为basic_consume定义回调函数
def on_request(ch, method, props, body):
print('[.] length(%s)' % body)
response = length(body)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
# 平均分发
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
# 等待请求并处理
print('[x] Awaiting RPC requests')
channel.start_consuming()
- rpc_client.py 客户端
#!/usr/bin/env python3
# coding=utf-8
import pika
import sys
import uuid
class MyRpcClient(object):
def __init__(self):
"""
连接RabbitMQ,并且声明队列
"""
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id: # 判断是否和correlation_id匹配
self.response = body
def call(self, data):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body = data)
while self.response is None: # 不断的循环,直到响应到达
self.connection.process_data_events()
return self.response
length_rpc = MyRpcClient()
data = ' '.join(sys.argv[1:]) if len(sys.argv) > 1 else 'bloke'
print('[x] Requesting length(%s)' % data)
response = length_rpc.call(data)
print('[.] Got %r' % response)
- 执行
> ./rpc_client.py test.com
[x] Requesting length(test.com)
[.] Got b'8'
> ./rpc_server.py
[x] Awaiting RPC requests
[.] length(b'test.com')
参考文档: http://www.rabbitmq.com/tutorials/tutorial-six-python.html