zk与redis实现分布式锁

分布式锁

在同一个jvm中,jdk已经提供了lock、synchronized等同步机制,但是在分布式环境下,分布在不同机器上的多个进程可能对一些资源产生竞争关系,无法再使用jdk提供的同步机制,分布式锁就是用来解决这种场景下的同步问题。

利用zk实现分布式锁思路

  1. 建立一个名为lock的持久节点(Persistent)
  2. 当进程需要访问共享资源时,会先在lock节点下创建临时顺序节点,然后对lock节点下所有的子节点进行按序号排序,如果该进程创建的临时节点是所有子节点序号最小的,该进程获得锁进入临界区,执行任务后删除对应的临时顺序节点
  3. 如果序号不是最小的,就获得该节点序号的上一个序号对应节点,并给该节点是否存在注册监听事件,等待监听到其上个节点被删除后,重新去获取锁,从而进入临界区执行任务,执行后同样删除所创建的临时节点,这里只去监听比自己节点序号小1的节点,不用去监听所有的节点。

代码实现

详细代码请点击zk实现分布式锁

public class DistributedLock implements Watcher {
    private int threadId;
    private ZooKeeper zk = null;
    private String selfPath;
    private String waitPath;
    private String LOG_PREFIX_OF_THREAD;
    private static final int SESSION_TIMEOUT = 10000;
    private static final String GROUP_PATH = "/locks";
    private static final String SUB_PATH = "/locks/sub";
    private static final String CONNECTION_STRING = "ubuntu:2181";

