RabbitMQ -- part6 [Remote procedure call (RPC)]

上节介绍了通过Topic exchange精确的匹配接收消息。本节介绍,通过 Remote Procedure Call (RPC)实现:在远程机器执行一个方法并且等待返回执行结果
  • Client interface

创建一个客户端类(MyRpcClient),该类有一个 call 方法,用来发送RPC请求并且阻塞等待响应结果:

length_rpc = MyRpcClient()
response = length_rpc.call(data)
print('[.] Got %r' % response)
  • Callback queue

通过RabbitMQ实现RPC很简单。客户端发送请求并且服务端回复响应结果。客户端为了接收响应,需要在发送request的同时发送一个"callback"队列地址,以便服务端通过此队列回复消息。

result = self.channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to = self.callback_queue,
        correlation_id = self.corr_id,
    ),
    body = data
)

AMQP 0-9-1 协议为消息预定义了14个属性,常用属性:
delivery_mode : 消息持久化(值为2);消息短暂的(任意其他值)
content_type : 描述编码的mime-type,例如使用JSON编码格式,需要设置为application/json
reply_to : callback queue 名称
correlation_id : 关联RPC响应和请求

  • Correlation id

为了调高效率,需要为每个客户端单独创建一个callback queue。客户端为了能够从队列中接收自己的消息,需要使用"correlation_id"。当从 callback queue 中接收响应时,去判断此属性,如果匹配则接受,否则忽略。

  • Summary
    Summary

整个流程如下:

  1. 启动客户端,它会创建一个匿名的callback queue,并且在关闭连接后删除此队列(exclusive=True)
  2. 客户端发送消息,附带两个属性:reply_to (指定callback queue)和 correlation_id (对于每个请求都是唯一的)
  3. 请求发送到 rpc_queue 队列
  4. RPC server等待队列中的请求,请求到来时,执行任务并且将结果通过reply_to字段的值发送回客户端。
  5. 客户端在callback queue等待数据。当响应到来时,首先检测 correlation_id 属性,如果匹配则返回给应用。
  • 完整代码

  • rpc_server.py 服务器端

#!/usr/bin/env python3
# coding=utf-8

import pika

# 链接RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 声明队列'rpc_queue'
channel.queue_declare(queue='rpc_queue')

# 定义要执行的方法:统计消息的字符长度并返回
def length(data):
    if data:
        data_len = len(data)
    else:
        data_len = 0
    return data_len

# 为basic_consume定义回调函数
def on_request(ch, method, props, body):
    print('[.] length(%s)' % body)
    response = length(body)
    ch.basic_publish(exchange='',
        routing_key=props.reply_to,
        properties=pika.BasicProperties(correlation_id=props.correlation_id),
        body=str(response))
    ch.basic_ack(delivery_tag = method.delivery_tag)

# 平均分发
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

# 等待请求并处理
print('[x] Awaiting RPC requests')
channel.start_consuming()
  • rpc_client.py 客户端
#!/usr/bin/env python3
# coding=utf-8

import pika
import sys
import uuid


class MyRpcClient(object):
    def __init__(self):
        """
        连接RabbitMQ,并且声明队列
        """
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
    
    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:    # 判断是否和correlation_id匹配
            self.response = body

    def call(self, data):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to = self.callback_queue,
                correlation_id = self.corr_id,
                ),
            body = data)
        while self.response is None:    # 不断的循环,直到响应到达
            self.connection.process_data_events()
        return self.response

length_rpc = MyRpcClient()

data = ' '.join(sys.argv[1:]) if len(sys.argv) > 1 else 'bloke'

print('[x] Requesting length(%s)' % data)
response = length_rpc.call(data)
print('[.] Got %r' % response)
  • 执行
> ./rpc_client.py test.com
[x] Requesting length(test.com)
[.] Got b'8'
> ./rpc_server.py
[x] Awaiting RPC requests
[.] length(b'test.com')

参考文档: http://www.rabbitmq.com/tutorials/tutorial-six-python.html


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354

推荐阅读更多精彩内容