分布式环境下的独占锁,共享锁

基于zookeeper分布式环境下的锁,原理大概是:zookeeper的临时顺序节点。当session结束的时候,临时节点自动删除。发送watchEvent通知给客户端。然后占据临时顺序节点,索引为0的地方,这个进程就占有锁。其他的等待。当锁释放的时候,节点删除。后面一个变成锁的拥有者。主要是利用了zookeeper的临时顺序节点的原子递增性。基于zookeeper的分布式唯一id生成器,也是这个原理。

基于zookeeper的分布式独占锁

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 基于zookeeper的分布式独占锁
 */
public class ZKDistributeLock {

    private final ZkClient client;
    private final String path;
    private final String basePath;
    private final String lockName;
    private String ourLockPath;

    /** 重试获取锁次数 */
    private static final Integer MAX_RETRY_COUNT = 10;
    private static final String LOCK_NAME = "lock-";

    public ZKDistributeLock(ZkClient client, String basePath) {
        this.client = client;
        this.basePath = basePath;
        this.path = basePath.concat("/").concat(LOCK_NAME);
        this.lockName = LOCK_NAME;
    }

    public void getLock() throws Exception {
        // -1 表示永不超时
        ourLockPath = tryGetLock(-1, null);
        if(ourLockPath == null){
            throw new IOException("连接丢失!在路径:'" + basePath + "'下不能获取锁!");
        }
    }

    public boolean getLock(long timeOut, TimeUnit timeUnit) throws Exception {
        ourLockPath = tryGetLock(timeOut, timeUnit);
        return ourLockPath != null;
    }

    public void releaseLock() throws Exception {
        releaseLock(ourLockPath);
    }

    /**
     * 等待获取锁
     * @param startMillis
     * @param millisToWait
     * @param ourPath
     * @return
     * @throws Exception
     */
    private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception {

        // 是否得到锁
        boolean haveTheLock = false;
        // 是否需要删除当前锁的节点
        boolean doDeleteOurPath = false;

        try {

            while (!haveTheLock) {

                // 获取所有锁节点(/locker下的子节点)并排序(从小到大)
                List<String> children = getSortedChildren();

                // 获取顺序节点的名字 如:/locker/lock-0000000013 > lock-0000000013
                String sequenceNodeName = ourPath.substring(basePath.length() + 1);

                // 判断该该节点是否在所有子节点的第一位 如果是就已经获得锁
                int ourIndex = children.indexOf(sequenceNodeName);
                if (ourIndex < 0) {
                    // 可能网络闪断 抛给上层处理
                    throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
                }

                boolean isGetTheLock = (ourIndex == 0);

                if (isGetTheLock) {
                    // 如果第一位 已经获得锁
                    haveTheLock = true;
                } else {
                    // 如果不是第一位,监听比自己小的那个节点的删除事件
                    String pathToWatch = children.get(ourIndex - 1);
                    String previousSequencePath = basePath.concat("/").concat(pathToWatch);
                    final CountDownLatch latch = new CountDownLatch(1);
                    final IZkDataListener previousListener = new IZkDataListener() {

                        public void handleDataDeleted(String dataPath) throws Exception {
                            latch.countDown();
                        }

                        public void handleDataChange(String dataPath, Object data) throws Exception {
                        }
                    };

                    try {
                        client.subscribeDataChanges(previousSequencePath, previousListener);

                        if (millisToWait != null) {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if (millisToWait <= 0) {
                                doDeleteOurPath = true;
                                break;
                            }

                            latch.await(millisToWait, TimeUnit.MICROSECONDS);
                        } else {
                            latch.await();
                        }
                    } catch (ZkNoNodeException e) {
                        e.printStackTrace();
                    } finally {
                        client.unsubscribeDataChanges(previousSequencePath, previousListener);
                    }

                }
            }
        } catch (Exception e) {
            //发生异常需要删除节点
            doDeleteOurPath = true;
            throw e;
        } finally {
            //如果需要删除节点
            if (doDeleteOurPath) {
                deleteOurPath(ourPath);
            }
        }

        return haveTheLock;
    }

    /**
     * 获取所有锁节点(/locker下的子节点)并排序
     *
     * @return
     * @throws Exception
     */
    private List<String> getSortedChildren() throws Exception {
        try {

            List<String> children = client.getChildren(basePath);
            Collections.sort
                    (
                            children,
                            new Comparator<String>() {
                                public int compare(String lhs, String rhs) {
                                    return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
                                }
                            }
                    );
            return children;

        } catch (ZkNoNodeException e) {
            client.createPersistent(basePath, true);
            return getSortedChildren();

        }
    }

    protected void releaseLock(String lockPath) throws Exception {
        deleteOurPath(lockPath);
    }

