zookeeper主要的功能是数据的存储与分配,zookeeper的API可以根据开发者的需求设计不同的工作模式,因此可以适用于很多场景之中。
这里用python写了一个例子来实现简单的发布订阅功能。例子中实现了发布、监视的功能,完整的发布订阅还有很多功能需要实现,都可以通过zookeeper协助实现,例子中实现了server发布和client接收的功能。
- 首先我们需要搭建一个zookeeper集群,这里就不详讲了,可以参考https://www.jianshu.com/p/b76628b82208。例子中用了3台虚拟机每台分别搭建一个zookeeper服务来实现zookeeper集群。
- 安装kazoo
pip3 install kazoo
- 发布端代码
#!/bin/usr/python
#_*_ coding: utf-8 _*_
import sys
from myzk import ZooKeeper
zk = ZooKeeper(hosts='192.168.137.129:2181,192.168.137.130:2181,192.168.137.131:2181')
path = "/monitor/" + sys.argv[1]
data = sys.argv[2]
zk.create_nodes(path, data)
- 接收端代码
#!/bin/usr/python
#_*_ coding: utf-8 _*_
import sys
from myzk import ZooKeeper
zk = ZooKeeper(hosts='192.168.137.129:2181,192.168.137.130:2181,192.168.137.131:2181')
zk.watch_child_node("/monitor")
- zookeeper类
#!/bin/usr/python
#_*_ coding: utf-8 _*_
import time
from kazoo.client import KazooClient,ChildrenWatch,DataWatch
class ZooKeeper(KazooClient):
def __init__(self, timeout=15, *args, **kwargs):
super().__init__(timeout=timeout, *args, **kwargs)
self.start()
def __del__(self):
self.stop()
def create_path(self, zk_path):
self.ensure_path(zk_path)
return
def update_nodes_data(self, zk_path, data):
self.set(zk_path, bytes(data, encoding="utf8"))
return
def create_nodes(self, zk_path, data):
self.create(zk_path, bytes(data, encoding="utf8"))
return
def get_data(self, zk_path):
if self.exists(zk_path):
return self.get(zk_path)
else:
print("node not exists")
return
def watch_child_node(self, zk_path):
@ChildrenWatch(client=self, path=zk_path, send_event=True)
def get_changes(children, event):
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " node childs had been change")
print("now: ", children)
if event:
print(event)
return children
while True:
time.sleep(5)
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " watching...")
def watch_node_data(self, zk_path):
@DataWatch(client=self, path=zk_path)
def watch_node(data, stat):
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " data changed")
if data:
print("Version: %s, data: %s" % (stat.version, data.decode("utf8")))
while True:
time.sleep(5)
print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " watching...")
把接收端和Zookeeper类的代码放在三台server上,然后均运行接收端代码,然后随便选一台server运行发布端代码,每创建一个新节点,三个接收端都会发现。
运行结果:
我们随机找一台服务器运行发布端代码
接收端1
接收端2
接收端3
可以看出每次新增一个znode,接收端都能响应。