1. 概述
一个第三方的分布式服务程序, 为别的分布式程序服务,存储的都是状态数据
数据保管(分布式中每一台的状态信息、数据信息)
节点监听(监听掉线)
分布式服务器主从选举、掉线分配管理
配置管理(管理分布式服务器的通用配置参数、文件,用于配置更新)
分布式共享锁
2. 基本要求
因此zookeeper要求自身高可靠的分布式集群(奇数个节点最少一台,一般三台,半数以上节点存活,zk就能正常服务)
3. zookeeper的集群角色分配原理
三台机器如图,需要配置id分别为1、2、3,配置文件决定zookeeper集群有哪些备选节点
首先启动mini1,先在集群中投票,发现只有自己启动就投票给自己,此时mini1 一票
再启动mini2,在集群中mini1发现mini2上线,给mini2投一票;mini2先给自己投一票,再给mini1投一票。此时mini1有2票,mini2也有2票
mini1、2各有两票,假设机制是根据id大小来决定,由于mini2的id>mini1的id,那么mini2和mini1都会投票给mini2。此时mini2有4票,mini1有2票,那么mini2做为leader,mini1作为follower
最后mini3上线,发现mini2已经是一个leader,自己就作为follower。
当有数据提交到follower的时候,follower都会将数据直接转发给leader,再由leader分发给集群中的follower
4. zookeeper官网:https://zookeeper.apache.org/
5. 配置zookeeper
5.1 配置文件(需要将zoo_sample.cfg改成zoo.cfg)
/conf/zoo.cfg 配置详解:https://www.cnblogs.com/xiohao/p/5541093.html
5.2 配置实例
(这个配置在zookeeper集群中每一台都相同) 注意mini1,mini2, mini3需要映射ip,如果不想做主机名-IP映射,就改成主机ip
对配置文件的添加或者修改用粗体标明了:
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/kluter/zookeeper-3.4.11/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
#configure myid and port1/port2
#port1:default lead/follower port 2888
#prot2:default vote port 3888
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888
5.3 分别进入每一台zookeeper数据目录配置myid
第一台为1, 第二台为2, 第三台为3
# cd /kluter/zookeeper-3.4.11/data/
# echo 1 > myid
5.4 运行zookeeper集群 zkServer.sh
这里参考一偏连接异常的文章,主要是配置文件的主机名问题
https://www.cnblogs.com/xiaohua92/p/5460515.html
#./bin/zkServer.sh start
#jps
#./bin/zkServer.sh status
出现Error contacting service. It is probably not running. 说明集群状态异常。一台zookeeper也会异常,因为至少需要2台,最好是官方的3台或3台以上的奇数台
需要停止iptables服务,否则2888、3888端口不通
#systemctl stop firewalld.service
5.5 命令行本地客户端连接
#/bin/zkCli.sh#显示[zk: localhost:2181(CONNECTED) 0] 表示命令行连接本机成功
[zk: localhost:2181(CONNECTED) 0]help
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path[watch]#ls / //查看根znode下面的节点
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path[watch]#get /app1 查看节点app1的数据
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port
[zk: localhost:2181(CONNECTED) 0]connect 10.10.77.192:2181#连接另一台
5.6 zookeeper目录结构
参考zkCli.sh详解:https://blog.csdn.net/xyang81/article/details/53053642
PS:这里的断开连接指的是客户端断开与服务器的连接,客户端不断开连接,创建的EPHEMERAL会一直存在。客户端断开了,EPHEMERAL节点会被服务器自动删除
5.6.1 创建两种类型节点并添加数据
短暂连接-e(ephemeral)用于集群中节点掉线
[zk: localhost:2181(CONNECTED) 10]create -e /app-emphemeral 88888
[zk: localhost:2181(CONNECTED) 10]quit #退出,那么短暂连接的节点自己删除了
#./zkCli.sh #再次进入命令行模式
[zk: localhost:2181(CONNECTED) 0]ls / 查看验证结果
反之用-s则退出后不会删除节点
[zk: localhost:2181(CONNECTED) 2] create -s /testZnode alsdkjfslkdfj
[zk: localhost:2181(CONNECTED) 10] ls /
[zookeeper, app1, testZnode0000000002] #发现自动在testZnode增加了序号
5.6.2 更新节点数据
set /testZnode0000000002 11111111
5.6.3 监听节点数据更改
[zk: 10.10.77.191:2181(CONNECTED) 10] get /testZnode0000000002 watch
[zk: 10.10.77.192:2181(CONNECTED) 10] set /testZnode0000000002 xxxxxxx
192上更新数据后,191上自动收到消息,但是只生效一次,而且监听不到子节点创建
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/testZnode0000000002
5.6.4 监听子节点【数据】
[zk: 10.10.77.191:2181(CONNECTED) 10]ls /testZnode00000000002 watch
[zk: 10.10.77.192:2181(CONNECTED) 10] create /testZnode00000000002/crate 11111
192上创建子节点后,191上自动收到消息,而且只有数据更新会收到消息
type:NodeChildrenChanged path:/testZnode0000000002
5.6.5 节点删除
删除一个节点 delete /testZnode0000000002/testnode3
递归删除根节点 rmr /testZnode0000000002
6. zookeeper的java api
zookeeper API文档地址:https://zookeeper.apache.org/doc/r3.4.11/api/index.html
6.1. 方式一:本地导入zookeeper包
看到这里,我假设大家都有java基础并且使用eclipse进行过开发
在eclipse中新建project:zookeeper,并增加一个lib目录
解压zookeeper-3.4.11.tar.gz,将zookeeper中的7个jar包全部导入eclipse的lib目录中并且bulild path
new一个class:
测试代码:
测试方式就是右键run Junit test
6.2. 方式二:使用maven自动导包
新建一个zookeeper的maven project,创建过程参考:https://www.jianshu.com/p/662a8291e0e3
这里只给出maven的pom.xml:
注意:
process()方法只是zkCli的一个回调函数,如果设置监听为true,当监听的节点发生了变化,就会调用process回调函数。监听功能是一个Daemon守护线程,当主线程退出,守护线程也会退出。
关于守护线程,请参看:https://www.jianshu.com/p/303507fc8b6d 的第10. 守护线程
运行创建节点的代码发生Excepption:MarshallingErrorException。原因是第三个参数不能为null,必须写一个完整的。修改如下:
运行虽然成功了,但是去命令行查看创建的新节点Kluter并不存在,原因是CreateMode使用的是EPHEMERAL,客户端终止后就自动删除了。改为CreateMode.PERSISTENT,即便客户端终止,节点Kluter也不会消失了。
之前的代码并不能监听节点的变化,下面getChildren方法,第二个参数设置为true可以监听"/"下的所有节点。
测试监听:
运行代码:
休眠是为了让程序不退出。接着再命令行删除程序创建的节点kluter,这样程序的console会打印:
但是,再删除一个根下的节点,并不会有监听事件发生,说明初始化中的监听事件只会发生一次,如果希望持续发生,则每次监听到事件之后需要再设置下一次的监听。修改初始化代码如下:
余下代码和6.1. 中的代码相同,不再赘述。
7. 客户端动态感知集群中的服务器上下线
架构解析:
1. 上半部分是服务器集群,每台服务器上线的时候都需要到zookeeper上去创建节点,同一类服务器在同一个根节点下创建子节点(使用EPHEMERAL_SEQUENTIAL临时序列化增长的模式)。
2. 用户的客户端程序启动时,每次到zookeeper去获取某一类服务器根节点下面的子节点,并getChildren做监听,就能知道服务器的online和offline
3. 负载均衡:当有用户需要做业务查询时,先比较每个服务器节点的data(data中记录的是连接数),选取连接数最小的去做查询,并且set data连接数+1,查询完成后设置连接数-1.
7.1. 程序实现
每一台server端运行代码:
public class DistributedSrv {
private static final String connectString = "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181";
private static final int sessionTimeout = 2000;
private static final String parentNode = "/servers";
private ZooKeeper zkHandler = null;
public void getConnect() throws Exception {
//get zk connection
zkHandler = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
System.out.println(event.getType() + "#############" + event.getPath());
try {
zkHandler.getChildren("/", true);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public void registerServer(String hostname) throws Exception {
String createPath = zkHandler.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + "is online" + createPath);
}
public void handleBusiness(String hostName) throws Exception {
System.out.println(hostName + " start working......");
Thread.sleep(Long.MAX_VALUE);
}
}
每一台客户端需要运行:
public class DistributedCli {
private static final String connectString = "10.10.77.191:2181,10.10.77.192:2181,10.10.77.193:2181";
private static final int sessionTimeout = 2000;
private static final String parentNode = "/servers";
private volatile List<String> serverList; //volatile: muti-thread do with the serverList with no copy
private ZooKeeper zkHandler = null;
public void getConnect() throws Exception {
//get zk connection
zkHandler = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
public void process(WatchedEvent event) {
System.out.println(event.getType() + "#############" + event.getPath());
try {
getServerList();//node changed, update node list
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
public void getServerList() throws Exception {
List<String> childNode = zkHandler.getChildren(parentNode, true); //register watch
//create a local srvList
ArrayList<String> srvList = new ArrayList<String>();
for(String nodeName: childNode) {//show nodeName
byte[] nodeData = zkHandler.getData(parentNode + "/" + nodeName, false, null);
srvList.add(new String(nodeData));
}
//give to the object member
serverList = srvList;
System.out.println(srvList);
}
public void handleBusiness() throws Exception {
System.out.println("client do business......");
Thread.sleep(Long.MAX_VALUE);
}
}
7.2. 打包并用java命令运行程序
由于我们编辑、编译、运行代码是在eclipse中完成的,但是这样并不能够在linux环境下用脚本或者命令来执行。我们可以通过打包的方式来运行调试好的程序:
下面以server端为例,打包步骤:
1. 先运行一下调试好的程序,否则添加jar包不会又相应的选项
2.
3.
4.
5. 打包成功后,可以在命令行直接运行了
-jar: 后面跟刚才打包好的zk.jar包
zookeeper1:是程序中的参数arg[0]
8. 使用Zookeeper实现分布式共享锁的功能
在多线程同步中:https://www.jianshu.com/p/6f98f03430eb, 我们学习了如何在多个线程间保证同步访问。但如果遇到多个客户端访问某个服务器同一个资源的情况就不合适了,多个客户端相当于是多个不同的进程,进程间想要同步,这里就可以使用zookeeper来实现。
实现步骤:
1. 程序节点启动时,到zk上注册一个节点(EPHEMERAL_SEQUENTIAL),并监听其父节点
2. 获取父节点下的所有程序子节点,比较序号的大小
3. 因为序号是单调递增的,让序号最小的获取到“锁”(不是一个真正的锁,不是线程同步锁),去访问资源,访问完后,删除自己的节点(相当于释放锁),并且重新注册一个新的节点,序号又+1增长
4. 其他程序节点会收到时间通知,则可以去zk上看是否节点序号是最小的,最小的才能获取锁
伪代码:
//getConnect()
//registerLock(/lock/app.EPHEMERAL_SEQUENTIAL)
//getLock(){获取子节点,比较自己的序号是否是最小的,如果是,则返回锁获取成功}
//访问资源
//释放锁releaseLock(){删除自己的子节点,并创建一个新的节点}
//监听器
process(){getLock()}
完整代码:
public class DistributedCliLock {
private static final int SESSION_TIMEOUT = 5000;
private String hosts = "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181";
private String groupNode = "locks";
private String subNode = "sub";
private boolean haveLock = false;
private ZooKeeper zk;
private volatile String thisPath; //record the childNode path of itself
/**
*
* connect to zookeeper
*/
public void connectZookeeper() throws Exception{
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher(){
public void process(WatchedEvent event){
try{
//check event type, only process children Node
if(event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)){
//get child node, and watch parent node
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
String thisNode = thisPath.substring(("/" + groupNode + "/").length());
//sort and compare the id
Collections.sort(childrenNodes);
if(childrenNodes.indexOf(thisNode) == 0){
//critical section and release the node
doSomething();
//re-create a node, id+1
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
}catch(Exception e){e.printStackTrace();}
}
});
// regist node at the first time
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
//wait for a while, in order to see the result
Thread.sleep(new Random().nextInt(1000));
//get all the sub nodes
List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
//only one client, you can do something right now
if(childrenNodes.size() == 1){
doSomething();
thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
}
}
//do with the critical section, at the end of the function, release lock
private void doSomething() throws Exception {
try {
out.println("gain lock: " + thisPath);
Thread.sleep(2000);
} finally {
out.println("finished: " + thisPath);
zk.delete(this.thisPath, -1);
}
}
public static void main(String[] args) throws Exception {
DistributedCliLock dl = new DistributedCliLock();
dl.connectZookeeper();
Thread.sleep(Long.MAX_VALUE);
}
}