Zookeeper简介
Zookeeper是什么?
-
Zookeeper
是一个分布式协调服务的开源框架。 - 主要⽤用来解决分布式集群中应⽤用系统的⼀致性问题,解决分布式系统中数据存在⼀致性的问题
- 例如怎样避免同时操作同⼀数据造成脏读的问题
-
ZooKeeper
本质上是一个分布式的⼩文件存储系统- 提供基于类似于⽂件系统的目录树方式的数据存储,并且可以对树中的节点进行有效管理
-
ZooKeeper
提供给客户端监控存储在zk
内部数据的功能- 从而可以达到基于数据的集群管理,如监听文件变化
- 如:统一命名服务(
dubbo
)、分布式配置管理(solr
的配置集中管理理)、分布式消息队列 (sub/pub
)、分布式锁、分布式协调等功能
Zookeeper架构组成
-
ZK
也是master/slave
架构,Leader
角色是启动时投票选举出来的
Leader
-
Zookeeper
集群⼯作的核⼼⻆⾊ - 集群内部各个服务器的调度者。
- 事务请求(写操作) 的唯⼀调度和处理者,保证集群事务处理的顺序性;对于
create
,setData
,delete
等有写操作的请求,则需要统⼀转发给leader
处理,leader
需要决定编号、执⾏操作,这个过程称为⼀个事务
Follower
- 处理客户端⾮事务(读操作) 请求,转发事务请求给
Leader
; - 参与集群
Leader
选举投票2n-1
台可以做集群投票。 - 针对访问量⽐较⼤的
zookeeper
集群, 还可新增观察者⻆⾊。
Observer
- server
- 观察者⻆⾊,观察
Zookeeper
集群的最新状态变化并将这些状态同步过来,其对于⾮事务请求可以进⾏独⽴处理,对于事务请求,则会转发给Leader
服务器进⾏处理。 - 不会参与任何形式的投票只提供⾮事务服务,通常⽤于在不影响集群事务处理能⼒的前提下提升集群的⾮事务处理能⼒。增加了集群增加并发的读请求。
Zookeeper特点
-
Zookeeper
:⼀个领导者(leader
:⽼⼤),多个跟随者(follower
:⼩弟)组成的集群。
-
-
Leader
负责进⾏投票的发起和决议,更新系统状态(内部原理)
-
-
Follower
⽤于接收客户请求并向客户端返回结果,在选举Leader
过程中参与投票
-
- 集群中只要有半数以上节点存活,
Zookeeper
集群就能正常服务。
- 集群中只要有半数以上节点存活,
- 全局数据⼀致:每个
server
保存⼀份相同的数据副本,Client
⽆论连接到哪个server
,数据都是⼀致的。
- 全局数据⼀致:每个
- 更新请求顺序进⾏(内部原理)
- 数据更新原⼦性,⼀次数据更新要么成功,要么失败
Zookeeper环境搭建
- 版本:
3.4.14
安装
tar -zxvf zookeeper-3.4.14.tar.gz -C ../servers/
创建文件目录
- 修改配置⽂件创建data与log⽬录
mkdir -p /opt/servers/zookeeper/data/logs
- 配置文件
# cd /opt/servers/zookeeper/conf
# mv zoo_sample.cfg zoo.cfg
# vi zoo.cfg
#更新datadir
dataDir=/opt/servers/zookeeper/data
#增加logdir
dataLogDir=/opt/servers/zookeeper/data/logs
#增加集群配置
##server.服务器ID=服务器IP地址:服务器之间通信端⼝:服务器之间投票选举端⼝
server.1=os1:2888:3888
server.2=os2:2888:3888
server.3=os3:2888:3888
#打开注释
#ZK提供了⾃动清理事务⽇志和快照⽂件的功能,这个参数指定了清理频率,单位是⼩时
autopurge.purgeInterval=1
- 添加myid配置
cd /opt/servers/zookeeper/data
echo 1 > myid
# os2
echo 2 > myid
# os3
echo 3 > myid
- 启动
zookeeper
# 三台节点分别执行
/opt/servers/zookeeper/bin/zkServer.sh start
- 集群启动停⽌脚本
vi zk.sh
#!/bin/sh
echo "start zookeeper server..."
if(($#==0));then
echo "no params";
exit;
fi
hosts="os1 os2 os3"
for host in $hosts
do
ssh $host "source /root/.bash_profile; /opt/servers/zookeeper/bin/zkServer.sh $1"
done
Zookeeper数据结构与监听机制
ZooKeeper数据模型Znode
- 在
ZooKeeper
中,数据信息被保存在⼀个个数据节点上,这些节点被称为ZNode
。 -
ZNode
是Zookeeper
中最⼩数据单位 - 在
ZNode
下⾯⼜可以再挂ZNode
,这样⼀层层下去就形成了⼀个层次化命名空间ZNode
树,我们称为ZNode Tree
,它采⽤了类似⽂件系统的层级树状结构进⾏管理
ZNode 的类型
-
Zookeeper
节点类型可以分为三⼤类:- 持久性节点(
Persistent
)- 指节点被创建后会⼀直存在服务器,直到删除操作主动清除,最常见的类型
- 临时性节点(
Ephemeral
)- 会被⾃动清理掉的节点,它的⽣命周期和客户端会话绑在⼀起,客户端会话结束,节点会被删除掉。
- 与持久性节点不同的是,临时节点不能创建⼦节点
- 顺序性节点(
Sequential
)- 有顺序的持久节点
- 有顺序的临时性节点
- 持久性节点(
事务ID
- 事务是对物理和抽象的应⽤状态上的操作集合
- 狭义上的事务通常指的是数据库事务,⼀般包含了⼀系列对数据库有序的读写操作
- 这些数据库事务具有所谓的ACID特性,即原⼦性(
Atomic
)、⼀致性(Consistency
)、隔离性(Isolation
)和持久性(Durability
) - ⽽在
ZooKeeper
中,事务是指能够改变ZooKeeper
服务器状态的操作- 事务操作或更新操作,⼀般包括数据节点创建与删除、数据节点内容更新等操作
- 对于每⼀个事务请求,
ZooKeeper
都会为其分配⼀个全局唯⼀的事务ID
,通常是⼀个64
位的数字 - 每⼀个
ZXID
对应⼀次更新操作,间接表示更新操作请求的全局顺序
ZNode 的状态信息
#使⽤zkCli.sh 连接到zk集群
[zk: localhost:2181(CONNECTED) 2] get /zookeeper
cZxid = 0x0
ctime = Wed Dec 31 19:00:00 EST 1969
mZxid = 0x0
mtime = Wed Dec 31 19:00:00 EST 1969
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
-
cZxid
:Create ZXID,表示节点被创建时的事务ID。 -
ctime
:Create Time,表示节点创建时间。 -
mZxid
:Modified ZXID,表示节点最后⼀次被修改时的事务ID。 -
mtime
:Modified Time,表示节点最后⼀次被修改的时间。 -
pZxid
:表示该节点的⼦节点列表最后⼀次被修改时的事务 ID。只有⼦节点列表变更才会更新pZxid
,⼦节点内容变更不会更新。 -
cversion
:表示⼦节点的版本号。 -
dataVersion
:表示内容版本号。 -
aclVersion
:标识acl
版本 -
ephemeralOwner
:表示创建该临时节点时的会话sessionID
,如果是持久性节点那么值为 0 -
dataLength
:表示数据⻓度。 -
numChildren
:表示直系⼦节点数。
Watcher 机制
-
Zookeeper
使⽤Watcher
机制实现分布式数据的发布/订阅功能 - ⼀个典型的发布/订阅模型系统:定义了⼀种⼀对多的订阅关系,能够让多个订阅者同时监听某⼀个主题对象,当这个主题对象⾃身状态变化时,会通知所有订阅者,使它们能够做出相应的处理
-
ZooKeeper
允许客户端向服务端注册⼀个Watcher
监听,当服务端的⼀些指定事件触发了这个Watcher
,那么Zk
就会向指定客户端
发送⼀个事件通知来实现分布式的通知功能
主要包括:
- 客户端线程
- 客户端
WatcherManager
-
Zookeeper
服务器
具体⼯作流程为
- 客户端在向
Zookeeper
服务器注册的同时,会将Watcher
对象存储在客户端的WatcherManager当中 - 当
Zookeeper
服务器触发Watcher
事件后,会向客户端发送通知 - 客户端线程从
WatcherManager
中取出对应的Watcher
对象来执⾏回调逻辑
Zookeeper的基本使⽤
ZooKeeper命令⾏操作
- 通过
zkClient
进⼊Zookeeper
客户端命令⾏
./zkcli.sh 连接本地的zookeeper服务器
./zkCli.sh -server ip:port(2181) 连接指定的服务器
- 创建节点
create [-s][-e] path data
- 其中,-s或-e分别指定节点特性,顺序或临时节点,若不指定,则创建持久节点
# 创建顺序节点,内容为123 create -s /zk-test 123 # 创建临时节点 create -e /zk-temp 123 # 创建永久节点 create /zk-permanent 123
- 临时节点
quit
后自动删除 - 永久节点不同于顺序节点,不会⾃动在后⾯添加⼀串数字
- 读取节点
-
ls path
:-
ls
命令可以列出Zookeeper
指定节点下的所有⼦节点,但只能查看指定节点下的第⼀级的所有⼦节点;
-
-
get path
:-
get
命令可以获取Zookeeper
指定节点的数据内容和属性信息
-
-
- 更新节点
-
set path data
:使⽤set
命令,可以更新指定节点的数据内容
-
- 删除节点
-
delete path
:使⽤delete
命令可以删除Zookeeper
上的指定节点 - 若删除节点存在⼦节点,那么⽆法删除该节点,必须先删除⼦节点,再删除⽗节点
-
Zookeeper-开源工具
ZkClient
ZkClient是Github上⼀个开源的zookeeper客户端,在Zookeeper原⽣API接⼝之上进⾏了包装,是⼀个更易⽤的Zookeeper客户端,同时,zkClient在内部还实现了诸如Session超时重连、Watcher反复注册等功能
添加依赖
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.2</version>
</dependency>
</dependencies>
创建会话
public class ZkDemo {
public static void main(String[] args) {
// 获取zk client 对象, 通讯端口2181
// 建立会话
ZkClient zkClient = new ZkClient("tctj1:2181");
System.out.println("zkclient is ready, ZooKeeper session established.");
// 创建节点, createParents的值设置为true,可以递归创建节点
zkClient.createPersistent("/lg-zkClient/lg-c1",true);
System.out.println("success create znode.");
}
}
创建节点
-
createPersistent
方法
public class Create_Node_Sample {
public static void main(String[] args) {
ZkClient zkClient = new ZkClient("127.0.0.1:2181");
System.out.println("ZooKeeper session established.");
//createParents的值设置为true,可以递归创建节点
zkClient.createPersistent("/lg-zkClient/lg-c1",true);
System.out.println("success create znode.");
}
}
删除节点
-
deleteRecursive
方法
public class Del_Data_Sample {
public static void main(String[] args) throws Exception {
String path = "/lg-zkClient/lg-c1";
ZkClient zkClient = new ZkClient("127.0.0.1:2181", 5000);
zkClient.deleteRecursive(path);
System.out.println("success delete znode.");
}
}
监听节点变化
-
subscribeChildChanges
方法
public class Get_Child_Change {
public static void main(String[] args) throws InterruptedException {
// 获取zclient
ZkClient zkClient = new ZkClient("tctj1:2181");
String path = "/lg-client";
zkClient.subscribeChildChanges(path, new IZkChildListener() {
public void handleChildChange(String s, List<String> list) throws Exception {
//打印节点信息
System.out.println(s + " childs changes ,current childs " +
list);
}
});
// 测试
zkClient.createPersistent("/lg-client");
Thread.sleep(1000); //只是为了⽅便观察结果数据
zkClient.createPersistent("/lg-client/c1");
Thread.sleep(1000);
zkClient.delete("/lg-client/c1");
Thread.sleep(1000);
zkClient.delete("/lg-client");
Thread.sleep(Integer.MAX_VALUE);
/*
1 监听器可以对不存在的⽬录进⾏监听
2 监听⽬录下⼦节点发⽣改变,可以接收到通知,携带数据有⼦节点列表
3 监听⽬录创建和删除本身也会被监听到
*/
}
}
获取数据(节点是否存在、更新、删除)
-
subscribeDataChanges
方法 -
ZkStrSerializer
序列化接口
public class Get_Data_Change {
public static void main(String[] args) throws InterruptedException {
// 获取zkClient对象
ZkClient zkClient = new ZkClient("tctj1:2181");
// 设置序列化
zkClient.setZkSerializer(new ZkStrSerializer());
//判断节点是否存在,不存在创建节点并赋值
String path = "/lg-client1";
final boolean exists = zkClient.exists(path);
if (!exists) {
zkClient.createEphemeral(path, "123");
}
// 监听变化
zkClient.subscribeDataChanges(path, new IZkDataListener() {
public void handleDataChange(String s, Object o) throws Exception {
//定义接收通知之后的处理逻辑
System.out.println(s + " data is changed ,new data " +
o);
}
public void handleDataDeleted(String s) throws Exception {
System.out.println(s + " is deleted!!");
}
});
// 测试
final Object o = zkClient.readData(path);
System.out.println(o);
zkClient.writeData(path, "new data");
Thread.sleep(1000);
//删除节点
zkClient.delete(path);
Thread.sleep(Integer.MAX_VALUE);
}
}
public class ZkStrSerializer implements ZkSerializer {
public byte[] serialize(Object o) throws ZkMarshallingError {
return String.valueOf(o).getBytes();
}
public Object deserialize(byte[] bytes) throws ZkMarshallingError {
return new String(bytes);
}
}
Zookeeper内部原理
Leader选举
选举机制
- 半数机制:集群中半数以上机器存活,集群可⽤。所以
Zookeeper
适合安装奇数台
服务器。 -
Zookeeper
虽然在配置⽂件中并没有指定Master
和Slave
。但是,Zookeeper
⼯作时,是有⼀个节点为Leader
,其它为Follower
,Leader
是通过内部的选举机制产⽣的。
首次启动
*(1)服务器1启动,此时只有它⼀台服务器启动了,它发出去的报⽂没有任何响应,所以它的选举状态⼀直是LOOKING
状态。
*(2)服务器2启动,它与最开始启动的服务器1进⾏通信,互相交换⾃⼰的选举结果,由于两者都没有历史数据,所以id
值较⼤的服务器2胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个
例⼦中的半数以上是3),所以服务器1、2还是继续保持LOOKING状态。
*(3)服务器3启动,根据前⾯的理论分析,服务器3成为服务器1、2、3中的⽼⼤,⽽与上⾯不同的是,此时有三台服务器选举了它,所以它成为了这次选举的Leader
。
*(4)服务器4启动,根据前⾯的分析,理论上服务器4应该是服务器1、2、3、4中最⼤的,但是由于前⾯已经有半数以上的服务器选举了服务器3,所以它只能接收当follower
。
*(5)服务器5启动,同4⼀样称为follower
。
集群⾮⾸次启动
- 每个节点在选举时都会参考⾃身节点的
zxid
值(事务ID
) - 优先选择
zxid
值⼤的节点称为Leader
ZAB⼀致性协议
分布式数据⼀致性问题
- 将数据复制到分布式部署的多台机器中,可以消除单点故障,防⽌系统由于某台(些)机器宕机导致的不可⽤。
- 通过负载均衡技术,能够让分布在不同地⽅的数据副本全都对外提供服务。有效提⾼系统性能。
- 在分布式系统中引⼊数据复制机制后,多台数据节点之间由于⽹络等原因很容易产⽣数据不⼀致的情况
ZAB
-
ZK
就是分布式⼀致性问题的⼯业解决⽅案 - 使⽤了⼀种称为
Zookeeper Atomic Broadcast
(ZAB
,Zookeeper
原⼦消息⼴播协议)的协议作为其数据⼀致性的核⼼算法。
ZAB协议
-
ZAB
协议是为分布式协调服务Zookeeper
专⻔设计的⼀种⽀持崩溃恢复和原⼦⼴播协议。 -
主备模式保证⼀致性
ZK如何处理集群数据
- 所有客户端写⼊数据都是写⼊
Leader
中,然后,由Leader
复制到Follower
中。 -
ZAB
会将服务器数据的状态变更以事务Proposal
的形式⼴播到所有的副本进程上,ZAB
协议能够保证了事务操作的⼀个全局的变更序号(ZXID
)。
⼴播消息
-
ZAB
协议的消息⼴播过程类似于⼆阶段提交过程 - 对于客户端发送的写请求,全部由
Leader
接收, -
Leader
将请求封装成⼀个事务Proposal
(提议),将其发送给所有Follwer
- 如果收到超过半数反馈
ACK
,则执⾏Commit
操作(先提交⾃⼰,再发送Commit
给所有Follwer
)。 - 不能正常反馈
Follower
恢复正常后会进⼊数据同步阶段最终与Leader
保持⼀致 - 注意:
-
Leader
接收到Client
请求之后,会将这个请求封装成⼀个事务 - 并给这个事务分配⼀个全局递增的唯⼀
ID
,称为事务ID
(ZXID
) -
ZAB
协议要求保证事务的顺序,因此必须将每⼀个事务按照ZXID
进⾏先后排序然后处理 -
ZK
集群为了保证任何事务操作能够有序的顺序执⾏,只能是Leader
服务器接受写请求,即使是Follower
服务器接受到客户端的请求,也会转发到Leader
服务器进⾏处理
-
Leader 崩溃问题
-
Leader
宕机后,ZK
集群⽆法正常⼯作,ZAB
协议提供了⼀个⾼效且可靠的leader
选举算法, 需要解决:-
ZAB
协议确保那些已经在Leader
提交的事务最终会被所有服务器提交。 -
ZAB
协议确保丢弃那些只在Leader
提出/复制,但没有提交的事务。 - 关键点:保证选举出的新
Leader
拥有集群中所有节点最⼤编号(ZXID
)的事务 - 保证,被
Leader
提交的事务被集群接受,丢弃还没有提交的事务
-
ZooKeeper应用
服务器动态上下线监听
具体实现
服务器端
public class ServerMain {
private ZkClient zkClient = null;
//获取到zk对象
private void connectZK(){
zkClient = new ZkClient("tctj1:2181,tctj2:2181,tctj3:2181");
if(!zkClient.exists("/servers")){
zkClient.createPersistent("/servers");
}
}
// 注册服务端信息到zk节点
private void registerServerInfo(String ip,String port){
//创建临时顺序节点
final String path =
zkClient.createEphemeralSequential("/servers/server", ip +":"+port);
System.out.println("---->>> 服务器注册成功,ip="+ip+";port ="+port+";节点路径信息="+path);
}
public static void main(String[] args) {
final ServerMain server = new ServerMain();
server.connectZK();
server.registerServerInfo(args[0],args[1] );
//启动⼀个服务线程提供时间查询
new TimeServer(Integer.parseInt(args[1])).start();
}
}
public class TimeServer extends Thread {
private int port=0;
public TimeServer(int port) {
this.port = port;
}
@Override
public void run() {
//启动serversocket监听⼀个端⼝
try {
final ServerSocket serverSocket = new ServerSocket(port);
while(true){
final Socket socket = serverSocket.accept();
final OutputStream out = socket.getOutputStream();
out.write(new Date().toString().getBytes());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端
public class Client {
//获取zkclient
ZkClient zkClient = null;
//维护⼀个serversi 信息集合
ArrayList<String> infos = new ArrayList<String>();
private void connectZk() {
// 创建zkclient
zkClient = new ZkClient("tctj1:2181,tctj2:2181");
//第⼀次获取服务器信息,所有的⼦节点
final List<String> childs = zkClient.getChildren("/servers");
for (String child : childs) {
//存储着ip+port
final Object o = zkClient.readData("/servers/" + child);
infos.add(String.valueOf(o));
}
//对servers⽬录进⾏监听
zkClient.subscribeChildChanges("/servers", new IZkChildListener() {
public void handleChildChange(String s, List<String> children)
throws Exception {
//接收到通知,说明节点发⽣了变化,client需要更新infos集合中的数据
ArrayList<String> list = new ArrayList<String>();
//遍历更新过后的所有节点信息
for (String path : children) {
final Object o = zkClient.readData("/servers/" + path);
list.add(String.valueOf(o));
}
//最新数据覆盖⽼数据
infos = list;
System.out.println("--》接收到通知,最新服务器信息为:" + infos);
}
});
}
//发送时间查询的请求
public void sendRequest() throws IOException {
//⽬标服务器地址
final Random random = new Random();
final int i = random.nextInt(infos.size());
final String ipPort = infos.get(i);
final String[] arr = ipPort.split(":");
//建⽴socket连接
final Socket socket = new Socket(arr[0], Integer.parseInt(arr[1]));
final OutputStream out = socket.getOutputStream();
final InputStream in = socket.getInputStream();
//发送数据
out.write("query time".getBytes());
out.flush();
//接收返回结果
final byte[] b = new byte[1024];
in.read(b);//读取服务端返回数据
System.out.println("client端接收到server:+" + ipPort + "+返回结果:" +
new String(b));
//释放资源
in.close();
out.close();
socket.close();
}
public static void main(String[] args) throws InterruptedException {
final Client client = new Client();
client.connectZk(); //监听器逻辑
while (true) {
try {
client.sendRequest(); //发送请求
} catch (IOException e) {
e.printStackTrace();
try {
client.sendRequest();
} catch (IOException e1) {
e1.printStackTrace();
}
}
//每隔⼏秒中发送⼀次请求到服务端
Thread.sleep(2000);
}
}
}
分布式锁
Hadoop HA
HA 概述
- 所谓
HA
(High Available
),即⾼可⽤(7*24⼩时不中断服务)。
- 所谓
-
- 实现⾼可⽤最关键的策略是消除单点故障。
Hadoop-HA
严格来说应该分成各个组件的HA
机制:
-
HDFS
-HA
-
YARN
-HA
。
- 实现⾼可⽤最关键的策略是消除单点故障。
-
Hadoop2.0
之前,在HDFS
集群中NameNode
存在单点故障(SPOF
)。
-
-
NameNode
主要在以下两个⽅⾯影响HDFS
集群
-
单点存在的问题
-
NameNode
机器发⽣意外,如宕机,集群将⽆法使⽤,直到管理员重启 -
NameNode
机器需要升级,包括软件、硬件升级,此时集群也将⽆法使⽤ -
HDFS HA
功能通过配置Active/Standby
两个NameNodes
实现在集群中对NameNode
的热备来解决上述问题。如果出现故障,如机器崩溃或机器需要升级维护,这时可通过此种⽅式将NameNode
很快的切换到另外⼀台机器。
HDFS-HA ⼯作机制
- 通过双
NameNode
消除单点故障(Active/Standby
)
HDFS-HA⼯作要点
-
- 元数据管理⽅式需要改变
- 内存中各⾃保存⼀份元数据;
-
Edits
⽇志只有Active
状态的NameNode
节点可以做写操作; - 两个
NameNode
都可以读取Edits
; - 共享的
Edits
放在⼀个共享存储中管理(qjournal
和NFS
两个主流实现);
-
- 需要⼀个状态管理功能模块
- 实现了⼀个
zkfailover
,常驻在每⼀个NameNode
所在的节点,每⼀个zkfailover
负责监控⾃⼰所在NameNode
节点,利⽤zk
进⾏状态标识,当需要进⾏状态切换时,由zkfailover
来负责切换,切换
时需要防⽌brain split
现象的发⽣(集群中出现两个Active
的Namenode
)。
- 必须保证两个
NameNode
之间能够ssh
⽆密码登录
- 必须保证两个
- 隔离(
Fence
),即同⼀时刻仅仅有⼀个NameNode
对外提供服务
- 隔离(