    private static final int THREAD_NUM = 10;
    // 确保连接zk成功;
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);
    // 确保所有线程运行结束;
    private static final CountDownLatch threadSemaphore = new CountDownLatch(
            THREAD_NUM);

    public DistributedLock(int id) {
        this.threadId = id;
        LOG_PREFIX_OF_THREAD = "【第" + threadId + "个线程】";
    }

    public static void main(String[] args) {
                // 用多线程模拟分布式环境
        for (int i = 0; i < THREAD_NUM; i++) {
            final int threadId = i + 1;
            new Thread() {
                @Override
                public void run() {
                    try {
                        DistributedLock dc = new DistributedLock(threadId);
                        dc.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);
                        // GROUP_PATH不存在的话,由一个线程创建即可;
                        synchronized (threadSemaphore) {
                            dc.createPath(GROUP_PATH, "该节点由线程" + threadId
                                    + "创建", true);
                        }
                        dc.getLock();
                    } catch (Exception e) {
                        System.out.println("【第" + threadId + "个线程】 抛出的异常:");
                        e.printStackTrace();
                    }
                }
            }.start();
        }
        try {
            threadSemaphore.await();
            System.out.println("所有线程运行结束!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
/**
     * 获取锁
     *
     * @return
     */
    private void getLock() throws KeeperException, InterruptedException {
                // 去创建临时节点
        selfPath = zk.create(SUB_PATH, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println(LOG_PREFIX_OF_THREAD + "创建锁路径:" + selfPath);
        if (checkMinPath()) {
            getLockSuccess();
        }
    }

    /**
     * 创建节点
     *
     * @param path 节点path
     * @param data 初始数据内容
     * @return
     */
    public boolean createPath(String path, String data, boolean needWatch)
            throws KeeperException, InterruptedException {
        if (zk.exists(path, needWatch) == null) {
            System.out.println(LOG_PREFIX_OF_THREAD
                    + "节点创建成功, Path: "
                    + this.zk.create(path, data.getBytes(),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
                    + ", content: " + data);
        }
        return true;
    }

    /**
     * 创建ZK连接
     *
     * @param connectString ZK服务器地址列表
     * @param sessionTimeout Session超时时间
     */
    public void createConnection(String connectString, int sessionTimeout)
            throws IOException, InterruptedException {
        zk = new ZooKeeper(connectString, sessionTimeout, this);
        connectedSemaphore.await();
    }

    /**
     * 获取锁成功
     */
    public void getLockSuccess() throws KeeperException, InterruptedException {
        if (zk.exists(this.selfPath, false) == null) {
            System.out.println(LOG_PREFIX_OF_THREAD + "本节点已不在了...");
            return;
        }
        System.out.println(LOG_PREFIX_OF_THREAD + "获取锁成功,赶紧干活!");
        Thread.sleep(2000);
        System.out.println(LOG_PREFIX_OF_THREAD + "删除本节点:" + selfPath);
        zk.delete(this.selfPath, -1);
        releaseConnection();
        threadSemaphore.countDown();
    }

    /**
     * 关闭ZK连接
     */
    public void releaseConnection() {
        if (this.zk != null) {
            try {
                this.zk.close();
            } catch (InterruptedException e) {
            }
        }
        System.out.println(LOG_PREFIX_OF_THREAD + "释放连接");
    }

    /**
     * 检查自己是不是最小的节点
     *
     * @return
     */
    public boolean checkMinPath() throws KeeperException, InterruptedException {
        List<String> subNodes = zk.getChildren(GROUP_PATH, false);
        Collections.sort(subNodes);
        int index = subNodes.indexOf(selfPath.substring(GROUP_PATH.length() + 1));
        switch (index) {
        case -1: {
            System.out.println(LOG_PREFIX_OF_THREAD + "本节点已不在了..." + selfPath);
            return false;
        }
        case 0: {
            System.out.println(LOG_PREFIX_OF_THREAD + "子节点中,我果然是老大...哈哈哈" + selfPath);
            return true;
        }
        default: {
            this.waitPath = GROUP_PATH + "/" + subNodes.get(index - 1);
            System.out.println(LOG_PREFIX_OF_THREAD + "获取子节点中,排在我前面的。。。"
                    + waitPath);
            try {
                zk.getData(waitPath, true, new Stat());
                return false;
            } catch (KeeperException e) {
                if (zk.exists(waitPath, false) == null) {
                    System.out.println(LOG_PREFIX_OF_THREAD + "子节点中,排在我前面的。。。"
                            + waitPath + "已失踪,幸福来得太突然?");
                    return checkMinPath();
                } else {
                    throw e;
                }
            }
        }

        }

    }

    @Override
    public void process(WatchedEvent event) {
               // 监听器处理事件 
        if (event == null) {
            return;
        }
        Event.KeeperState keeperState = event.getState();
        Event.EventType eventType = event.getType();
        if (Event.KeeperState.SyncConnected == keeperState) {
            if (Event.EventType.None == eventType) {
                System.out.println(LOG_PREFIX_OF_THREAD + "成功连接上ZK服务器");
                connectedSemaphore.countDown();
            } else if (event.getType() == Event.EventType.NodeDeleted
                    && event.getPath().equals(waitPath)) {
                System.out.println(LOG_PREFIX_OF_THREAD
                        + "收到情报,排我前面的家伙已挂,我是不是可以出山了?");
                try {
                    if (checkMinPath()) {
                        getLockSuccess();
                    }
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } else if (Event.KeeperState.Disconnected == keeperState) {
            System.out.println(LOG_PREFIX_OF_THREAD + "与ZK服务器断开连接");
        } else if (Event.KeeperState.AuthFailed == keeperState) {
            System.out.println(LOG_PREFIX_OF_THREAD + "权限检查失败");
        } else if (Event.KeeperState.Expired == keeperState) {
            System.out.println(LOG_PREFIX_OF_THREAD + "会话失效");
        }
    }
}

可以看到代码还是有点复杂的,通常线上很好会使用zk来实现分布式锁,redis作为一种较更简单方便的方式常常被使用。

基于redis的SetNX实现分布式锁原理:

setNX是Redis提供的一个原子操作,如果指定key存在,那么setNX失败,如果不存在会进行Set操作并返回成功。我们可以利用这个来实现一个分布式的锁,主要思路就是,set成功表示获取锁,set失败表示获取失败,失败后需要重试。

package lock;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import redis.clients.jedis.Jedis;

/**
 * Redis分布式锁
 */
public class RedisLockTest {

    private Jedis jedisCli = new Jedis("192.168.58.99", 6379);

    private int expireTime = 1;

    /**
     * 获取锁
     *
     * @param lockID
     * @return
     */
    public boolean lock(String lockID) {
        while (true) {
            long returnFlag = jedisCli.setnx(lockID, "1");
            if (returnFlag == 1) {
                System.out.println(Thread.currentThread().getName() + " get lock....");
                return true;
            }
            System.out.println(Thread.currentThread().getName() + " is trying lock....");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                return false;
            }
        }
    }

    /**
     * 超时获取锁
     *
     * @param lockID
     * @param timeOuts
     * @return
     */
    public boolean lock(String lockID, long timeOuts) {
        long current = System.currentTimeMillis();
        long future = current + timeOuts;
        long timeStep = 500;
        CountDownLatch latch = new CountDownLatch(1);
        while (future > current) {
            long returnFlag = jedisCli.setnx(lockID, "1");
            if (returnFlag == 1) {
                System.out.println(Thread.currentThread().getName() + " get lock....");
                jedisCli.expire(lockID, expireTime);
                return true;
            }
            System.out.println(Thread.currentThread().getName() + " is trying lock....");
            try {
                latch.await(timeStep, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            current = current + timeStep;
        }
        return false;
    }

    public void unlock(String lockId) {
        long flag = jedisCli.del(lockId);
        if (flag > 0) {
            System.out.println(Thread.currentThread().getName() + " release lock....");
        } else {
            System.out.println(Thread.currentThread().getName() + " release lock fail....");
        }
    }

    /**
     * 线程工厂,命名线程
     */
    public static class MyThreadFactory implements ThreadFactory {
        public static AtomicInteger count = new AtomicInteger();

        @Override
        public Thread newThread(Runnable r) {
            count.getAndIncrement();
            Thread thread = new Thread(r);
            thread.setName("Thread-lock-test " + count);
            return thread;
        }
    }

    public static void main(String args[]) {
        final String lockID = "lockTest";
        Runnable task = () -> {
            RedisLockTest testCli = new RedisLockTest();
            testCli.lock(lockID);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            testCli.unlock(lockID);
        };

        MyThreadFactory factory = new MyThreadFactory();
        ExecutorService services = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 3; i++)
            services.execute(factory.newThread(task));
    }

}

实现起来比较简单明了,并且对于锁删除失败(分布式锁基本都有这个问题),可以对key设置失效时间,这个超时时间需要能保证获得锁的这个进程已经获取完了竞争资源。相比zk的实现唯一不足的地方是没有通知机制,需要不断的轮询和睡眠去获取锁。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容