redis分布式锁的实现总结

在Java中,关于锁我想大家都很熟悉。在并发编程中,我们通过锁,来避免由于竞争而造成的数据不一致问题。通常我们以进程锁synchronized 、Lock来实现它,对于分布式程序,就不能用进程锁了,这时候常用的是分布式锁。

什么是分布式锁

分布式锁,是一种思想,它的实现方式有很多。比如,我们将沙滩当做分布式锁的组件,那么它看起来应该是这样的:

加锁

在沙滩上踩一脚,留下自己的脚印,就对应了加锁操作。其他进程或者线程,看到沙滩上已经有脚印,证明锁已被别人持有,则等待。

解锁

把脚印从沙滩上抹去,就是解锁的过程。

锁超时

为了避免死锁,我们可以设置一阵风,在单位时间后刮起,将脚印自动抹去。

分布式锁的实现有很多,比如基于数据库、memcached、Redis、系统文件、zookeeper等。它们的核心的理念跟上面的过程大致相同。基于数据库可以用乐观锁和悲观锁处理分页式锁,乐观锁使用对比记录version号来实现,悲观锁使用类似“select * where * for update”行锁实现。

本文讨论的是基于redis实现分页式锁的问题,别的方面不做详说,有相关需求可以参考和查阅别的资料。

Redis分布式锁原理

加锁

加锁实际上就是在redis中,给Key键设置一个值,为避免死锁,并给定一个过期时间。

SET lock_key random_value NX PX 5000

值得注意的是:
random_value 是客户端生成的唯一的字符串。
NX 代表只在键不存在时,才对键进行设置操作。
PX 5000 设置键的过期时间为5000毫秒。

这样,如果上面的命令执行成功,则证明客户端获取到了锁。

解锁

解锁的过程就是将Key键删除。但也不能乱删,不能说客户端1的请求将客户端2的锁给删除掉。这时候random_value的作用就体现出来。

为了保证解锁操作的原子性,我们用LUA脚本完成这一操作。先判断当前锁的字符串是否与传入的值相等,是的话就删除Key,解锁成功。

if redis.call('get',KEYS[1]) == ARGV[1] then 
   return redis.call('del',KEYS[1]) 
else
   return 0 
end

jedis实现(单节点)


/**
     * 获取分布式锁:一分命令,保证事务的一致性。
     * @param lockKey
     * @param requestId
     * @param expireTime
     * @return
     */
    public static boolean getDistributeLock(String lockKey, String requestId, long expireTime) {
        Jedis jedis = null;
        try {
            jedis = getResource();
            String result = jedis.set(lockKey,requestId,"NX","PX",expireTime);
            if ("OK".equals(result)) {
                return true;
            }
        } catch (Exception e) {
            logger.error("getDistributeLock {}", lockKey, e);
        } finally {
            returnResource(jedis);
        }
        return false;
    }

    /**
     * 释放分布式锁:使用lua脚本,一个命令实现对带有标志的锁的释放
     * @param lockKey
     * @param requestId
     * @return
     */
    public static boolean releaseDistributeLock(String lockKey, String requestId) {
        Jedis jedis = null;
        try {
            jedis = getResource();
            String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
            Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
            Long RELEASE_SUCCESS = 1L;
            if (RELEASE_SUCCESS.equals(result)) {
                return true;
            }
        } catch (Exception e) {
            logger.error("releaseDistributeLock {}", lockKey, e);
        } finally {
            returnResource(jedis);
        }
        return false;
    }
    

注意:这里的requestId,类似客户端口请求id,每次请求都是不同的可以使用uuid,测试和使用可以参考后面的”测试和说明“部分。

缺点:在集群包括主从、哨兵模式、集群模式不可用;锁不具有可重入性。

redisson实现(通用)

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

Redisson底层采用的是Netty 框架。支持Redis 2.8以上版本,支持Java1.6+以上版本。它里面也实现了分布式锁,而且包含多种类型的锁:可重入锁,公平锁等。

具体实现如下:

JedisUtil提供

 //从配置类中获取redisson对象
 private static Redisson redisson = JedisConfig.getRedisson();
    
 //加锁 Redisson:适用单机、主从、哨兵和集群
    //同步方法,等待锁返回执行 所以涉及锁使用的,可以放在线程池中进行
    public static boolean acquire(String lockName){
        //声明key对象
        String key = lockName;
        //获取锁对象
        RLock mylock = redisson.getLock(key);
        //加锁,并且设置锁过期时间,防止死锁的产生
        mylock.lock(2, TimeUnit.MINUTES); // 分钟
        //加锁成功
        return  true;
    }
    //锁的释放 Redisson:适用单机、主从、哨兵和集群
    //同步方法,等待锁返回执行  所以涉及锁使用的,可以放在线程池中进行
    public static void release(String lockName){
        //必须是和加锁时的同一个key
        String key =  lockName;
        //获取所对象
        RLock mylock = redisson.getLock(key);
        //释放锁(解锁)
        mylock.unlock();
    }

JedisConfig提供


