ZooKeeper + Curator 实现分布式锁

在 JDK 的 java.util.concurrent.locks 中, 为我们提供了可重入锁, 读写锁, 及超时获取锁的方法. 为我们提供了完好的支持, 但是在分布式系统中, 当多个应用需要共同操作某一个资源时. 我么就无法使用 JDK 来实现了, 这时就需要使用一个外部服务来为此进行支持, 现在我们选用 ZooKeeper + Curator 来完成分布式锁

项目环境

  • ZooKeeper 3.5.3-beta
  • Curator 4.0.0

如果 ZooKeeper 版本为 3.4.x, 请进行兼容处理

准备工作

下载、安装、启动 ZooKeeper, 可以查看这篇博文ZooKeeper 的安装、配置、启动和使用(一)——单机模式
如果想跳过这一步的话请参考最下面的[便捷测试](# 便捷测试)
创建一个 Maven 工程, 然后引入所需资源

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.0.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>

在 src/test/java 下创建一个 DistributedLockDemo 类
基本代码如下

public class DistributedLockDemo {

    // ZooKeeper 锁节点路径, 分布式锁的相关操作都是在这个节点上进行
    private final String lockPath = "/distributed-lock";

    // ZooKeeper 服务地址, 单机格式为:(127.0.0.1:2181), 集群格式为:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183)
    private String connectString;
    // Curator 客户端重试策略
    private RetryPolicy retry;
    // Curator 客户端对象
    private CuratorFramework client;
    // client2 用户模拟其他客户端
    private CuratorFramework client2;
    
    // 初始化资源
    @Before
    public void init() throws Exception {
        // 设置 ZooKeeper 服务地址为本机的 2181 端口
        connectString = "127.0.0.1:2181";
        // 重试策略
        // 初始休眠时间为 1000ms, 最大重试次数为 3
        retry = new ExponentialBackoffRetry(1000, 3);
        // 创建一个客户端, 60000(ms)为 session 超时时间, 15000(ms)为链接超时时间
        client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
        client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
        // 创建会话
        client.start();
        client2.start();
    }
    
    // 释放资源
    @After
    public void close() {
        CloseableUtils.closeQuietly(client);
    }
}   

共享锁

@Test
public void sharedLock() throws Exception {
    // 创建共享锁
    InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath);
    // lock2 用于模拟其他客户端
    InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath);

    // 获取锁对象
    lock.acquire();

    // 测试是否可以重入
    // 超时获取锁对象(第一个参数为时间, 第二个参数为时间单位), 因为锁已经被获取, 所以返回 false
    Assert.assertFalse(lock.acquire(2, TimeUnit.SECONDS));
    // 释放锁
    lock.release();

    // lock2 尝试获取锁成功, 因为锁已经被释放
    Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));
    lock2.release();
}

共享可重入锁

public void sharedReentrantLock() throws Exception {
    // 创建可重入锁
    InterProcessLock lock = new InterProcessMutex(client, lockPath);
    // lock2 用于模拟其他客户端
    InterProcessLock lock2 = new InterProcessMutex(client2, lockPath);
    // lock 获取锁
    lock.acquire();
    try {
        // lock 第二次获取锁
        lock.acquire();
        try {
            // lock2 超时获取锁, 因为锁已经被 lock 客户端占用, 所以获取失败, 需要等 lock 释放
            Assert.assertFalse(lock2.acquire(2, TimeUnit.SECONDS));
        } finally {
            lock.release();
        }
    } finally {
        // 重入锁获取与释放需要一一对应, 如果获取 2 次, 释放 1 次, 那么该锁依然是被占用, 如果将下面这行代码注释, 那么会发现下面的 lock2 获取锁失败
        lock.release();
    }
    // 在 lock 释放后, lock2 能够获取锁
    Assert.assertTrue(lock2.acquire(2, TimeUnit.SECONDS));
    lock2.release();
}

共享可重入读写锁

