一、nameko简单了解:
● Nameko是一个用python语言写的微服务框架,
● 支持通过 rabbitmq 消息队列传递的 rpc 调用,也支持 http 调用。
● 小巧简洁,简单且强大;
● 可以让你专注于应用逻辑
二、nameko用途是什么:
● 可通过RabbitMq消息组件来实现RPC服务
三、依赖库:
● pip install nameko
四、简单案例(小试牛刀)
# service.py 向远程RabbitMq发布消息
from nameko.standalone.rpc import ClusterRpcProxy
CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}
def publish():
with ClusterRpcProxy(CONFIG) as rpc:
rpc.hello_service.hello()
# hello_service.hello() 服务名为hello_service 方法名为hello
# nameko_server.py 启动一个nameko服务
from nameko.rpc import rpc
class hello_service:
name = "hello_service" # name为自定义的服务名称
@rpc # 入口标记点
def hello(self):
print("hello world")
五、Flask中使用nameko
5.1 案例一
from flask import Flask, request, Response
from nameko.standalone.rpc import ClusterRpcProxy
app = Flask(__name__)
CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}
@app.route('/compute', methods=['POST'])
def compute():
operation = request.json.get('operation')
value = request.json.get('value')
other = request.json.get('other')
email = request.json.get('email')
msg = "Please wait the calculation, you'll receive an email with results"
subject = "API Notification"
with ClusterRpcProxy(CONFIG) as rpc:
# 同时调用两个服务 一个同步远程调用和一个异步远程调用
rpc.server_publish.hello(email, subject, msg)
result_async = rpc.server_publish01.hello.call_async(operation, value, other, email)
print(result_async.result())
return Response({"code": 1})
if __name__ == '__main__':
app.run(debug=True)
* 解释一下:
rpc后紧跟的是微服务定义时的类变量 name 的值即为微服务名称,接着紧跟rpc方法,
使用call_async 为异步调用,而调用 result_async.result()时会等待异步任务返回结果。
需要注意的是, 运行 ClusterRpcProxy(config) 时会创建与队列的连接,该操作比较耗时,
如果有大量的微服务调用,不应该重复创建连接,应在语句块内完成所有调用。
异步调用的结果只能在语句块内获取,即调用 .result() 等待结果。语句块之外连接断开就无法获取了。
5.2 案例二
● 个人建议使用案例2, 案例使用到一个flask_nameko包装器, 可以起到一个节约资源的目的
● 具体体现在:
- 案例1在每次调用rpc方法时都会创建一个对应的队列, 队列数量容易累积(虽然说RabbitMq会自动删除空队列, 但是好的方法没有理由拒绝)
- 案例2中flask_nameko, 解决了重复创建队列的问题, 意思就是在flask项目启动时,就会默认创建n个队列, 提供使用.
# 创建 __init__.py
# 安装nameko模块 pip install flask_nameko
# 将nameko注册在flask_app当中, 目的就是和项目一体化, 另一方面就是可以缓解对队列的消耗(节省资源)
from flask import Flask
from flask_nameko import FlaskPooledClusterRpcProxy
rpc = FlaskPooledClusterRpcProxy()
app = Flask(__name__)
app.config.update(dict(
NAMEKO_AMQP_URI='amqp://guest:guest@localhost'
))
rpc.init_app(app)
from flask import Response, request
from __init__ import (
app,
rpc
)
@app.route('/', methods=['POST'])
def index():
rpc.hello_service.hello.call_async(request.json)
return Response({"code": 1, "msg": 'nihao'})
@app.route('/one', methods=['POST'])
def index01():
rpc.hello_service01.hello01.call_async(request.json)
return Response({"code": 1, "msg": 'nihao'})
if __name__ == '__main__':
app.run(debug=True)
六、Django中使用nameko案例
1. 创建一个Django项目
2. 创建一个app
3. 安装django-nameko对应模块 pip install django-nameko
4. 在django配置文件settings中配置nameko相关信息
# 远程amqp地址
NAMEKO_CONFIG = {
'AMQP_URI': 'amqp://guest:guest@localhost'
}
# 设置连接超时
NAMEKO_TIMEOUT = 10
# nameko_tools.py
from django_nameko import get_pool
def publish_msg(data):
try:
with get_pool().next() as rpc:
res = rpc.hello_service.hello.call_async(data)
return 0, res.result()
except Exception as e:
print(e)
return 1, e
# apis.py
from rest_framework.response import Response
from rest_framework.views import APIView
from nameko_tools import publish_msg
class NameKoAPI(APIView):
def post(self, request):
code, res = publish_msg(self.request.data)
if code == 0:
return Response({'code': 0, "msg": "%s"% res})
return Response({'code': -1, "msg": "%s"% res})
● 以上均为消息发布在Flask和Django中的使用案例, 使用当中需单独运行一个nameko服务用于订阅消息:
# nameko_server.py
from nameko.rpc import rpc
class Hello_service:
name = "hello_service" # 服务名称
@rpc # 标记点入口
def hello(self, data):
print(data)
return "Hello, {}!".format(data)
class Hello_service01:
name = "hello_service01"
@rpc
def hello01(self, data):
print(data)
return "Hello, {}!".format(data)
# 启动命令(所在文件目录):
1. nameko run nameko_server
2. nameko run nameko_server --broker amqp://RabbitMq_username:RabbitMq_password@主机地址