[TOC]
一、Zookeeper 一致性原理
-
两阶段提交 two-phase commit (2PC)
-
三阶段提交 three-phase commit (3PC)
-
Paxos算法
- ZAB算法
ZAB协议是专门为zookeeper实现分布式协调功能而设计。zookeeper主要是根据ZAB协议是实现分布式系统数据一致性。
zookeeper根据ZAB协议建立了主备模型完成zookeeper集群中数据的同步。这里所说的主备系统架构模型是指,在zookeeper集群中,只有一台leader负责处理外部客户端的事物请求(或写操作),然后leader服务器将客户端的写操作数据同步到所有的follower节点中。
ZAB的协议核心是在整个zookeeper集群中只有一个节点即Leader将客户端的写操作转化为事物(或提议proposal)。Leader节点再数据写完之后,将向所有的follower节点发送数据广播请求(或数据复制),等待所有的follower节点反馈。在ZAB协议中,只要超过半数follower节点反馈OK,Leader节点就会向所有的follower服务器发送commit消息。即将leader节点上的数据同步到follower节点之上。
ZAB协议中主要有两种模式,第一是消息广播模式;第二是崩溃恢复模式
二、Zookeeper集群特点
- 顺序一致性
- 原子性
- 单一视图
- 可靠性
- 实时性
- 角色轮换避免单点故障
1.Leader 集群工作机制中的核心
- 事务请求的唯一调度和处理者,保证集群事务处理的顺序性
- 集群内部个服务器的调度者(管理follower,数据同步)
2.Follower 集群工作机制中的跟随者
- 处理非事务请求,转发事务请求给Leader
- 参与事务请求proposal投票
- 参与leader选举投票
3.Observer (3.30以上版本提供)和follower功能相同,但不参与 任何形式投票
- 处理非事务请求,转发事务请求给 Leader
- 提高集群非事务处理能力
配置:
peerType=observer
server.3=192.168.0.102:2888:3888:observer
TIPS
通过stat命令可以查看服务器角色
三、集群配置
1.配置文件zoo.cfg
clientPort=2181
initLimit=5
syncLimit=2
peerType=observer
server.ID1=IP1:2888:3888
server.ID2=IP2:2888:3888
server.ID3=IP3:nnnn:mmmm:observer
配置说明
IDn: server id,集群中机器序号
IPn: 机器ip地址
nnnn: 同步端口
mmmm: 选举端口
observer: 是否为观察者
2.创建myid 文件
在dataDir所配置的目录下,创建一个名为myid的文件,在该文件的第一行写上 一个数字,和zoo.cfg中当前机器的编号对应上
3.配置其他机器
集群其他机器zoo.cfg和myid文件
4.验证服务器
使用stat命令
四、Zookeeper集群一致性协议ZAB
ZAB 协议是为分布式协调服务 ZooKeeper 专门设计的一种支持崩溃恢复的原子广 播协议。在 ZooKeeper 中,主要依赖 ZAB 协议来实现分布式数据一致性;
ZAB协议模式:
- 消息广播
-
崩溃恢复
集群生命周期:
1.消息广播
Step 1:
Leader 接收到消息请求后,将消息赋予一个全局唯一的 64 位自增 id,叫做:zxid ,通过 zxid 的大小比较即可实现因果有序这一特性。
Step 2:
Leader 通过先进先出队列(通过 TCP 协议来实现,以此实现了全局有序这一特性) 将带有 zxid 的消息作为一个提案(proposal)分发给所有 follower
Step 3:
当 follower 接收到 proposal,先将 proposal 写到硬盘,写硬盘成功后再向 leader 回一个 ACK
Step 4:
当 leader 接收到合法数量的 ACKs 后,leader 就向所有 follower 发送 COMMIT 命令,同事会在本地执行该消息。
Step 5:
当 follower 收到消息的 COMMIT 命令时,就会执行该消息
2.崩溃恢复
Step 1:
每个Server会发出一个投票,第一次都是投自己。投票信息:(myid,ZXID)
Step 2:
收集来自各个服务器的投票
Step 3:
处理投票并重新投票,处理逻辑:优先比较ZXID,然后比较myid
Step 4:
统计投票,只要超过半数的机器接收到同样的投票信息,就可以确定leader
Step 5:
改变服务器状态
TIPS:
集群机器的数量必须是奇数
五、Java客户端连接集群
ZkClient client = new ZkClient("host1,host2,host3,host4,host5");
Zk客户端处理过程:解析→打散→形成环形地址列表队列
六、Zookeeper典型应用场景
- 数据发布/订阅(配置中心)
- 命名服务
- 集群管理
- Master选举
- 分布式锁
1.数据发布/订阅(配置中心)
2.命名服务
3.集群管理
- 集群中多少机器在正常工作
- 每台机器的运行时状态收集
- 对集群中的机器进行上下线操作
4.Master选举
- 读写分离场景中,由master处理 读请求;
- Master处理复杂的逻辑,并将处 理结果通知到其他集群机器;
5.分布式锁
分布式锁的思路:
缺点:惊群效应
优化后的思路:
七、Zookeeper使用注意事项
1. Zk数据与日志清理
dataDir目录、dataLogDir两个目录会随着时间推移变得庞大,容易造成硬盘满了,清理办法:
- 自己编写shell脚本,保留最新的n个文件
- 使用zk自带的zkClient.sh保留最新的n个文件,zkClient.sh –n 15
- 配置autopurge.snapRetainCount和autopurge.purgeInterval两个参数配合使用;
2.Too many connections
配置maxClientCnxns参数,配置单个客户端机器创建的最大连接数;
3.磁盘管理
磁盘的I/O性能直接制约zookeeper更新操作速度,为了提高zk的写性能建议:
- 使用单独的磁盘
- Jvm堆内存设置要小心
4.磁盘管理集群数量
集群中机器的数量并不是越多越好,一个写操作需要半数以上的节点ack,所以集群节点数越多
,整个集群可以抗挂点的节点数越多(越可靠),但是吞吐量越差。集群的数量必须为奇数;
5.磁盘管理集群数量
zk是基于内存进行读写操作的,有时候会进行消息广播,因此不建议在节点存取容量比较大的数据;
八、Curator实例操作
1.Maven工程依赖引入
<dependencies>
<!--Zookeeper-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
<!--Curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.8.1</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.7.4</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.7.4</version>
</dependency>
</dependencies>
新建一个ZK工具类,如下所示:
package com.yongliang.zookeeper.demo;
import com.yongliang.zookeeper.demo.listener.ZkCuratorWatcher;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import java.util.List;
/**
* Curator工具类
*
* @author zhangyongliang
* @create 2019-01-12 14:07
**/
public class CuratorUtils {
public CuratorFramework client=null;
private static final String zkServerIps="192.168.92.20:2181,192.168.92.21:2181,192.168.92.22:2181";
/**
* 同步创建ZK实例,设置重连策略
*/
public CuratorUtils() {
//设置重连策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
//实例化Curator客户端
//使用工厂建立客户端对象
client= CuratorFrameworkFactory.builder()
//放入ZK连接服务器IP
.connectString(zkServerIps)
//设定会话时间以及重连策略
.sessionTimeoutMs(10000)
//设定重连策略
.retryPolicy(retryPolicy)
.build();
// 启动Curator客户端
client.start();
}
//关闭客户端连接
private void closeZkClient(){
if(client!=null){
this.client.close();
}
}
//获取当前客户端的状态
private static void getZkStatus() {
CuratorUtils curatorUtil=new CuratorUtils();
boolean zkStatus=curatorUtil.client.isStarted();
System.out.println("当前客户端的状态:" + (zkStatus ? "连接中..." : "已关闭..."));
}
//创建节点
private static void createNonde()throws Exception{
//节点路径
String nodePath="/yongliang/testNode";
//节点数据
byte[] data="this is testZk Data".getBytes();
CuratorUtils curatorUtil=new CuratorUtils();
//创建父节点,递归创建
String result=curatorUtil.client.create().creatingParentsIfNeeded()
//创建持久节点
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath,data);
System.out.println(result+"节点,创建成功...");
//关闭客户端
curatorUtil.closeZkClient();
//获取状态信息
getZkStatus();
}
//更新节点
private static int updateNode()throws Exception{
CuratorUtils curatorUtil=new CuratorUtils();
//节点路径
String nodePath="/yongliang/testNode";
//更新数据
byte[] data="this is new Data".getBytes();
//指定数据版本
Stat resultStatus=curatorUtil.client.setData().withVersion(0)
.forPath(nodePath,data);
return resultStatus.getVersion();
}
//删除节点
private static void deleteNode()throws Exception{
CuratorUtils curatorUtil=new CuratorUtils();
//节点路径
String nodePath="/yongliang/testNode";
//删除节点, 如果删除失败会继续删除
curatorUtil.client.delete().guaranteed()
//子节点也递归删除
.deletingChildrenIfNeeded()
//删除对应的版本
.withVersion(1)
.forPath(nodePath);
curatorUtil.closeZkClient();
getZkStatus();
}
//查询节点信息
private static String getNodeInfo()throws Exception{
CuratorUtils curatorUtil=new CuratorUtils();
Stat statInfo=new Stat();
//节点路径
String nodePath="/yongliang/testNode";
byte[] nodeData=curatorUtil.client.getData().storingStatIn(statInfo).forPath(nodePath);
System.out.println("节点 " + nodePath + " 的数据为:" + new String(nodeData));
System.out.println("该节点的数据版本号为:" + statInfo.getVersion());
return new String(nodeData);
}
//获取父节点的子节点列表信息
private static List<String> getNodeListInfo()throws Exception{
CuratorUtils curatorUtil=new CuratorUtils();
List<String> nodeInfoList=null;
//节点路径
String nodePath="/yongliang/testNode";
nodeInfoList=curatorUtil.client.getChildren().forPath(nodePath);
System.out.println(nodePath + " 节点下的子节点列表:");
for (String childNode : nodeInfoList) {
System.out.println(childNode);
}
curatorUtil.closeZkClient();
return nodeInfoList;
}
//查询节点是否存在
private static boolean isNodeModify() throws Exception{
CuratorUtils curatorUtil=new CuratorUtils();
//节点路径
String nodePath="/yongliang/testNode";
// 查询某个节点是否存在,存在就会返回该节点的状态信息,如果不存在的话则返回空
Stat statExist = curatorUtil.client.checkExists().forPath(nodePath);
if (statExist != null) {
System.out.println(nodePath + " 节点存在");
return true;
}
curatorUtil.closeZkClient();
return false;
}
//Watcher 实现 修改节点数据就会触发监听 (有效性1次)
private static void wathcerZK()throws Exception{
CuratorUtils curatorUtil=new CuratorUtils();
//节点路径
String nodePath="/yongliang/testNode";
curatorUtil.client.getData().usingWatcher(new ZkCuratorWatcher()).forPath(nodePath);
}
//一次注册,多次监听(只针对NodeChance事件)
private static void periestWathcerZk()throws Exception{
CuratorUtils curatorUtil=new CuratorUtils();
//节点路径
String nodePath="/yongliang/testNode";
// NodeCache: 缓存节点,并且可以监听数据节点的变更,会触发事件
final NodeCache nodeCache = new NodeCache(curatorUtil.client, nodePath);
// 参数 buildInitial : 初始化的时候获取node的值并且缓存
nodeCache.start(true);
// 获取缓存里的节点初始化数据
if (nodeCache.getCurrentData() != null) {
System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
} else {
System.out.println("节点初始化数据为空...");
}
// 为缓存的节点添加watcher,或者说添加监听器
nodeCache.getListenable().addListener(new NodeCacheListener() {
// 节点数据change事件的通知方法
@Override
public void nodeChanged() throws Exception {
// 防止节点被删除时发生错误
if (nodeCache.getCurrentData() == null) {
System.out.println("获取节点数据异常,无法获取当前缓存的节点数据,可能该节点已被删除");
return;
}
// 获取节点最新的数据
String data = new String(nodeCache.getCurrentData().getData());
System.out.println(nodeCache.getCurrentData().getPath() + " 节点的数据发生变化,最新的数据为:" + data);
}
});
}
//监听一个节点下的所有事件
private static void pathCildWatcher()throws Exception{
CuratorUtils curatorUtil=new CuratorUtils();
//节点路径
String nodePath="/yongliang/testNode";
final String PARENT_NODE_PATH = "/super"; // 父节点
// PathChildrenCache: 监听数据节点的增删改,可以设置触发的事件
final PathChildrenCache childrenCache = new PathChildrenCache(curatorUtil.client, PARENT_NODE_PATH, true);
/**
* StartMode: 初始化方式
* POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
* NORMAL:异步初始化
* BUILD_INITIAL_CACHE:同步初始化
*/
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// 列出子节点数据列表,需要使用BUILD_INITIAL_CACHE同步初始化模式才能获得,异步是获取不到的
List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前节点的子节点详细数据列表:");
for (ChildData childData : childDataList) {
System.out.println("\t* 子节点路径:" + new String(childData.getPath()) + ",该节点的数据为:" + new String(childData.getData()));
}
// 添加事件监听器
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event) throws Exception {
// 通过判断event type的方式来实现不同事件的触发
if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) { // 子节点初始化时触发
System.out.println("\n--------------\n");
System.out.println("子节点初始化成功");
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { // 添加子节点时触发
if (event.getData().getPath().trim().equals(nodePath)) {
System.out.println("\n--------------\n");
System.out.print("子节点:" + event.getData().getPath() + " 添加成功,");
System.out.println("该子节点的数据为:" + new String(event.getData().getData()));
}
System.out.println("\n--------------\n");
System.out.print("子节点:" + event.getData().getPath() + " 添加成功,");
System.out.println("该子节点的数据为:" + new String(event.getData().getData()));
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { // 删除子节点时触发
if (event.getData().getPath().trim().equals(nodePath)) {
System.out.println("\n--------------\n");
System.out.println("子节点:" + event.getData().getPath() + " 删除成功");
}
System.out.println("\n--------------\n");
System.out.println("子节点:" + event.getData().getPath() + " 删除成功");
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { // 修改子节点数据时触发
if (event.getData().getPath().trim().equals(nodePath)) {
System.out.println("\n--------------\n");
System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功,");
System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData()));
}
System.out.println("\n--------------\n");
System.out.print("子节点:" + event.getData().getPath() + " 数据更新成功,");
System.out.println("子节点:" + event.getData().getPath() + " 新的数据为:" + new String(event.getData().getData()));
}
}
});
}
public static void main(String[] args) throws Exception {
// getZkStatus();
// createNonde();
// int result=updateNode();
// System.out.println(result);
// deleteNode();
// String reuslt=getNodeInfo();
// System.out.println(reuslt);
// getNodeListInfo();
// isNodeModify();
}
}