初识Zookeeper
Zookeeper
是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调通知、Master选举、分布式锁和分布式队列等功能。本篇主要是用java多线程模拟实现基于Zookeeper
的分布式读写锁。
Zookeeper伪集群部署实践
Zookeeper
搭建分布式集群至少需要三台服务器,手头确实没有那么多资源,好在Zookeeper
允许在一台机器上完成一个伪集群的搭建。所谓伪集群其实就是所有的机器都在同一台机器上,但还是以集群的特性来对外提供服务的。这种模式和集群十分相似,只不过在同一台机器上的不同Zookeeper实例是以不同的端口号来互相通信的。从官网下载到最新的Zookeeper
发行版本压缩包:zookeeper-3.4.10.tar.gz
,将Zookeeper
解压到我的阿里云服务器/usr/local/zookeeper
地址下。
配置
cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo1.cfg
cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo2.cfg
cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo3.cfg
以zoo_sample.cfg
为模板复制三个伪集群的配置。三个配置文件的详细配置如下:
zoo1.cfg:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data_1/
clientPort=2181
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
zoo2.cfg:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data_2/
clientPort=2182
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
zoo3.cfg:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data_3/
clientPort=2183
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
-
tickTime
: 心跳时间,单位是毫秒 ,session的最小超时时间是2*tickTime -
initLimit
:多少个tickTime内,允许其他server连接并初始化数据 -
syncLimit
:多少个tickTime内,允许follower同步 -
dataDir
:存放zookeeper数据的路径 -
clientPort
:监听客户端连接的端口号 -
server.id=ip:port1:port2
:其中id表示该服务器的id号,需要注意的是,集群部署必须在dataDir
路径下新建一个myid
的文件,内容就是当前服务器的id号。ip是当前服务器的ip,port1表示Follower服务器和Leader进行运行时通信和数据同步使用的端口号,port2端口用于Leader选举过程中的投票通信。
使用下面命令开启三个Zookeeper server服务:
root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo1.cfg
root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo2.cfg
root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo3.cfg
可以在/usr/local/zookeeper
路径下的zookeeper.out
文件中看到服务开启的日志信息。
测试客户端的连接
在测试前,需要在阿里云服务器管理中心打开相应的Zookeeper
客户端连接端口。打开Windows命令行,定位到Zookeeper
路径执行下面命令:
D:\zookeeper-3.4.10>bin\zkCli -server 服务器ip:2181
发现Windows Zookeeper
客户端已经成功连接上阿里云上的Zookeeper
集群,创建一个test结点。此时我们连接的是2181端口,也就是伪集群当中zoo1的监听的端口号。如下图所示:
另外在阿里云服务器中也开启一个客户端的连接,看看我们之前添加的test结点有没有真正的添加到Zookeeper
集群中。
上图可以看到test结点确实已经在集群当中了,而且此时连接的是2183端口,也就是zoo3这个server。由此验证了
Zookeeper
伪集群搭建成功。
分布式读写锁实现
下面就基于上面搭建的Zookeeper
伪集群,实现分布式读写锁。我们都知道当一个事务获得读锁之后,在这之后的事务只能获取读锁,写锁获取必须等到所有读锁全部释放。而写锁一旦获取后其他事务的读锁以及写锁都必须等待该写锁释放后获取。那么Zookeeper
是怎么实现这一特性的呢?
首先Zookeeper
创建节点时可以创建4中形式的节点,分别是持久节点(PERSISTENT)、持久顺序节点(PERSISTENT_SEQUENTIAL)、临时节点(EPHEMERAL)、临时顺序节点(EPHEMERAL_SEQUENTIAL)。
- 持久节点:该数据节点被创建后,就会一直存在于
Zookeeper
服务器上,直到有删除操作主动清除这个节点。 - 持久顺序节点:基本特性和持久节点是一致的,额外的特性表现在顺序性上,在创建节点的过程中,
Zookeeper
会自动为给定节点名加上一个递增的数字后缀作为新的节点名。 - 临时节点:临时节点的生命周期和客户端的会话绑定在一起,也就是说,如果客户端会话失效,那么节点就会被自动清理掉。另外,临时节点不能创建子结点。
- 临时顺序节点:特性和临时节点一致,额外拥有顺序的特性。
其实分布式读写锁就是基于Zookeeper
顺序节点特性来实现的。每个事务想要获得锁时都去同一个节点/lock
下,创建命名规范为[hostname]-锁类型-序号
的临时顺序节点。如果自身想要获取的是读锁,那么只要查看/lock
下节点顺序比自身小的节点中没有写类型的节点便可获得读锁。如果自身想要获取写锁,那么只要看到/lock
下自己是顺序最小的节点便可获得写锁。下面是多线程模拟事务的详细实现:
public class DistributedLockDemo {
private static CountDownLatch countDownLatch = new CountDownLatch(1);
private static String rootPath = "/lock";
private static ZooKeeper zooKeeper;
public static void main(String[] args) {
try {
zooKeeper = new ZooKeeper("119.23.216.241:2181", 5000, null); //连接我的Zookeeper集群
zooKeeper.create(rootPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
WriteThread[] writeThreads = new WriteThread[5]; //创建5个写锁线程
ReadThread[] readThreads = new ReadThread[10]; //创建10个读锁线程
for (int i = 0; i < writeThreads.length; i++) {
writeThreads[i] = new WriteThread("WriteThread_" + i);
writeThreads[i].start();
}
for (int i = 0; i < readThreads.length; i++) {
readThreads[i] = new ReadThread("ReadThread_" + i);
readThreads[i].start();
}
TimeUnit.SECONDS.sleep(1);
countDownLatch.countDown();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
static class WriteThread extends Thread {
public WriteThread(String name){
super(name);
}
@Override
public void run() {
try {
countDownLatch.await(); //多个线程同一时间竞争资源
//path示例:/lock/WriteThread_0-W-0000000001
String path = zooKeeper.create(rootPath + "/" + Thread.currentThread().getName() + "-W-", "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + ": try to acquire write lock...");
while (!canGetWriteLock(path)) {
TimeUnit.MILLISECONDS.sleep(500);
}
afterGetLockDo(path, true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class ReadThread extends Thread {
public ReadThread(String name){
super(name);
}
@Override
public void run() {
try {
countDownLatch.await();
String path = zooKeeper.create(rootPath + "/" + Thread.currentThread().getName() + "-R-", "".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + ": try to acquire read lock...");
while (!canGetReadLock(path)){
TimeUnit.MILLISECONDS.sleep(500);
}
afterGetLockDo(path, false);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
//判断是否可以获取写锁
private static boolean canGetWriteLock(String path) {
List<String> children = null; // /lock下所有子节点列表
try {
children = zooKeeper.getChildren(rootPath, false);
//基于节点名当中的序号进行排序
Collections.sort(children, new Comparator<String>() {
public int compare(String o1, String o2) {
int index1 = o1.lastIndexOf("-");
int index2 = o2.lastIndexOf("-");
long a = Long.parseLong(o1.substring(index1 + 1));
long b = Long.parseLong(o2.substring(index2 + 1));
return (int) (a - b);
}
});
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//只要最小的节点是自身便可获得写锁
boolean result = path.replace("/lock/", "").equals(children.get(0));
return result;
}
//判断是否可以获得读锁
private static boolean canGetReadLock(String path){
List<String> children = null;
try {
children = zooKeeper.getChildren(rootPath, false);
Collections.sort(children, new Comparator<String>() {
public int compare(String o1, String o2) {
int index1 = o1.lastIndexOf("-");
int index2 = o2.lastIndexOf("-");
long a = Long.parseLong(o1.substring(index1 + 1));
long b = Long.parseLong(o2.substring(index2 + 1));
return (int) (a - b);
}
});
//只要序号比自己小的节点中没有写类型节点便可获得读锁
int index = children.indexOf(path.replace("/lock/", ""));
for (int i = 0; i < index; i++) {
String child = children.get(i);
int k = child.indexOf("-");
if (child.substring(k+1, k+2).equals("W")) return false;
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
return true;
}
//获得锁后所做的工作
private static void afterGetLockDo(String path, boolean isWriteLock) {
if (isWriteLock) {
System.out.println(Thread.currentThread().getName() + ": get write lock...");
try {
TimeUnit.SECONDS.sleep(2); //模拟写事务所做的工作
System.out.println(Thread.currentThread().getName() + ": release write lock...");
zooKeeper.delete(path, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
} else {
System.out.println(Thread.currentThread().getName() + ": get read lock...");
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + ": release read lock...");
zooKeeper.delete(path, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
}
最后的运行结果如下图所示:
** 可以看到所有线程都是符合正常的读写锁逻辑并且获取到相关的锁资源。至此,Zookeeper读写锁就实现啦。 **