@Test
public void sharedReentrantReadWriteLock() throws Exception {
    // 创建读写锁对象, Curator 以公平锁的方式进行实现
    InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, lockPath);
    // lock2 用于模拟其他客户端
    InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client2, lockPath);
    // 使用 lock 模拟读操作
    // 使用 lock2 模拟写操作
    // 获取读锁(使用 InterProcessMutex 实现, 所以是可以重入的)
    InterProcessLock readLock = lock.readLock();
    // 获取写锁(使用 InterProcessMutex 实现, 所以是可以重入的)
    InterProcessLock writeLock = lock2.writeLock();

    /**
     * 读写锁测试对象
     */
    class ReadWriteLockTest {
        // 测试数据变更字段
        private Integer testData = 0;
        private Set<Thread> threadSet = new HashSet<>();

        // 写入数据
        private void write() throws Exception {
            writeLock.acquire();
            try {
                Thread.sleep(10);
                testData++;
                System.out.println("写入数据 \ t" + testData);
            } finally {
                writeLock.release();
            }
        }

        // 读取数据
        private void read() throws Exception {
            readLock.acquire();
            try {
                Thread.sleep(10);
                System.out.println("读取数据 \ t" + testData);
            } finally {
                readLock.release();
            }
        }

        // 等待线程结束, 防止 test 方法调用完成后, 当前线程直接退出, 导致控制台无法输出信息
        public void waitThread() throws InterruptedException {
            for (Thread thread : threadSet) {
                thread.join();
            }
        }

        // 创建线程方法
        private void createThread(int type) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        if (type == 1) {
                            write();
                        } else {
                            read();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            threadSet.add(thread);
            thread.start();
        }

        // 测试方法
        public void test() {
            for (int i = 0; i < 5; i++) {
                createThread(1);
            }
            for (int i = 0; i < 5; i++) {
                createThread(2);
            }
        }
    }

    ReadWriteLockTest readWriteLockTest = new ReadWriteLockTest();
    readWriteLockTest.test();
    readWriteLockTest.waitThread();
}

测试结果如下:

写入数据 1
写入数据 2
读取数据 2
写入数据 3
读取数据 3
写入数据 4
读取数据 4
读取数据 4
写入数据 5
读取数据 5

读取数据线程总是能看到最新写入的数据

共享信号量

@Test
public void semaphore() throws Exception {
    // 创建一个信号量, Curator 以公平锁的方式进行实现
    InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, lockPath, 6);
    // semaphore2 用于模拟其他客户端
    InterProcessSemaphoreV2 semaphore2 = new InterProcessSemaphoreV2(client2, lockPath, 6);

    // 获取一个许可
    Lease lease = semaphore.acquire();
    Assert.assertNotNull(lease);
    // semaphore.getParticipantNodes() 会返回当前参与信号量的节点列表, 俩个客户端所获取的信息相同
    Assert.assertEquals(semaphore.getParticipantNodes(), semaphore2.getParticipantNodes());

    // 超时获取一个许可
    Lease lease2 = semaphore2.acquire(2, TimeUnit.SECONDS);
    Assert.assertNotNull(lease2);
    Assert.assertEquals(semaphore.getParticipantNodes(), semaphore2.getParticipantNodes());

    // 获取多个许可, 参数为许可数量
    Collection<Lease> leases = semaphore.acquire(2);
    Assert.assertTrue(leases.size() == 2);
    Assert.assertEquals(semaphore.getParticipantNodes(), semaphore2.getParticipantNodes());

    // 超时获取多个许可, 第一个参数为许可数量
    Collection<Lease> leases2 = semaphore2.acquire(2, 2, TimeUnit.SECONDS);
    Assert.assertTrue(leases2.size() == 2);
    Assert.assertEquals(semaphore.getParticipantNodes(), semaphore2.getParticipantNodes());

    // 目前 semaphore 已经获取 3 个许可, semaphore2 也获取 3 个许可, 加起来为 6 个, 所以他们无法再进行许可获取
    Assert.assertNull(semaphore.acquire(2, TimeUnit.SECONDS));
    Assert.assertNull(semaphore2.acquire(2, TimeUnit.SECONDS));

    // 释放一个许可
    semaphore.returnLease(lease);
    semaphore2.returnLease(lease2);
    // 释放多个许可
    semaphore.returnAll(leases);
    semaphore2.returnAll(leases2);
}

多重共享锁

