consul简介
consul是一种分布式管理工具。主要可以作为服务发现或分布式配置工具来使用。此处默认您已经安装好consul并启动。
1、服务注册有什么用?
通俗来讲,就是可以知道你有多少种服务在用,是否可用(consul有服务监控检查)。服务集群有哪些节点。其次是API网关可以通过服务中心获取到服务转发的地址。例如consul有个服务的集群名字为user-groups,我们要访问这个集群的api接口,接口的endpoint为/user/info。通过zuul可以这样访问:直接访问 http://<zuul host>:<zuul port>/user-groups/user/info。zuul会通过consul自己找到该转发的api地址。
2、分布式配置工具
分布式配置工具将配置存在内存中,你的项目可以在不重启的情况下,读取一个可以动态改变的配置的值。
python通过consul完成服务注册
import consul
import threading
from flask import Flask
from random import choice, choices
class AiopsConsul(object):
def __init__(self, consul_host="127.0.0.1", consul_port=8500, host="127.0.0.1"):
"""初始化,连接consul服务器"""
self.host = host
self._consul = consul.Consul(consul_host, consul_port)
self.service_weight = {}
def register_service(self, name, host, port, tags=None):
tags = tags or []
# 注册服务
self._consul.agent.service.register(
name,
name,
host,
port,
tags,
# 健康检查ip端口,检查时间:5,超时时间:30,注销时间:30s
check=consul.Check().tcp(host, port, "5s", "30s", "30s"))
def get_service(self, name):
_, nodes = self._consul.health.service(service=name, passing=True)
if len(nodes) == 0:
raise Exception('service is empty.')
weights = []
for node in nodes:
service = node.get('Service')
address = "http://{0}:{1}".format(service['Address'], service['Port'])
if self.service_weight.get(address) is None:
self.service_weight[address] = 100
weights.append(self.service_weight[address])
service = choices(nodes, weights=weights, k=1)[0].get("Service")
return "http://{0}:{1}".format(service['Address'], service['Port'])
def update_weight(self, address, success):
if success is True:
self.service_weight[address] = max(100, self.service_weight[address]+5)
else:
self.service_weight[address] = max(100, self.service_weight[address]/2)
def app(self):
app = Flask(__name__)
@app.route('/health', methods=['GET'])
def get_task():
return '{"status":"UP"}'
app.run(debug=False, host='0.0.0.0', port=8893)
def health(self):
"""注册服务"""
self.register_service("XXXXX", self.host, 8893)
consul.Check().tcp(self.host, 8893, "5s", "30s", "30s")
t = threading.Thread(target=self.app, name='LoopThread')
t.start()
if __name__ == '__main__':
c = AiopsConsul('default.consul_url')
print(c.get_service("task"))
python通过consul完成服务发现
我们实现Feign类:一个使用起来更加方便的 HTTP 客户端。采用接口的方式, 将需要调用的其他服务的方法定义成抽象方法, 不需要自己构建 http 请求。然后就像是调用自身工程的方法调用,而感觉不到是调用远程方法,使得编写 客户端变得非常容易。
import requests
import logging
from manage.aiops_consul import AiopsConsul
class XXXXXFeign(object):
def __init__(self, project='aiops-station-model'):
"""初始化,连接consul服务器"""
self.sr = SR(project)
self.aiops_consul = AiopsConsul(consul_host=self.sr.getProperty('default.consul_url'),
consul_port='default.consul_port')
self.client = requests.session()
self.headers = {'Content-Type': 'application/json', 'Connection': 'keep-alive',
'buc-auth-token': 'default.buc-auth-token'}
def get(self, uri):
try_count = 0
while try_count < 5:
address = self.aiops_consul.get_service('service')
url = '{0}{1}'.format(address, uri)
response = self.client.get(
url=url,
headers=self.headers)
if response.status_code is 200:
self.aiops_consul.update_weight(address, success=True)
return response.content
self.aiops_consul.update_weight(address, success=False)
try_count += 1
logging.warning('{0}: status_code is {1}'.format(url, response.status_code))
raise Exception('service service is error')
def post(self, uri, data):
try_count = 0
while try_count < 5:
address = self.aiops_consul.get_service('service)
url = '{0}{1}'.format(address, uri)
response = self.client.post(
url=url,
headers=self.headers,
data=data)
if response.status_code is 200:
self.aiops_consul.update_weight(address, success=True)
return response.content
self.aiops_consul.update_weight(address, success=False)
try_count += 1
logging.warning('{0}: status_code is {1}'.format(url, response.status_code))
raise Exception('service service is error')
def get_task(self, task_id, user):
return self.get('/task/get/{0}?user={1}'.format(task_id, user))
def get_and_run_task(self, task, user):
return self.get('/task/getAndRun/{0}?user={1}'.format(task, user))
if __name__ == '__main__':
feign = AiopsStarLinkFeign()
print(feign.get_task('', user='sun'))
print(feign.get_and_run_task(task="task", user='sun'))