private static Config config = new Config();
//声明redisso对象
private static Redisson redisson = null;

    static{

        //可以用"redis://"来启用SSL连接
        if (IS_CLUSTER.equals(CLUSTER_USED)) {//集群
            log.info("Redisson redis lock init cluster config:"+server1+";"+server2+";"+server3+";"+server4+";"+server5+";"+server6);
            config.useClusterServers().addNodeAddress(
                    "redis://".concat(server1),"redis://".concat(server2), "redis://".concat(server3),
                    "redis://".concat(server4),"redis://".concat(server5), "redis://".concat(server6)).setScanInterval(5000);
        } else {//单机
            log.info("Redisson redis lock init single node config:"+server1+";"+server2+";"+server3+";"+server4+";"+server5+";"+server6);
            config.useSingleServer().setAddress("redis://".concat(poolHost).concat(":").concat(poolPort));
        }

        //得到redisson对象
        redisson = (Redisson) Redisson.create(config);
    }
    
    /**
     * Redisson redis分布式锁处理对象
    * @return
    */
    public static Redisson getRedisson() {
        return redisson;
    }
    

测试和说明

测试和使用,可以参考下面的junit测试用例。

@Slf4j
public class JedisUtilTest extends SpringTxTestCase {
    private static Logger logger = LoggerFactory.getLogger(JedisUtils.class);

