RabbitMQ+GRPC的快速使用(1)

使用背景

之前写的同步发送通知随着业务量的增长,已经不再适用,所以快速实现一个基本的rq队列+grpc的方式来投递通知数据并交给rq的worker去调用grpc的服务。
但是之前调用的地方太多了,所以最好还是以patch的方式去修改

思路

原有的结构大致为图1所示


图1

首先flask调用grpc再由grpc请求微信服务器发送消息,然后由微信响应请求后返回通知结果给grpc,grpc再返回结果给flask最终返回给客户端,所以除非等到grpc返回调用结果,否则将会一直阻塞
现在则为


图2

flask投递消息到队列中去就就结束了,直接返回到客户端,这里就不会阻塞,而是让监听rabbitMQ的worker去执行

这里暂时只创建一个队列去分发所有类型的通知所以message的格式需要固定
{"method":"method_name", "data":{}},客户端调用publish传入对应的参数即可

# client.py
import pika
import pickle


class RabbitClient(object):
    def __init__(self, host="localhost", port=5672, routing_key=None):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=host, port=port))
        self.channel = self.connection.channel()
        self.routing_key = routing_key

    def publish(self, method_name, **kwargs):
        message = self.package(method_name, **kwargs)

        self.channel.basic_publish(exchange='', routing_key=self.routing_key, body=message)

    def package(self, method_name, **kwargs):
        temp = {"method": method_name}
        temp.update(data=kwargs)
        return pickle.dumps(temp)
# 这里是调用的工具module,原来的方式已经注释
from apps import rq


# def sen_message_test(user_id, message):
#     """
#
#     :param user_id:
#     :param message: {"title":"","message":""}
#     :return:
#     """
#     with grpc.insecure_channel("{0}:{1}".format(_HOST, _PORT)) as channel:
#         client = send_server_pb2_grpc.SendServiceStub(channel=channel)
#         response = client.SendMessage(send_server_pb2.SendMessageParam(user_id=user_id, message=json.dumps(message)))
#     print("received: " + str(response))

def sen_message_test(user_id, message):
    rq.publish("sen_message_test", user_id=user_id, message=message)

def debt_remind_test(user_id=None, bill_id=None):
    rq.publish("debt_remind_test", user_id, bill_id)

def repair_remind_test(user_id=None, repair_id=None):
    rq.publish("repair_remind_test", user_id=user_id, repair_id=repair_id)

# 太多了就不全列出来了,总之就是要保证原来的业务逻辑代码还能用
# worker.py
import pika
import pickle


class RabbitServer(object):
    def __init__(self, host="localhost", port=5672, queue=None):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host=host, port=port))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=queue, durable=True)

        self.channel.basic_consume(on_message_callback=self.callback, queue=queue, auto_ack=True)
        self.dispatcher = RpcMethodDispatcher()
        self.setup = self.dispatcher.setup

    def callback(self, ch, method, properties, body):
        body = pickle.loads(body)
        print(body)

        func = self.dispatcher.dispatch(body.get("method"))
        if not func:
            return
        try:
            func(**body.get("data"))
        except Exception as e:
            print(e)

    def run(self):
        print("wait")
        self.channel.start_consuming()


class RpcMethodDispatcher(object):
    def __init__(self):
        self.map = []

    def setup(self, name):
        # 和message中的method相互对应类似于@app.route("/"),将所有路由添加过来
        def deco(f):
            self.map.append(MethodMap(name, f))

            def wrapper(*args, **kwargs):
                return f(*args, **kwargs)

            return wrapper

        return deco

    def dispatch(self, name):
        for i in self.map:
            if i.name == name:
                return i.method


class MethodMap(object):
    def __init__(self, name, method):
        self.name = name
        self.method = method


server = RabbitServer(queue="task_queue")

if __name__ == '__main__':
    server.run()

给标题后面加了个(1),我知道这玩意儿很快就会还要修改
可能看到这里就会有同学问了,为啥不new一个thread去执行嘞?


image.png
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,148评论 1 32
  • GRPC是基于protocol buffers3.0协议的. 本文将向您介绍gRPC和protocol buffe...
    二月_春风阅读 18,052评论 2 28
  • __block和__weak修饰符的区别其实是挺明显的:1.__block不管是ARC还是MRC模式下都可以使用,...
    LZM轮回阅读 3,387评论 0 6
  • 一种能飞的汽车,不用任何汽油等柴油。只要有指纹他就能飞起来而且不需要任何轨道,想去哪去。 也可以飞到海里,只需要关...
    赵霞_a476阅读 1,410评论 0 1
  • 鬼神神差那次买了村上春树的《当我谈跑步时,我谈些什么》,加上当时有好朋友迷上了跑步,经常在朋友圈晒各种跑马拉松的趣...
    暴走君萨阅读 463评论 10 5