    /**
     * 尝试获取锁
     * @param timeOut
     * @param timeUnit
     * @return 锁节点的路径没有获取到锁返回null
     * @throws Exception
     */
    protected String tryGetLock(long timeOut, TimeUnit timeUnit) throws Exception {

        long startMillis = System.currentTimeMillis();
        Long millisToWait = (timeUnit != null) ? timeUnit.toMillis(timeOut) : null;

        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;
        int retryCount = 0;

        //网络闪断需要重试一试
        while (!isDone) {
            isDone = true;

            try {
                // 在/locker下创建临时的顺序节点
                ourPath = createLockNode(client, path);

                // 判断你自己是否获得了锁,如果没获得那么我们等待直到获取锁或者超时
                hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
            } catch (ZkNoNodeException e) {
                if (retryCount++ < MAX_RETRY_COUNT) {
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }

        if (hasTheLock) {
            return ourPath;
        }

        return null;
    }

    private void deleteOurPath(String ourPath) throws Exception {
        client.delete(ourPath);
    }

    private String createLockNode(ZkClient client, String path) throws Exception {
        // 创建临时循序节点
        return client.createEphemeralSequential(path, null);
    }

    private String getLockNodeNumber(String str, String lockName) {
        int index = str.lastIndexOf(lockName);
        if (index >= 0) {
            index += lockName.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;
    }
}

pom依赖:

<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>

        <!-- ZkClient -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.9</version>
        </dependency>

测试:

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

public class LockTest {
    public static final String ZK_CONNECT_STRING="47.99.45.243:2181";

    public static void main(String[] args) {

        // 需要手动创建节点 /locker

        ZkClient zkClient1 = new ZkClient(ZK_CONNECT_STRING, 5000,
                5000, new BytesPushThroughSerializer());
        ZKDistributeLock lock1 = new ZKDistributeLock(zkClient1, "/locker");

        ZkClient zkClient2 = new ZkClient(ZK_CONNECT_STRING, 5000,
                5000, new BytesPushThroughSerializer());
        final ZKDistributeLock lock2 = new ZKDistributeLock(zkClient2, "/locker");

        try {
            lock1.getLock();
            System.out.println("Client1 is get lock!");
            Thread client2Thd = new Thread(new Runnable() {

                public void run() {
                    try {
                        lock2.getLock();
//                        lock2.getLock(500, TimeUnit.SECONDS);
                        System.out.println("Client2 is get lock");
                        lock2.releaseLock();
                        System.out.println("Client2 is released lock");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            client2Thd.start();

            // 5s 后lock1释放锁
            Thread.sleep(5000);
            lock1.releaseLock();
            System.out.println("Client1 is released lock");

            client2Thd.join();

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

输出结果:

Client1 is get lock!
Client1 is released lock
Client2 is get lock
Client2 is released lock

基于zookeeper的分布式共享锁:

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNoNodeException;

import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * 基于zookeeper的分布式共享锁 信号量(有缺陷,因为实际上应该是前面5个,任意一个节点被删除,都应该受到watchEvent通知,
 * 但是这样的话,代码难以控制,
 * ******也可以超时后(这个节点一直任务比较长,一直不释放锁,一直不释放锁的话,前面的已经释放,最终他会成为
 * index=0的节点),往后面寻找),但是这里暂时不实现,只实现简单版本的共享锁,这种有可能死锁*******
 * 假设共享锁持有的permit许可,有5张
 * 加锁: 先生成临时顺序节点,然后获得basePath下的所有孩子节点。从小到大排序
 * ourlockPath 先获得。 然后previous5Index = ourLockPathIndex -5 < 0 说明是获得锁,
 * 如果大于等于0 ,说明是没获得,等待。注册监听器,监听前面第5个节点,dataChange的时候,说明节点删除,
 */
public class ZKDistributeSemaphore {

    private final ZkClient client;
    private final String path;
    private final String basePath;
    private final String lockName;
    private String ourLockPath;
    private  int permits =5;

    /** 重试获取锁次数 */
    private static final Integer MAX_RETRY_COUNT = 10;
    private static final String LOCK_NAME = "lock-";

    public ZKDistributeSemaphore(ZkClient client, String basePath,int permits) {
        this.client = client;
        this.basePath = basePath;
        this.path = basePath.concat("/").concat(LOCK_NAME);
        this.lockName = LOCK_NAME;
        this.permits = permits;
    }

    public void getLock() throws Exception {
        // -1 表示永不超时
        ourLockPath = tryGetLock(-1, null);
        if(ourLockPath == null){
            throw new IOException("连接丢失!在路径:'" + basePath + "'下不能获取锁!");
        }
    }

    public boolean getLock(long timeOut, TimeUnit timeUnit) throws Exception {
        ourLockPath = tryGetLock(timeOut, timeUnit);
        return ourLockPath != null;
    }

    public void releaseLock() throws Exception {
        releaseLock(ourLockPath);
    }

    /**
     * 等待获取锁
     * @param startMillis
     * @param millisToWait
     * @param ourPath
     * @return
     * @throws Exception
     */
    private boolean waitToLock(long startMillis, Long millisToWait, String ourPath) throws Exception {

        // 是否得到锁
        boolean haveTheLock = false;
        // 是否需要删除当前锁的节点
        boolean doDeleteOurPath = false;

        try {

            while (!haveTheLock) {

                // 获取所有锁节点(/locker下的子节点)并排序(从小到大)
                List<String> children = getSortedChildren();

                // 获取顺序节点的名字 如:/locker/lock-0000000013 > lock-0000000013
                String sequenceNodeName = ourPath.substring(basePath.length() + 1);

                // 判断该该节点是否在所有子节点的第一位 如果是就已经获得锁
                int ourIndex = children.indexOf(sequenceNodeName);
                if (ourIndex < 0) {
                    // 可能网络闪断 抛给上层处理
                    throw new ZkNoNodeException("节点没有找到: " + sequenceNodeName);
                }

                boolean isGetTheLock = (ourIndex-permits<0);

                if (isGetTheLock) {
                    // 如果第一位 已经获得锁
                    haveTheLock = true;
                } else {
                    // 如果没有拿到锁,就监听前面第5个节点
                    String pathToWatch = children.get(ourIndex - permits);
                    String previousSequencePath = basePath.concat("/").concat(pathToWatch);
                    final CountDownLatch latch = new CountDownLatch(1);
                    final IZkDataListener previousListener = new IZkDataListener() {

                        public void handleDataDeleted(String dataPath) throws Exception {
                            latch.countDown();
                        }

                        public void handleDataChange(String dataPath, Object data) throws Exception {
                        }
                    };

                    try {
                        client.subscribeDataChanges(previousSequencePath, previousListener);

                        if (millisToWait != null) {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            if (millisToWait <= 0) {
                                doDeleteOurPath = true;
                                break;
                            }

                            latch.await(millisToWait, TimeUnit.MICROSECONDS);
                        } else {
                            latch.await();
                        }
                    } catch (ZkNoNodeException e) {
                        e.printStackTrace();
                    } finally {
                        client.unsubscribeDataChanges(previousSequencePath, previousListener);
                    }

                }
            }
        } catch (Exception e) {
            //发生异常需要删除节点
            doDeleteOurPath = true;
            throw e;
        } finally {
            //如果需要删除节点
            if (doDeleteOurPath) {
                deleteOurPath(ourPath);
            }
        }

        return haveTheLock;
    }

    /**
     * 获取所有锁节点(/locker下的子节点)并排序
     *
     * @return
     * @throws Exception
     */
    private List<String> getSortedChildren() throws Exception {
        try {

            List<String> children = client.getChildren(basePath);
            Collections.sort
                    (
                            children,
                            new Comparator<String>() {
                                public int compare(String lhs, String rhs) {
                                    return getLockNodeNumber(lhs, lockName).compareTo(getLockNodeNumber(rhs, lockName));
                                }
                            }
                    );
            return children;

        } catch (ZkNoNodeException e) {
            client.createPersistent(basePath, true);
            return getSortedChildren();

        }
    }

    protected void releaseLock(String lockPath) throws Exception {
        deleteOurPath(lockPath);
    }

    /**
     * 尝试获取锁
     * @param timeOut
     * @param timeUnit
     * @return 锁节点的路径没有获取到锁返回null
     * @throws Exception
     */
    protected String tryGetLock(long timeOut, TimeUnit timeUnit) throws Exception {

        long startMillis = System.currentTimeMillis();
        Long millisToWait = (timeUnit != null) ? timeUnit.toMillis(timeOut) : null;

        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;
        int retryCount = 0;

        //网络闪断需要重试一试
        while (!isDone) {
            isDone = true;

            try {
                // 在/locker下创建临时的顺序节点
                ourPath = createLockNode(client, path);

                // 判断你自己是否获得了锁,如果没获得那么我们等待直到获取锁或者超时
                hasTheLock = waitToLock(startMillis, millisToWait, ourPath);
            } catch (ZkNoNodeException e) {
                if (retryCount++ < MAX_RETRY_COUNT) {
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }

        if (hasTheLock) {
            return ourPath;
        }

        return null;
    }

    private void deleteOurPath(String ourPath) throws Exception {
        client.delete(ourPath);
    }

    private String createLockNode(ZkClient client, String path) throws Exception {
        // 创建临时循序节点
        return client.createEphemeralSequential(path, null);
    }

    private String getLockNodeNumber(String str, String lockName) {
        int index = str.lastIndexOf(lockName);
        if (index >= 0) {
            index += lockName.length();
            return index <= str.length() ? str.substring(index) : "";
        }
        return str;
    }
}

这个没有测试过。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350

推荐阅读更多精彩内容