@Test
public void multiLock() throws Exception {
    // 可重入锁
    InterProcessLock interProcessLock1 = new InterProcessMutex(client, lockPath);
    // 不可重入锁
    InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client2, lockPath);
    // 创建多重锁对象
    InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2));
    // 获取参数集合中的所有锁
    lock.acquire();

    // 因为存在一个不可重入锁, 所以整个 InterProcessMultiLock 不可重入
    Assert.assertFalse(lock.acquire(2, TimeUnit.SECONDS));
    // interProcessLock1 是可重入锁, 所以可以继续获取锁
    Assert.assertTrue(interProcessLock1.acquire(2, TimeUnit.SECONDS));
    // interProcessLock2 是不可重入锁, 所以获取锁失败
    Assert.assertFalse(interProcessLock2.acquire(2, TimeUnit.SECONDS));

    // 释放参数集合中的所有锁
    lock.release();

    // interProcessLock2 中的锁已经释放, 所以可以获取
    Assert.assertTrue(interProcessLock2.acquire(2, TimeUnit.SECONDS));

}

Curator 分布式锁解决的问题

分布式锁服务宕机, ZooKeeper 一般是以集群部署, 如果出现 ZooKeeper 宕机, 那么只要当前正常的服务器超过集群的半数, 依然可以正常提供服务
持有锁资源服务器宕机, 假如一台服务器获取锁之后就宕机了, 那么就会导致其他服务器无法再获取该锁. 就会造成 死锁 问题, 在 Curator 中, 锁的信息都是保存在临时节点上, 如果持有锁资源的服务器宕机, 那么 ZooKeeper 就会移除它的信息, 这时其他服务器就能进行获取锁操作

便捷测试

为了测试上面的代码, 我们需要下载、安装、启动一个 ZooKeeper 服务, 然后将该服务地址配置为 connectString. 如果更换环境的话又需要重新安装, 未免麻烦了点. Curator 为我们提供一个专门用于开发、测试的便捷方法, 让我们更加专注于编写与 ZooKeeper 相关的程序.
首先需要导入 curator-test 测试包

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>4.0.0</version>
    <scope>test</scope>
</dependency>

在这个包中为我们提供了一个 TestingServer 类, 主要用法如下
构造方法有多个, 但是主要使用到的有这两个

TestingServer()
TestingServer(int port, File tempDirectory)

port 为端口
tempDirectory 为临时的 dataDir 目录
如果调用 TestingServer() 方法构造, 会获取一个空闲端口, 同时在 java.io.tmpdir 创建一个临时目录当作本次的 dataDir 目录
然后使用以下方法创建客户端

TestingServer server=new TestingServer();
// server.getConnectString() 方法会返回可用的服务链接地址, 如: 127.0.0.1:2181
CuratorFramework client=CuratorFrameworkFactory.newClient(server.getConnectString(), retry);

另外在测试完成记得进行资源释放

@After
public void close() {
    CloseableUtils.closeQuietly(client);
    CloseableUtils.closeQuietly(server);
}

TestingServer 能为我们简单的启动一个 ZooKeeper 服务器, 但是如果需要进行集群测试呢?
这个时候我们可以使用 TestingCluster 启动 ZooKeeper 集群
TestingCluster 同样提供多个构造器, 但是主要使用以下两个

TestingCluster(int instanceQty)
TestingCluster(InstanceSpec... specs)

instanceQty 是集群的数量
specs 是 InstanceSpec 的变长参数
InstanceSpec 的创建方法可以参考 TestingServer 的构造方法实现
然后创建客户端使用以下方法

TestingCluster server=new TestingCluster(3);
// server.getConnectString() 方法会返回可用的服务链接地址, 如: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
CuratorFramework client=CuratorFrameworkFactory.newClient(server.getConnectString(), retry);

同样请记得释放资源

测试源码

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容

  • 一、ZooKeeper的背景 1.1 认识ZooKeeper ZooKeeper---译名为“动物园管理员”。动物...
    algernoon阅读 9,063评论 1 106
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,646评论 18 139
  • Zookeeper--Zookeeper是什么博客借鉴http://www.cnblogs.com/yuyijq/...
    Albert陈凯阅读 6,034评论 1 36
  • 该是怎样的湖光山色 撩动你细腻的神经 触动你奇丽的想象 让你写下这难以言表的自然 15岁的手稿泛着金黄 28岁对声...
    劈柴捌哥阅读 163评论 0 2
  • 会话? 使用Cookie实现会话管理 HttpSession HttpSession原理
    pilipalaboompow阅读 371评论 0 0