Zookeeper分布式锁完整代码实现篇

定义锁根路径:/locks ,过期时间

获得锁:
1先创建父节点locks,当前线程创建临时顺序节点(/locks/non000001)
2 获取/locks下所有孩子节点(get /locks)并排序,最小的节点排最前面
3 如果子节点为空说明会话断开了,节点被删除,返回获取锁失败
4 如果当前节点=最小节点,则返回获取锁成功,并新建一个线程来判定锁过期。
5 如果当前节点不是最小节点,则设置监听比自己次小节点的删除事件,然后挂起当前线程。(公平锁),如果要实现非公平锁,则设置监听最小节点。
6 当最小编号的线程获取锁,处理完业务则删除自己对应的zk节点,删除后会激活比自己大一号的节点线程从阻塞变为运行,被激活的线程是当前节点最小的了,然后就可以获取到锁。
代码实现:

package geektime.spring.springbucks.waiter;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author liangxianliang
 * @create 2020-01-09 21:54
 */
public class ZookeeperLockTest {

    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperLockTest.class);
    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();
    private static final ThreadLocal<ZooKeeper> ZK_THREAD_LOCAL = new ThreadLocal<>();
    /** 锁的根路径 **/
    private static final String LOCK_ROOT_PATH = "/locks";
    /** 锁后缀 **/
    private static final String LOCK_SUFFIX = "_NO_";
    /** 创建根节点同步锁 **/
    private static final String CREATE_ROOT_LOCK = "LOCK";
    /** 公平锁 **/
    private static final boolean LOCK_FAIR = false;
    private final static byte[] BUF = new byte[0];
    static int LOCK_EXPIRES = 10;
    static int LOCK_WAITTIME = 10 ;


    /**
     *  获得所有
     *  @param key   路径不含‘/’
     *  @return boolean
     */
    public static boolean tryLock(String key) {
        return tryLock(key, LOCK_EXPIRES, LOCK_WAITTIME);
    }

    /**
     *  获得锁
     *  @param key   键/路径
     *  @param expire    过期时间
     *  @param wait  等待时间
     *  @return boolean
     */
    public static boolean tryLock(String key, long expire, long wait) {
        ZooKeeper zooKeeper = getZooKeeper();
        ZK_THREAD_LOCAL.set(zooKeeper);
        return tryLock(zooKeeper, key, expire, wait);
    }

    /**
     *  获得锁
     *  @param zooKeeper zk连接
     *  @param key   路径
     *  @param expire    过期时间
     *  @param wait  等待时间
     *  @return boolean
     */
    private static boolean tryLock(ZooKeeper zooKeeper, String key, long expire, long wait) {
        expire = expire * 1000;
        wait = wait * 1000;
        final String currNode;
        String path = LOCK_ROOT_PATH + "/" + key + LOCK_SUFFIX;
        try {
            currNode = zooKeeper.create(path, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            //步骤一
            List<String> nodes = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
            //过滤掉集合中不是当前业务的临时节点
            nodes = nodes.stream().filter(o -> o.startsWith(key)).collect(Collectors.toList());
            nodes.sort(String::compareTo);
            //如果集合为空说明当前创建节点的session在步骤一处已经断开,并且创建的节点已经被zk服务器删除, 此种情况比较极端
            if (nodes.size() == 0) {
                return false;
            }
            //最小的节点就是自己创建的节点表示拿到锁
            if (currNode.endsWith(nodes.get(0))) {
                runExpireThread(zooKeeper, currNode, expire);
                return true;
            }
            //没有拿到锁
            CountDownLatch countDownLatch = new CountDownLatch(1);
            //非公平锁
            if(LOCK_FAIR){
                for (int i = 0; i < nodes.size(); i++) {
                    String node = nodes.get(i);
                    if (currNode.endsWith(node)) {
                        runExpireThread(zooKeeper, currNode, expire);
                        return true;
                    }
                    Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
                    if (stat != null) {
                        delPath(zooKeeper);
                        //等待锁超时
                        if(!countDownLatch.await(wait, TimeUnit.MILLISECONDS)){
                            return tryLock(zooKeeper, key, expire, wait);
                        }
                    }
                }
            }else{
                for (int i = 0; i < nodes.size(); i++) {
                    String node = nodes.get(i);
                    if (currNode.endsWith(node)) {
                        runExpireThread(zooKeeper, currNode, expire);
                        return true;
                    }
                    //当前节点的前一个节点
                    if (currNode.endsWith(nodes.get(i + 1))) {
                        Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + node, new LockWatcher(countDownLatch));
                        if (stat != null) {
                            // 等待锁超时,如果是公平锁,等待时间是默认等待时间的2倍,防止因为拿锁的线程处理业务时间太久
                            // 导致当前线程等待超时
                            if(!countDownLatch.await(wait * 2, TimeUnit.MILLISECONDS)){
                                delPath(zooKeeper);
                                return false;
                            }
                            return true;
                        }
                    }
                }
            }
        } catch (KeeperException | InterruptedException e) {
            LOG.error("create '{}' node fail.", key, e);
        }
        return false;
    }

    /**
     *  释放锁
     */
    public static void unLock() {
        ZooKeeper zooKeeper = ZK_THREAD_LOCAL.get();
        delPath(zooKeeper);
        close(ZK_THREAD_LOCAL.get());
        THREAD_LOCAL.remove();
        ZK_THREAD_LOCAL.remove();
    }

    /**
     *  创建分布式锁的根路径
     */
    private static void createLockRootPath() {
        ZooKeeper zooKeeper = getZooKeeper();
        try {
            Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
            if (stat == null) {
                synchronized (CREATE_ROOT_LOCK) {
                    stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
                    if (stat == null) {
                        LOG.info("create lock root path '{}'", LOCK_ROOT_PATH);
                        zooKeeper.create(LOCK_ROOT_PATH, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                        String path = LOCK_ROOT_PATH + "/key"  + LOCK_SUFFIX;
                        Stat stats = zooKeeper.exists(path, false);
                        zooKeeper.create(path, BUF, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                    }
                }
            }
        } catch (KeeperException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     *  获得zooke会话连接
     *  @return org.apache.zookeeper.ZooKeeper
     */
    private static ZooKeeper getZooKeeper() {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        try {
            final ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 30000, null);
            zooKeeper.register((watchedEvent) -> {
                switch (watchedEvent.getState()) {
                    case Expired:
                        close(zooKeeper);
                        break;
                    case SyncConnected:
                        countDownLatch.countDown();
                    default:
                }
            });
            if(!countDownLatch.await(3000, TimeUnit.MILLISECONDS)){
                close(zooKeeper);
                throw new RuntimeException("wait for creating zookeeper connection timeout, timeout is [3000]");
            }
            return zooKeeper;
        } catch (Exception e) {
            throw new RuntimeException("create Zookeeper instance fail.", e);
        }
    }

    /**
     *  启动一个线程来判断锁的过期时间,方式业务假死,zk不断开导致死锁
     *  @param zooKeeper zk连接
     *  @param currNode  当前节点
     */
    private static void runExpireThread(final ZooKeeper zooKeeper, String currNode, long expire){
        THREAD_LOCAL.set(currNode);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            try {
                Thread.sleep(expire * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            LOG.info("等待了{}秒, 主动结束.", expire);
            delPath(zooKeeper);
        });
    }

    /**
     *  删除创建的路径
     *  @param zooKeeper zk连接
     */
    private static void delPath(ZooKeeper zooKeeper) {
        try {
            //无论节点是否存在,直接执行删除操作
            zooKeeper.delete(THREAD_LOCAL.get(), -1);
        } catch (Exception e){
            LOG.error("lock expire, delete lock");
        }
    }

    /**
     *  断开连接
     *  @param zooKeeper zk连接
     */
    private static void close(ZooKeeper zooKeeper) {
        if (zooKeeper != null) {
            try {
                zooKeeper.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     *  监听节点删除事件
     */
    static class LockWatcher implements Watcher {
        private CountDownLatch latch;

        public LockWatcher(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void process(WatchedEvent event) {
            if(event.getType() == Event.EventType.NodeDeleted){
                latch.countDown();
            }
        }
    }

    public static void main(String[] args) throws  Exception {
        //createLockRootPath();
        int count = 30;
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        for( int i = 0; i < count; i++) {
            executorService.execute(() -> {
                try {
                    if(ZookeeperLockTest.tryLock("lock")){
                        System.out.println("我拿到了锁");
                        System.out.println(Thread.currentThread().getName() + ": 我要等待6秒,测试其他超时是否有效.");
                        Thread.sleep(1000);
                        System.out.println("我的等待结束了,其他人可以过来拿锁了.");
                    }else{
                        System.out.println("没有拿到锁");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }finally{
                    ZookeeperLockTest.unLock();
                }
            });
        }
        executorService.shutdown();
    }
}
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容