Zookeeper分布式锁完整代码实现篇

定义锁根路径:/locks ,过期时间

获得锁:
1先创建父节点locks,当前线程创建临时顺序节点(/locks/non000001)
2 获取/locks下所有孩子节点(get /locks)并排序,最小的节点排最前面
3 如果子节点为空说明会话断开了,节点被删除,返回获取锁失败
4 如果当前节点=最小节点,则返回获取锁成功,并新建一个线程来判定锁过期。
5 如果当前节点不是最小节点,则设置监听比自己次小节点的删除事件,然后挂起当前线程。(公平锁),如果要实现非公平锁,则设置监听最小节点。
6 当最小编号的线程获取锁,处理完业务则删除自己对应的zk节点,删除后会激活比自己大一号的节点线程从阻塞变为运行,被激活的线程是当前节点最小的了,然后就可以获取到锁。
代码实现:

package geektime.spring.springbucks.waiter;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author liangxianliang
 * @create 2020-01-09 21:54
 */
public class ZookeeperLockTest {

    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperLockTest.class);
    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
    private static final ThreadLocal<ZooKeeper> ZK_THREAD_LOCAL = new ThreadLocal<>();
    /** 锁的根路径 **/
    private static final String LOCK_ROOT_PATH = "/locks";
    /** 锁后缀 **/
    private static final String LOCK_SUFFIX = "_NO_";
    /** 创建根节点同步锁 **/
    private static final String CREATE_ROOT_LOCK = "LOCK";
    /** 公平锁 **/
    private static final boolean LOCK_FAIR = false;
    private final static byte[] BUF = new byte[0];
    static int LOCK_EXPIRES = 10;
    static int LOCK_WAITTIME = 10 ;


    /**
     *  获得所有
     *  @param key   路径不含‘/’
     *  @return boolean
     */
    public static boolean tryLock(String key) {
        return tryLock(key, LOCK_EXPIRES, LOCK_WAITTIME);
    }

    /**
     *  获得锁
     *  @param key   键/路径
     *  @param expire    过期时间
     *  @param wait  等待时间
     *  @return boolean
     */
    public static boolean tryLock(String key, long expire, long wait) {
        ZooKeeper zooKeeper = getZooKeeper();
        ZK_THREAD_LOCAL.set(zooKeeper);
        return tryLock(zooKeeper, key, expire, wait);
    }

    /**
     *  获得锁
     *  @param zooKeeper zk连接
     *  @param key   路径
     *  @param expire    过期时间
     *  @param wait  等待时间
     *  @return boolean
     */
    private static boolean tryLock(ZooKeeper zooKeeper, String key, long expire, long wait) {
        expire = expire * 1000;
        wait = wait * 1000;
        final String currNode;
        String path = LOCK_ROOT_PATH + "/" + key + LOCK_SUFFIX;
        try {
            currNode = zooKeeper.create(path, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            //步骤一
            List<String> nodes = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
            //过滤掉集合中不是当前业务的临时节点
            nodes = nodes.stream().filter(o -> o.startsWith(key)).collect(Collectors.toList());
            nodes.sort(String::compareTo);
            //如果集合为空说明当前创建节点的session在步骤一处已经断开,并且创建的节点已经被zk服务器删除, 此种情况比较极端
            if (nodes.size() == 0) {
                return false;
            }
            //最小的节点就是自己创建的节点表示拿到锁
            if (currNode.endsWith(nodes.get(0))) {
                runExpireThread(zooKeeper, currNode, expire);
                return true;
            }
            //没有拿到锁
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //非公平锁
            if(LOCK_FAIR){
                for (int i = 0; i < nodes.size(); i++) {
                    String node = nodes.get(i);
                    if (currNode.endsWith(node)) {
                        runExpireThread(zooKeeper, currNode, expire);
                        return true;
                    }
                    Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
                    if (stat != null) {
                        delPath(zooKeeper);
                        //等待锁超时
                        if(!countDownLatch.await(wait, TimeUnit.MILLISECONDS)){
                            return tryLock(zooKeeper, key, expire, wait);
                        }
                    }
                }
            }else{
                for (int i = 0; i < nodes.size(); i++) {
                    String node = nodes.get(i);
                    if (currNode.endsWith(node)) {
                        runExpireThread(zooKeeper, currNode, expire);
                        return true;
                    }
                    //当前节点的前一个节点
                    if (currNode.endsWith(nodes.get(i + 1))) {
                        Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
                        if (stat != null) {
                            // 等待锁超时,如果是公平锁,等待时间是默认等待时间的2倍,防止因为拿锁的线程处理业务时间太久
                            // 导致当前线程等待超时
                            if(!countDownLatch.await(wait * 2, TimeUnit.MILLISECONDS)){
                                delPath(zooKeeper);
                                return false;
                            }
                            return true;
                        }
                    }
                }
            }
        } catch (KeeperException | InterruptedException e) {
            LOG.error("create '{}' node fail.", key, e);
        }
        return false;
    }

    /**
     *  释放锁
     */
    public static void unLock() {
        ZooKeeper zooKeeper = ZK_THREAD_LOCAL.get();
        delPath(zooKeeper);
        close(ZK_THREAD_LOCAL.get());
        THREAD_LOCAL.remove();
        ZK_THREAD_LOCAL.remove();
    }

    /**
     *  创建分布式锁的根路径
     */
    private static void createLockRootPath() {
        ZooKeeper zooKeeper = getZooKeeper();
        try {
            Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
            if (stat == null) {
                synchronized (CREATE_ROOT_LOCK) {
                    stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
                    if (stat == null) {
                        LOG.info("create lock root path '{}'", LOCK_ROOT_PATH);
                        zooKeeper.create(LOCK_ROOT_PATH, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                        String path = LOCK_ROOT_PATH + "/key"  + LOCK_SUFFIX;
                        Stat stats = zooKeeper.exists(path, false);
                        zooKeeper.create(path, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                    }
                }
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     *  获得zooke会话连接
     *  @return org.apache.zookeeper.ZooKeeper
     */
    private static ZooKeeper getZooKeeper() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            final ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 30000, null);
            zooKeeper.register((watchedEvent) -> {
                switch (watchedEvent.getState()) {
                    case Expired:
                        close(zooKeeper);
                        break;
                    case SyncConnected:
                        countDownLatch.countDown();
                    default:
                }
            });
            if(!countDownLatch.await(3000, TimeUnit.MILLISECONDS)){
                close(zooKeeper);
                throw new RuntimeException("wait for creating zookeeper connection timeout, timeout is [3000]");
            }
            return zooKeeper;
        } catch (Exception e) {
            throw new RuntimeException("create Zookeeper instance fail.", e);
        }
    }

    /**
     *  启动一个线程来判断锁的过期时间,方式业务假死,zk不断开导致死锁
     *  @param zooKeeper zk连接
     *  @param currNode  当前节点
     */
    private static void runExpireThread(final ZooKeeper zooKeeper, String currNode, long expire){
        THREAD_LOCAL.set(currNode);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            try {
                Thread.sleep(expire * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            LOG.info("等待了{}秒, 主动结束.", expire);
            delPath(zooKeeper);
        });
    }

    /**
     *  删除创建的路径
     *  @param zooKeeper zk连接
     */
    private static void delPath(ZooKeeper zooKeeper) {
        try {
            //无论节点是否存在,直接执行删除操作
            zooKeeper.delete(THREAD_LOCAL.get(), -1);
        } catch (Exception e){
            LOG.error("lock expire, delete lock");
        }
    }

    /**
     *  断开连接
     *  @param zooKeeper zk连接
     */
    private static void close(ZooKeeper zooKeeper) {
        if (zooKeeper != null) {
            try {
                zooKeeper.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     *  监听节点删除事件
     */
    static class LockWatcher implements Watcher {
        private CountDownLatch latch;

        public LockWatcher(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void process(WatchedEvent event) {
            if(event.getType() == Event.EventType.NodeDeleted){
                latch.countDown();
            }
        }
    }

    public static void main(String[] args) throws  Exception {
        //createLockRootPath();
        int count = 30;
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        for( int i = 0; i < count; i++) {
            executorService.execute(() -> {
                try {
                    if(ZookeeperLockTest.tryLock("lock")){
                        System.out.println("我拿到了锁");
                        System.out.println(Thread.currentThread().getName() + ": 我要等待6秒,测试其他超时是否有效.");
                        Thread.sleep(1000);
                        System.out.println("我的等待结束了,其他人可以过来拿锁了.");
                    }else{
                        System.out.println("没有拿到锁");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }finally{
                    ZookeeperLockTest.unLock();
                }
            });
        }
        executorService.shutdown();
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容