一 Zookeeper
入门
1️⃣概述
Zookeeper
是一个开源的分布式的,为分布式应用提供协调服务的Apache
项目。
2️⃣特点
3️⃣数据结构
4️⃣应用场景 : 提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
5️⃣下载地址
官网首页:https://zookeeper.apache.org/
二 Zookeeper
的安装
1️⃣本地模式安装部署
1.安装前准备
(1)安装Jdk
(2)拷贝Zookeeper
安装包到Linux
系统下
(3)解压到指定目录tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
2.配置修改
(1)将/opt/module/zookeeper-3.4.10/conf
这个路径下的zoo_sample.cfg
修改为zoo.cfg
;mv zoo_sample.cfg zoo.cfg
(2)打开zoo.cfg
文件,修改dataDir
路径:vim zoo.cfg
修改如下内容:dataDir=/opt/module/zookeeper-3.4.10/zkData
(3)在/opt/module/zookeeper-3.4.10/
这个目录上创建zkData
文件夹;mkdir zkData
3.操作Zookeeper
(1)启动Zookeeper
;bin/zkServer.sh start
(2)查看进程是否启动;jps
(3)查看状态:bin/zkServer.sh status
(4)启动客户端:bin/zkCli.sh
(5)退出客户端:quit
(6)停止Zookeeper :bin/zkServer.sh stop
2️⃣配置参数解读 :
Zookeeper
中的配置文件zoo.cfg
中参数含义解读如下.
1.tickTime =2000
:通信心跳数,Zookeeper
服务器与客户端心跳时间,单位毫秒Zookeeper
使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime
时间就会发送一个心跳,时间单位为毫秒。它用于心跳机制,并且设置最小的session
超时时间为两倍心跳时间。(session
的最小超时时间是2*tickTime
)
2.initLimit =10
:LF
初始通信时限集群中的Follower
跟随者服务器与Leader
领导者服务器之间初始连接时能容忍的最多心跳数(tickTime
的数量),用它来限定集群中的Zookeeper
服务器连接到Leader
的时限。
3.syncLimit =5
:LF
同步通信时限集群中Leader
与Follower
之间的最大响应时间单位,假如响应超过syncLimit * tickTime
,Leader
认为Follwer
死掉,从服务器列表中删除Follwer
。
4.dataDir
:数据文件目录+数据持久化路径主要用于保存Zookeeper
中的数据。
5.clientPort =2181
:客户端连接端口监听客户端连接的端口。
三 Zookeeper
实战
1️⃣分布式安装部署
1.集群规划 : 在hadoop102
、hadoop103
和hadoop104
三个节点上部署Zookeeper
。
2.解压安装
(1)解压Zookeeper
安装包到/opt/module/
目录下 :tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
(2)同步/opt/module/zookeeper-3.4.10
目录内容到hadoop103
、hadoop104
:xsync zookeeper-3.4.10/
3.配置服务器编号
(1)在/opt/module/zookeeper-3.4.10/
这个目录下创建zkData
:mkdir -p zkData
(2)在/opt/module/zookeeper-3.4.10/zkData
目录下创建一个myid
的文件touch myid
添加myid
文件,注意一定要在linux
里面创建,外部创建很可能会乱码
(3)编辑myid
文件 :vi myid
在文件中添加与server
对应的编号2
(4)拷贝配置好的zookeeper
到其他机器上 :xsync myid
,并分别在hadoop103
、hadoop104
上修改myid
文件中内容为3
、4
;
4.配置zoo.cfg
文件
(1)重命名/opt/module/zookeeper-3.4.10/conf
这个目录下的zoo_sample.cfg
为zoo.cfg
:mv zoo_sample.cfg zoo.cfg
(2)打开zoo.cfg
文件 :vim zoo.cfg
修改数据存储路径配置dataDir=/opt/module/zookeeper-3.4.10/zkData
增加如下配置#######################cluster########################## server.2=hadoop102:2888:3888 server.3=hadoop103:2888:3888 server.4=hadoop104:2888:3888
(3)同步
zoo.cfg
配置文件 :xsync zoo.cfg
(4)配置参数解读 :server.A=B:C:D
。
A
是一个数字,表示这个是第几号服务器;集群模式下配置一个文件myid
,这个文件在dataDir
目录下,这个文件里面有一个数据就是A
的值,Zookeeper
启动时读取此文件,拿到里面的数据与zoo.cfg
里面的配置信息比较从而判断到底是哪个server
。
B
是这个服务器的地址;
C
是这个服务器Follower
与集群中的Leader
服务器交换信息的端口;
D
是万一集群中的Leader
服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader
,而这个端口就是用来执行选举时服务器相互通信的端口。
- 集群操作
(1)分别启动Zookeeper
;bin/zkServer.sh start
(2)查看状态;bin/zkServer.sh status
2️⃣客户端命令行操作
1.启动客户端 :bin/zkCli.sh
2.显示所有操作命令 :help
3.查看当前znode中所包含的内容 :ls /
4.查看当前节点详细数据 : 'ls2 /'
5.分别创建2个普通节点 :create /sanguo "jinlian"
,create /sanguo/shuguo "liubei"
6.获得节点的值 :get /sanguo
7.创建短暂节点 :create -e /sanguo/wuguo "zhouyu"
(1)在当前客户端是能查看到的 :ls /sanguo
(2)退出当前客户端然后再重启客户端 :quit
,bin/zkCli.sh
(3)再次查看根目录下短暂节点已经删除 :ls /sanguo
8.创建带序号的节点
(1)先创建一个普通的根节点/sanguo/weiguo
:create /sanguo/weiguo "caocao"
(2)创建带序号的节点 :create -s /sanguo/weiguo/xiaoqiao "jinlian" create -s /sanguo/weiguo/daqiao "jinlian" create -s /sanguo/weiguo/diaocan "jinlian"
如果原来没有序号节点,序号从0开始依次递增。如果原节点下已有2个节点,则再排序时从2开始,以此类推。
9.修改节点数据值 :set /sanguo/weiguo "simayi"
10.节点的值变化监听
(1)在hadoop104
主机上注册监听/sanguo节点数据变化 :get /sanguo watch
(2)在hadoop103
主机上修改/sanguo
节点的数据 :set /sanguo "xisi"
(3)观察hadoop104
主机收到数据变化的监听WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo
11.节点的子节点变化监听(路径变化)
(1)在hadoop104
主机上注册监听/sanguo
节点的子节点变化 :ls /sanguo watch
(2)在hadoop103
主机/sanguo
节点上创建子节点
create /sanguo/jin "simayi"
(3)观察hadoop104
主机收到子节点变化的监听WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo
12.删除节点 :
delete /sanguo/jin
13.递归删除节点 :rmr /sanguo/shuguo
14.查看节点状态 :stat /sanguo
3️⃣
API
应用
1.创建一个Maven
工程
2.添加pom
文件<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> </dependency> </dependencies>
3.拷贝
log4j.properties
文件到项目根目录 : 需要在项目的src/main/resources
目录下,新建一个文件,命名为“log4j.properties”
,在文件中填入log4j.rootLogger=INFO, stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n log4j.appender.logfile=org.apache.log4j.FileAppender log4j.appender.logfile.File=target/spring.log log4j.appender.logfile.layout=org.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
- 创建
ZooKeeper
客户端private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private static int sessionTimeout = 2000; private ZooKeeper zkClient = null; @Before public void init() throws Exception { zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 收到事件通知后的回调函数(用户的业务逻辑) System.out.println(event.getType() + "--" + event.getPath()); // 再次启动监听 try { zkClient.getChildren("/", true); } catch (Exception e) { e.printStackTrace(); } } });
- 创建子节点
// 创建子节点 @Test public void create() throws Exception { // 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型 String nodeCreated = zkClient.create("/xxx", "jinlian".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); }
- 获取子节点并监听节点变化
// 获取子节点 @Test public void getChildren() throws Exception { List<String> children = zkClient.getChildren("/", true); for (String child : children) { System.out.println(child); } // 延时阻塞 Thread.sleep(Long.MAX_VALUE); }
- 判断
Znode
是否存在// 判断znode是否存在 @Test public void exist() throws Exception { Stat stat = zkClient.exists("/eclipse", false); System.out.println(stat == null ? "not exist" : "exist"); }
4️⃣ 监听服务器节点动态上下线案例(扩展)
1.需求 : 某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
2.需求分析3.具体实现
(1)先在集群上创建/servers
节点 :create /servers "servers"
;
(2)服务器端向Zookeeper
注册代码package com.xxx.zkcase; import java.io.IOException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; public class DistributeServer { private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private static int sessionTimeout = 2000; private ZooKeeper zk = null; private String parentNode = "/servers"; // 创建到zk的客户端连接 public void getConnect() throws IOException{ zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { } }); } // 注册服务器 public void registServer(String hostname) throws Exception{ String create = zk.create(parentNode + "/server", hostname.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println(hostname +" is online "+ create); } // 业务功能 public void business(String hostname) throws Exception{ System.out.println(hostname+" is working ..."); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { // 1获取zk连接 DistributeServer server = new DistributeServer(); server.getConnect(); // 2 利用zk连接注册服务器信息 server.registServer(args[0]); // 3 启动业务功能 server.business(args[0]); } }
(3)客户端代码
package com.xxx.zkcase; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class DistributeClient { private static String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; private static int sessionTimeout = 2000; private ZooKeeper zk = null; private String parentNode = "/servers"; // 创建到zk的客户端连接 public void getConnect() throws IOException { zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 再次启动监听 try { getServerList(); } catch (Exception e) { e.printStackTrace(); } } }); } // 获取服务器列表信息 public void getServerList() throws Exception { // 1获取服务器子节点信息,并且对父节点进行监听 List<String> children = zk.getChildren(parentNode, true); // 2存储服务器信息列表 ArrayList<String> servers = new ArrayList<>(); // 3遍历所有节点,获取节点中的主机名称信息 for (String child : children) { byte[] data = zk.getData(parentNode + "/" + child, false, null); servers.add(new String(data)); } // 4打印服务器列表信息 System.out.println(servers); } // 业务功能 public void business() throws Exception{ System.out.println("client is working ..."); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws Exception { // 1获取zk连接 DistributeClient client = new DistributeClient(); client.getConnect(); // 2获取servers的子节点信息,从中获取服务器信息列表 client.getServerList(); // 3业务进程启动 client.business(); } }
四 Zookeeper
内部原理
1️⃣节点类型
2️⃣
Stat
结构体
1)czxid
-创建节点的事务zxid
每次修改ZooKeeper
状态都会收到一个zxid
形式的时间戳,也就是ZooKeeper
事务ID
。
事务ID
是ZooKeeper
中所有修改总的次序。每个修改都有唯一的zxid
,如果zxid1
小于zxid2
,那么zxid1
在zxid2
之前发生。
2)ctime
-znode
被创建的毫秒数(从1970年开始)
3)mzxid
-znode
最后更新的事务zxid
4)mtime
-znode
最后修改的毫秒数(从1970年开始)
5)pZxid
-znode
最后更新的子节点zxid
6)cversion
-znode
子节点变化号,znode
子节点修改次数
7)dataversion
-znode
数据变化号
8)aclVersion
-znode
访问控制列表的变化号
9)ephemeralOwner
- 如果是临时节点,这个是znode
拥有者的session id
。如果不是临时节点则是0
。
10)dataLength
-znode
的数据长度
11)numChildren
-znode
子节点数量
3️⃣监听器原理(面试重点)
4️⃣
Paxos
算法(扩展)
Paxos
算法一种基于消息传递且具有高度容错特性的一致性算法。
分布式系统中的节点通信存在两种模型:共享内存(Shared memory)
和消息传递(Messages passing)
。基于消息传递通信模型的分布式系统,不可避免的会发生以下错误:进程可能会慢、被杀死或者重启,消息可能会延迟、丢失、重复,在基础Paxos
场景中,先不考虑可能出现消息篡改即拜占庭错误的情况。Paxos
算法解决的问题是在一个可能发生上述异常的分布式系统中如何就某个值达成一致,保证不论发生以上任何异常,都不会破坏决议的一致性。Paxos
算法流程中的每条消息描述如下:
1.Prepare
:Proposer
生成全局唯一且递增的Proposal ID
(可使用时间戳加Server ID
),向所有Acceptors
发送Prepare
请求,这里无需携带提案内容,只携带Proposal ID
即可。
2.Promise
:Acceptors
收到Prepare
请求后,做出“两个承诺,一个应答”。
两个承诺:
a.不再接受Proposal ID
小于等于(注意:这里是<=
)当前请求的Prepare
请求。
b.不再接受Proposal ID
小于(注意:这里是<
)当前请求的Propose
请求。
一个应答:
c.不违背以前做出的承诺下,回复已经Accept
过的提案中Proposal ID
最大的那个提案的Value
和Proposal ID
,没有则返回空值。
3.Propose
:Proposer
收到多数Acceptors
的Promise
应答后,从应答中选择Proposal ID
最大的提案的Value
,作为本次要发起的提案。如果所有应答的提案Value
均为空值,则可以自己随意决定提案Value
。然后携带当前Proposal ID
,向所有Acceptors
发送Propose
请求。
4.Accept
:Acceptor
收到Propose
请求后,在不违背自己之前做出的承诺下,接受并持久化当前Proposal ID
和提案Value
。
5.Learn
:Proposer
收到多数Acceptors
的Accept
后,决议形成,将形成的决议发送给所有Learners
。
下面我们针对上述描述做三种情况的推演举例:为了简化流程,我们这里不设置Learner
。Paxos
算法缺陷:在网络复杂的情况下,一个应用Paxos
算法的分布式系统,可能很久无法收敛,甚至陷入活锁的情况。造成这种情况的原因是系统中有一个以上的Proposer
,多个Proposers
相互争夺Acceptors
,造成迟迟无法达成一致的情况。针对这种情况,一种改进的Paxos
算法被提出:从系统中选出一个节点作为Leader
,只有Leader
能够发起提案。这样,一次Paxos
流程中只有一个Proposer
,不会出现活锁的情况,此时只会出现例子中第一种情况。
5️⃣选举机制
1)半数机制:集群中半数以上机器存活,集群可用。所以Zookeeper
适合安装奇数台服务器。
2)Zookeeper
虽然在配置文件中并没有指定Master
和Slave
。但是,Zookeeper
工作时,是有一个节点为Leader
,其他则为Follower
,Leader
是通过内部的选举机制临时产生的。
3)以一个简单的例子来说明整个选举的过程。
假设有五台服务器组成的Zookeeper
集群,它们的id
从1-5
,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动,来看看会发生什么,如下图所示。(1)服务器1启动,发起一次选举。服务器1投自己一票。此时服务器1票数一票,不够半数以上(3票),选举无法完成,服务器1状态保持为LOOKING
;
(2)服务器2启动,再发起一次选举。服务器1和2分别投自己一票并交换选票信息:此时服务器1发现服务器2的ID比自己目前投票推举的(服务器1)大,更改选票为推举服务器2。此时服务器1票数0票,服务器2票数2票,没有半数以上结果,选举无法完成,服务器1,2状态保持LOOKING
(3)服务器3启动,发起一次选举。此时服务器1和2都会更改选票为服务器3。此次投票结果:服务器1为0票,服务器2为0票,服务器3为3票。此时服务器3的票数已经超过半数,服务器3当选Leader
。服务器1,2更改状态为FOLLOWING
,服务器3更改状态为LEADING
;
(4)服务器4启动,发起一次选举。此时服务器1,2,3已经不是LOOKING
状态,不会更改选票信息。交换选票信息结果:服务器3为3票,服务器4为1票。此时服务器4服从多数,更改选票信息为服务器3,并更改状态为FOLLOWING
;
(5)服务器5启动,同4一样当小弟。
6️⃣写数据流程
五 常见问题汇总
1️⃣
ZooKeeper
的部署方式有哪几种?集群中的角色有哪些?集群最少需要几台机器?
(1)部署方式单机模式、集群模式
(2)角色:Leader
和Follower
(3)集群最少需要机器数:3
2️⃣
ZooKeeper
的常用命令 :ls create get delete set…
3️⃣
ZooKeeper
的选举机制 : 参考4.5
4️⃣
ZooKeeper
的监听原理是什么 : 参考4.3