在实际生产环境当中,面对高并发、大流量等的复杂环境,分布式架构是非常常见的架构,分布式架构可以增大系统容量,加强模块可用性、扩展性强;不同企业面对不同的环境,架构也各不相同,但万变不离其中,本文就来探讨分布式架构中的使用ZooKeeper实现服务注册与服务发现。
服务注册与服务发现
服务注册,简而言之,就是当一个服务节点上线的时候,能够注册到配置服务当中去,通过配置服务节点向外部正式提供服务,其他需要依赖该服务的服务可以通过查询配置服务获得该节点的具体地址,并发起服务请求。服务注册所描述的对象是被依赖的服务节点。
服务发现就是当一个服务需要依赖别的服务的时候,可以通过配置服务的查询,获得依赖可用的节点,进而进行调用,服务发现所描述得对象是需要依赖其他服务的节点。
在这里,服务注册和发现需要解决的一个十分关键的问题是节点保活,也即是配置服务需要保证进行注册过来提供服务的节点一定是可用的,在生产环境当中常采用心跳机制保证注册节点的可用。
ZooKeeper的Znode有四种节点:
-
PERSISTENT——持久化节点
客户端与ZooKeeper断开连接后,该节点依旧存在
-
PERSISTENT_SEQUENTIAL——持久化顺序编号节点
客户端与ZooKeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
-
EPHEMERAL——临时节点
客户端与ZooKeeper断开连接后,该节点被删除
-
EPHEMERAL_SEQUENTIAL——临时顺序编号节点
客户端与ZooKeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号
重点考察第三种节点,临时节点,采用临时节点可以实现类似心跳的机制,若服务与ZooKeeper保持连接,则认为服务在线,服务可用;若服务与ZooKeeper关闭连接即认为服务下线了,服务变为不可用。
服务注册与服务发现:使用ZooKeeper维护的Znode可以维持服务注册节点信息,比如维护一个tornado服务的服务列表;那么可以在ZooKeeper维护以下的目录结构。
/zk_registry/data/tornado_app/
这时tornado服务节点No.1上线了,地址为:192.168.149.134:2181,维护目录更新如下:
/zk_registry/data/tornado_app/192.168.149.134:2181
接着tornado服务节点No.2上线了,地址为:192.168.149.134:2182,那么更新目路节点如下:
/zk_registry/data/tornado_app/192.168.149.134:2182
这时使用zk的命令行可以获得以下信息:
ls /zk_registry/data/tornado_app/
# 192.168.149.134:2181 192.168.149.134:2182
如果使用临时节点,当节点:192.168.149.134:2182下线的时候,相关记录会自动消失
ls /zk_registry/data/tornado_app/
# 192.168.149.134:2181
基于此过程,我们可以实现服务注册和服务发现的机制。
Kazoo 实现
配置文件
# config.py
# 数据路径
ZK_REGISTER_PATH = '/zk_registry/data'
# zk地址
ZK_SERVER_URL = '192.168.149.134:2181'
# 服务标识
TORNADO_APP_LABEL = 'tornado_app'
ZooKeeper操作管理类
# manager.py
import random
from kazoo.client import KazooClient
import config
class SRSDManager(object):
def __init__(self, zk_service_url):
'''
初始化
:param zk_service_url: zk集群地址,多个用“,”分割
'''
self.zk_client = None
self.service_map = {}
logging.basicConfig()
print('Init with zk address: ' + str(zk_service_url))
zk_client = KazooClient(zk_service_url)
zk_client.start()
self.zk_client = zk_client
print('Init ZookeeperManager success.')
def register_service(self, service_label, service_address):
'''
注册服务
:param service_label: 服务标记,标记属于什么服务
:param service_address: 需要注册的服务地址
:return:
'''
pass
def discover_service(self, service_label, all_variable_service=True):
'''
服务发现
:param service_label: 服务标记,标记属于什么服务
:param all_variable_service: 是否返回所有可用服务地址列表,默认True
:return: 服务列表
'''
pass
if __name__ == '__main__':
pass
服务注册
def register_service(self, service_label, service_address):
'''
注册服务
:param service_label: 服务标记,标记属于什么服务
:param service_address: 需要注册的服务地址
:return:
'''
zk_client = self.zk_client
path = '%s/%s' % (config.ZK_REGISTER_PATH, service_label)
zk_client.ensure_path(path)
node = '%s/%s' % (path, service_address)
print('Create node with data: %s => %s' % (node, service_address))
zk_client.create(node, b'', ephemeral=True)
return True
服务发现
def discover_service(self, service_label, all_variable_service=True):
'''
服务发现
:param service_label: 服务标记,标记属于什么服务
:param all_variable_service: 是否返回所有可用服务地址列表,默认True
:return: 服务列表
'''
zk_client = self.zk_client
path = '%s/%s' % (config.ZK_REGISTER_PATH, service_label)
nodes = zk_client.get_children(path)
if all_variable_service:
return nodes
else:
return random.sample(nodes, 1)
管理类的简单测试
if __name__ == '__main__':
manager = SRSDManager(config.ZK_SERVER_URL)
manager.register_service('hello', 'world')
service_list = manager.discover_service('hello')
print(service_list)
测试服务注册
可以实现一个简单的应用使用上述实现的API进行服务注册,以下使用tornado实现一个简单的app。
# service.py
import json
import time
import socket
import platform
import tornado.ioloop
import tornado.web
import config
import srsd.manager
class MainHandler(tornado.web.RequestHandler):
def get(self):
cpu_info = platform.processor()
system_info = platform.platform()
localtime = time.strftime("%a %b %d %H:%M:%S %Y", time.localtime())
m = {}
m['cpu_info'] = cpu_info
m['system_info'] = system_info
m['localtime'] = localtime
self.write(json.dumps(m))
def make_app():
return tornado.web.Application([
(r"/info", MainHandler),
])
def register_service(service_label, ip, port):
zkm = srsd.manager.SRSDManager(config.ZK_SERVER_URL)
service_addr = '%s:%d' % (ip, port)
zkm.register_service(service_label, service_addr)
if __name__ == "__main__":
app = make_app()
service_label = config.TORNADO_APP_LABEL
port = 8888
ip = socket.gethostbyname(socket.gethostname())
register_service(service_label, ip, port)
print('Start tornado app at port: ' + str(port))
app.listen(port)
tornado.ioloop.IOLoop.current().start()
测试服务发现
可以实现一个客户端测试服务发现的功能,如下:
# client.py
import requests
import random
import config
from srsd.manager import SRSDManager
zkm = SRSDManager(config.ZK_SERVER_URL)
def tornado_app_client():
# 通过zk查找可用service
available_services = zkm.discover_service(config.TORNADO_APP_LABEL)
# 随机挑选
index = random.randint(0, len(available_services) - 1)
service = available_services[index]
# 发送请求
api = '/info'
url = 'http://%s%s' % (service, api)
print("Request url: ", url)
response = requests.get(url)
print(response.text)
if __name__ == '__main__':
tornado_app_client()