    /**
     * 单机版本:加解锁功能
     */
    @Test
    public void testSingleRedisLockAndUnlock(){
        JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);
        {
            for (int i = 0; i < 5; i++) {
                boolean result = JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);
                System.out.println(Thread.currentThread().getName()+":lock result:"+result);
                JedisUtils.releaseDistributeLock("lockKey","requestId");
                boolean result1 = JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);
                System.out.println(Thread.currentThread().getName()+":unlock result1:"+result1);
            }
        }
    }
    /**
     * 单机版本:锁测试
     */
    @Test
    public void testSingleRedisLock(){
        {
            final CyclicBarrier cbRef = new CyclicBarrier(10);
            final ReentrantLock reentrantLock=new ReentrantLock();
            for(int i=0;i<10;i++){
                Thread t=  new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println(Thread.currentThread().getName() + "准备");
                            cbRef.await();//10个线程等待在这里 才开始执行下面的
                            //reentrantLock.lock();
                            //tryGetDistributedLock("hello","hello",10000);
                            boolean result = JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);
                            System.out.println(Thread.currentThread().getName()+"===lock result:"+result);
                            JedisUtils.releaseDistributeLock("lockKey",UUID.randomUUID().toString());
                            boolean result1 = JedisUtils.getDistributeLock("lockKey","requestId",JedisConfig.JEDIS_EXPIRE);
                            System.out.println(Thread.currentThread().getName()+"===lock result1:"+result);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }finally {
                            //reentrantLock.unlock();
                        }
                    }
                });
                t.start();
            }
            //这一段可以不要
            try {
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + "起跑");
                System.out.println( cbRef.getParties()+"--" +cbRef.getNumberWaiting());
            } catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    /**
     * 单机版本redis:测试分布式锁的使用方法
     */
    @Test
    public void testUseOfSingleRedisLock() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(10);
        String data2Deal = "data to deal";
        final CyclicBarrier cbRef = new CyclicBarrier(10);
        for(int i=0;i<10;i++){
            Thread t=  new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "准备");
                    try {
                        cbRef.await();//10个线程等待在这里 才开始执行下面的+
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    final ReentrantLock reentrantLock=new ReentrantLock();
                    reentrantLock.lock();
                    try {
                        String data2Deal = "data to deal:" + Thread.currentThread().getName();
                        useOfSingleRedisLock(data2Deal);
                    } catch (Exception e){
                        e.printStackTrace();
                    } finally {
                        reentrantLock.unlock();
                    }
                    countDownLatch.countDown();
                }
            });
            t.start();
        }
        countDownLatch.await();
        System.out.println("所有线程都执行完了……");
    }

    /**
     * 分布式锁的使用方法:单机redis cluster包括(集群和哨兵)不适用。
     * @param data2Deal
     */
    public void useOfSingleRedisLock(String data2Deal){
        String requestId = UUID.randomUUID().toString();
        if(JedisPoolUtils.getDistributeLock("lock_key", requestId, 1000*60*5)){
            try {
                methonNeedDisLock(data2Deal);
            } catch (Exception e) {
                logger.error("分布式锁业务处理失败!",e);
                e.printStackTrace();
            } finally {
                JedisPoolUtils.releaseDistributeLock("lock_key",requestId);
            }
        } else {
            try {
                Thread.sleep(1000);
                useOfSingleRedisLock(data2Deal);
            } catch (InterruptedException e) {
                logger.error(e.getMessage());
            }
        }
    }

    /*
    *  需要分布式锁的业务代码
    */
    public void methonNeedDisLock(String data2Deal){
        System.out.println("分布式锁业务处理方法:"+data2Deal);
    }
    /**
     * 测试分布式锁(Redisson)的使用方法:redis单机和哨兵、集群都适用
     * 测试说明:开启1000个线程,对count进行累加
     */

    int count = 0;
    @Test
    public void testRedisLock() throws InterruptedException {
        int clientcount =1000;
        final CountDownLatch countDownLatch = new CountDownLatch(clientcount);
        ExecutorService executorService = Executors.newFixedThreadPool(clientcount);
        long start = System.currentTimeMillis();
        for (int i = 0;i<clientcount;i++){
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    //通过Snowflake算法获取唯一的ID字符串
                    String id = UUID.randomUUID().toString();
                    System.out.println("request id:"+id);
                    try {
                        JedisPoolUtils.getDistributeLock("lock_key", id, 1000*60*5);
                        count++;
                    }finally {
                        JedisPoolUtils.releaseDistributeLock("lock_key",id);
                    }
                    countDownLatch.countDown();
                }
            });
        }
        //控制程序结束
        countDownLatch.await();
        long end = System.currentTimeMillis();
        System.out.println("执行线程数:{"+clientcount+"},总耗时:{"+(end-start)+"},count数为:{"+count+"}");
        //logger.info("执行线程数:{},总耗时:{},count数为:{}",clientcount,end-start,count);
    }

    @Test
    public void testRedissonLock() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        System.out.println("测试程序:开始运行……");
        Thread t1 = new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                String key = "test123";
                System.out.println(Thread.currentThread().getName()+": t1 start");
                //加锁
                boolean result = JedisUtils.acquire(key);
                System.out.println(Thread.currentThread().getName()+": t1 get lock time :"+new Date().toLocaleString());
                System.out.println(Thread.currentThread().getName()+": t1 get lock:"+result);
                Thread.sleep(50000);
                System.out.println(Thread.currentThread().getName()+":t1 sleep 50000.");
                System.out.println(Thread.currentThread().getName()+":t1 修改了数据库");
                //释放锁
                JedisUtils.release(key);
                System.out.println(Thread.currentThread().getName()+" t1 lock release time:"+new Date().toLocaleString());
                countDownLatch.countDown();
            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                String key = "test123";
                System.out.println(Thread.currentThread().getName()+": t2 start");
                //加锁
                boolean result = JedisUtils.acquire(key);
                System.out.println(Thread.currentThread().getName()+": t2 get lock time :"+new Date().toLocaleString());
                System.out.println(Thread.currentThread().getName()+": t2 get lock:"+result);
                System.out.println(Thread.currentThread().getName()+":t2 修改了数据库");
                //释放锁
                JedisUtils.release(key);
                System.out.println(Thread.currentThread().getName()+" t2 lock release time:"+new Date().toLocaleString());
                countDownLatch.countDown();
            }
        });
        t2.start();
        countDownLatch.await();
        System.out.println("测试程序:完成运行。");
    }

    @Test
    public void testRedissonLockSyn() throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        System.out.println("测试程序:开始运行……");
        Thread t1 = new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                String key = "test123";
                System.out.println(Thread.currentThread().getName()+": t1 start");
                System.out.println(Thread.currentThread().getName()+": t1 sleep 50000.");
                Thread.sleep(50000);
                System.out.println(Thread.currentThread().getName()+": t1 ready for lock.");
                //加锁
                boolean result = JedisUtils.acquire(key);
                System.out.println(Thread.currentThread().getName()+": t1 get lock time :"+new Date().toLocaleString());
                System.out.println(Thread.currentThread().getName()+": t1 get lock:"+result);
                System.out.println(Thread.currentThread().getName()+":t1 修改了数据库");
                //释放锁
                JedisUtils.release(key);
                System.out.println(Thread.currentThread().getName()+" t1 lock release time:"+new Date().toLocaleString());
                countDownLatch.countDown();
            }
        });
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                String key = "test123";
                System.out.println(Thread.currentThread().getName()+": t2 start");
                boolean result = JedisUtils.acquire(key);
                System.out.println(Thread.currentThread().getName()+": t2 get lock time :"+new Date().toLocaleString());
                System.out.println(Thread.currentThread().getName()+": t2 get lock:"+result);
                System.out.println(Thread.currentThread().getName()+": t2 sleep 500000.");
                Thread.sleep(500000);
                //释放锁
                JedisUtils.release(key);
                System.out.println(Thread.currentThread().getName()+" t2 lock release time:"+new Date().toLocaleString());
                countDownLatch.countDown();
            }
        });
        t2.start();
        countDownLatch.await();
        System.out.println("测试程序:完成运行。");
    }

    @Test
    public void testUseOfRedissonLock() throws InterruptedException {
        int clientcount = 1000;
        final CountDownLatch countDownLatch = new CountDownLatch(clientcount);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        long start = System.currentTimeMillis();
        for (int i = 0;i<clientcount;i++){
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    JedisUtils.acquire("lockName");
                    count++;
                    JedisUtils.release("lockName");
                    countDownLatch.countDown();
                }
            });
        }
        //控制程序结束
        countDownLatch.await();
        long end = System.currentTimeMillis();
        System.out.println("执行线程任务总数:{"+clientcount+"},总耗时:{"+(end-start)+"},count数为:{"+count+"}");
    }

}

参考

https://blog.csdn.net/u014353343/article/details/88921212

https://www.jianshu.com/p/828aa3b44564

https://www.jianshu.com/p/47fd7f86c848

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343