Zookeeper伪集群部署与分布式读写锁实现

初识Zookeeper

Zookeeper是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调通知、Master选举、分布式锁和分布式队列等功能。本篇主要是用java多线程模拟实现基于Zookeeper的分布式读写锁。

Zookeeper伪集群部署实践

Zookeeper搭建分布式集群至少需要三台服务器,手头确实没有那么多资源,好在Zookeeper允许在一台机器上完成一个伪集群的搭建。所谓伪集群其实就是所有的机器都在同一台机器上,但还是以集群的特性来对外提供服务的。这种模式和集群十分相似,只不过在同一台机器上的不同Zookeeper实例是以不同的端口号来互相通信的。从官网下载到最新的Zookeeper发行版本压缩包:zookeeper-3.4.10.tar.gz,将Zookeeper解压到我的阿里云服务器/usr/local/zookeeper地址下。

配置

cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo1.cfg
cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo2.cfg
cp /usr/local/zookeeper/conf/zoo_sample.cfg zoo3.cfg

zoo_sample.cfg为模板复制三个伪集群的配置。三个配置文件的详细配置如下:
zoo1.cfg:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data_1/
clientPort=2181
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889

zoo2.cfg:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data_2/
clientPort=2182
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889

zoo3.cfg:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data_3/
clientPort=2183
server.1=127.0.0.1:2887:3887
server.2=127.0.0.1:2888:3888
server.3=127.0.0.1:2889:3889
  • tickTime : 心跳时间,单位是毫秒 ,session的最小超时时间是2*tickTime
  • initLimit:多少个tickTime内,允许其他server连接并初始化数据
  • syncLimit:多少个tickTime内,允许follower同步
  • dataDir:存放zookeeper数据的路径
  • clientPort:监听客户端连接的端口号
  • server.id=ip:port1:port2:其中id表示该服务器的id号,需要注意的是,集群部署必须在dataDir路径下新建一个myid的文件,内容就是当前服务器的id号。ip是当前服务器的ip,port1表示Follower服务器和Leader进行运行时通信和数据同步使用的端口号,port2端口用于Leader选举过程中的投票通信。
    使用下面命令开启三个Zookeeper server服务:

root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo1.cfg
root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo2.cfg
root@iZwz9hs5ueqrblutmr5bncZ:/usr/local/zookeeper# bin/zkServer.sh start zoo3.cfg

可以在/usr/local/zookeeper路径下的zookeeper.out文件中看到服务开启的日志信息。

测试客户端的连接

在测试前,需要在阿里云服务器管理中心打开相应的Zookeeper客户端连接端口。打开Windows命令行,定位到Zookeeper路径执行下面命令:

D:\zookeeper-3.4.10>bin\zkCli -server 服务器ip:2181

发现Windows Zookeeper客户端已经成功连接上阿里云上的Zookeeper集群,创建一个test结点。此时我们连接的是2181端口,也就是伪集群当中zoo1的监听的端口号。如下图所示:

image

另外在阿里云服务器中也开启一个客户端的连接,看看我们之前添加的test结点有没有真正的添加到Zookeeper集群中。

image

上图可以看到test结点确实已经在集群当中了,而且此时连接的是2183端口,也就是zoo3这个server。由此验证了Zookeeper伪集群搭建成功。

分布式读写锁实现

下面就基于上面搭建的Zookeeper伪集群,实现分布式读写锁。我们都知道当一个事务获得读锁之后,在这之后的事务只能获取读锁,写锁获取必须等到所有读锁全部释放。而写锁一旦获取后其他事务的读锁以及写锁都必须等待该写锁释放后获取。那么Zookeeper是怎么实现这一特性的呢?
首先Zookeeper创建节点时可以创建4中形式的节点,分别是持久节点(PERSISTENT)、持久顺序节点(PERSISTENT_SEQUENTIAL)、临时节点(EPHEMERAL)、临时顺序节点(EPHEMERAL_SEQUENTIAL)。

  • 持久节点:该数据节点被创建后,就会一直存在于Zookeeper服务器上,直到有删除操作主动清除这个节点。
  • 持久顺序节点:基本特性和持久节点是一致的,额外的特性表现在顺序性上,在创建节点的过程中,Zookeeper会自动为给定节点名加上一个递增的数字后缀作为新的节点名。
  • 临时节点:临时节点的生命周期和客户端的会话绑定在一起,也就是说,如果客户端会话失效,那么节点就会被自动清理掉。另外,临时节点不能创建子结点。
  • 临时顺序节点:特性和临时节点一致,额外拥有顺序的特性。

其实分布式读写锁就是基于Zookeeper顺序节点特性来实现的。每个事务想要获得锁时都去同一个节点/lock下,创建命名规范为[hostname]-锁类型-序号的临时顺序节点。如果自身想要获取的是读锁,那么只要查看/lock下节点顺序比自身小的节点中没有写类型的节点便可获得读锁。如果自身想要获取写锁,那么只要看到/lock下自己是顺序最小的节点便可获得写锁。下面是多线程模拟事务的详细实现:

public class DistributedLockDemo {
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static String rootPath = "/lock";
    private static ZooKeeper zooKeeper;

    public static void main(String[] args) {
        try {
            zooKeeper = new ZooKeeper("119.23.216.241:2181", 5000, null); //连接我的Zookeeper集群
            zooKeeper.create(rootPath, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            WriteThread[] writeThreads = new WriteThread[5];  //创建5个写锁线程
            ReadThread[] readThreads = new ReadThread[10];    //创建10个读锁线程
            for (int i = 0; i < writeThreads.length; i++) {
                writeThreads[i] = new WriteThread("WriteThread_" + i);
                writeThreads[i].start();
            }
            for (int i = 0; i < readThreads.length; i++) {
                readThreads[i] = new ReadThread("ReadThread_" + i);
                readThreads[i].start();
            }
            TimeUnit.SECONDS.sleep(1);
            countDownLatch.countDown();
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    static class WriteThread extends Thread {
        public WriteThread(String name){
            super(name);
        }
        @Override
        public void run() {
            try {
                countDownLatch.await(); //多个线程同一时间竞争资源
                //path示例:/lock/WriteThread_0-W-0000000001
                String path = zooKeeper.create(rootPath + "/" + Thread.currentThread().getName() + "-W-", "".getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println(Thread.currentThread().getName() + ": try to acquire write lock...");
                while (!canGetWriteLock(path)) {
                    TimeUnit.MILLISECONDS.sleep(500);
                }
                afterGetLockDo(path, true);
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class ReadThread extends Thread {
        public ReadThread(String name){
            super(name);
        }
        @Override
        public void run() {
            try {
                countDownLatch.await();
                String path = zooKeeper.create(rootPath + "/" + Thread.currentThread().getName() + "-R-", "".getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println(Thread.currentThread().getName() + ": try to acquire read lock...");
                while (!canGetReadLock(path)){
                    TimeUnit.MILLISECONDS.sleep(500);
                }
                afterGetLockDo(path, false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    }

    //判断是否可以获取写锁
    private static boolean canGetWriteLock(String path) {
        List<String> children = null; // /lock下所有子节点列表
        try {
            children = zooKeeper.getChildren(rootPath, false);
            //基于节点名当中的序号进行排序
            Collections.sort(children, new Comparator<String>() {
                public int compare(String o1, String o2) {
                    int index1 = o1.lastIndexOf("-");
                    int index2 = o2.lastIndexOf("-");
                    long a = Long.parseLong(o1.substring(index1 + 1));
                    long b = Long.parseLong(o2.substring(index2 + 1));
                    return (int) (a - b);
                }
            });
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //只要最小的节点是自身便可获得写锁
        boolean result = path.replace("/lock/", "").equals(children.get(0));
        return result;
    }

    //判断是否可以获得读锁
    private static boolean canGetReadLock(String path){
        List<String> children = null;
        try {
            children = zooKeeper.getChildren(rootPath, false);
            Collections.sort(children, new Comparator<String>() {
                public int compare(String o1, String o2) {
                    int index1 = o1.lastIndexOf("-");
                    int index2 = o2.lastIndexOf("-");
                    long a = Long.parseLong(o1.substring(index1 + 1));
                    long b = Long.parseLong(o2.substring(index2 + 1));
                    return (int) (a - b);
                }
            });
            //只要序号比自己小的节点中没有写类型节点便可获得读锁
            int index = children.indexOf(path.replace("/lock/", ""));
            for (int i = 0; i < index; i++) {
                String child = children.get(i);
                int k = child.indexOf("-");
                if (child.substring(k+1, k+2).equals("W")) return false;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
        return true;
    }

    //获得锁后所做的工作
    private static void afterGetLockDo(String path, boolean isWriteLock) {
        if (isWriteLock) {
            System.out.println(Thread.currentThread().getName() + ": get write lock...");
            try {
                TimeUnit.SECONDS.sleep(2); //模拟写事务所做的工作
                System.out.println(Thread.currentThread().getName() + ": release write lock...");
                zooKeeper.delete(path, -1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println(Thread.currentThread().getName() + ": get read lock...");
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println(Thread.currentThread().getName() + ": release read lock...");
                zooKeeper.delete(path, -1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    }

}

最后的运行结果如下图所示:

image

** 可以看到所有线程都是符合正常的读写锁逻辑并且获取到相关的锁资源。至此,Zookeeper读写锁就实现啦。 **

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,076评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,658评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,732评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,493评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,591评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,598评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,601评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,348评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,797评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,114评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,278评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,953评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,585评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,202评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,180评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,139评论 2 352

推荐阅读更多精彩内容