Spring-data-redis + redis 分布式锁(一)

分布式锁的解决方式

  1. 基于数据库表做乐观锁,用于分布式锁。(适用于小并发)
  2. 使用memcached的add()方法,用于分布式锁。
  3. 使用memcached的cas()方法,用于分布式锁。(不常用)
  4. 使用redis的setnx()、expire()方法,用于分布式锁。
  5. 使用redis的setnx()、get()、getset()方法,用于分布式锁。
  6. 使用redis的watch、multi、exec命令,用于分布式锁。(不常用)
  7. 使用zookeeper,用于分布式锁。(不常用)

这里主要介绍第四种和第五种:

使用redis的setnx()、expire()方法,用于分布式锁

原理

对于使用redis的setnx()、expire()来实现分布式锁,这个方案相对于memcached()的add()方案,redis占优势的是,其支持的数据类型更多,而memcached只支持String一种数据类型。除此之外,无论是从性能上来说,还是操作方便性来说,其实都没有太多的差异,完全看你的选择,比如公司中用哪个比较多,你就可以用哪个。

首先说明一下setnx()命令,setnx的含义就是SET if Not Exists,其主要有两个参数 setnx(key, value)。该方法是原子的,如果key不存在,则设置当前key成功,返回1;如果当前key已经存在,则设置当前key失败,返回0。但是要注意的是setnx命令不能设置key的超时时间,只能通过expire()来对key设置。

具体的使用步骤如下:

  1. setnx(lockkey, 1) 如果返回0,则说明占位失败;如果返回1,则说明占位成功
  2. expire()命令对lockkey设置超时时间,为的是避免死锁问题。
  3. 执行完业务代码后,可以通过delete命令删除key。

为了保证在某个Redis节点不可用的时候算法能够继续运行,这个获取锁的操作还有一个超时时间(timeOut),它要远小于锁的有效时间(几十毫秒量级)。

可能存在的问题

这个方案其实是可以解决日常工作中的需求的,但从技术方案的探讨上来说,可能还有一些可以完善的地方。比如,如果在第一步setnx执行成功后,在expire()命令执行成功前,发生了宕机的现象,那么就依然会出现死锁的问题,所以如果要对其进行完善的话,可以使用redis的setnx()、get()和getset()方法来实现分布式锁。

具体实现

锁具体实现RedisLock:

package com.xiaolyuh.lock;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;

public class RedisLock {
    private static Logger logger = LoggerFactory.getLogger(RedisLock.class);

    //////////////////// 静态常量定义开始///////////////////////
    /**
     * 存储到redis中的锁标志
     */
    private static final String LOCKED = "LOCKED";

    /**
     * 默认请求锁的超时时间(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 默认锁的有效时间(s)
     */
    public static final int EXPIRE = 60;
    //////////////////// 静态常量定义结束///////////////////////

    /**
     * 锁标志对应的key
     */
    private String key;

    /**
     * 锁的有效时间(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 请求锁的超时时间(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 锁flag
     */
    private volatile boolean isLocked = false;
    /**
     * Redis管理模板
     */
    private StringRedisTemplate redisTemplate;

    /**
     * 构造方法
     *
     * @param redisTemplate Redis管理模板
     * @param key           锁定key
     * @param expireTime    锁过期时间 (秒)
     * @param timeOut       请求锁超时时间 (毫秒)
     */
    public RedisLock(StringRedisTemplate redisTemplate, String key, int expireTime, long timeOut) {
        this.key = key;
        this.expireTime = expireTime;
        this.timeOut = timeOut;
        this.redisTemplate = redisTemplate;
    }

    /**
     * 构造方法
     *
     * @param redisTemplate Redis管理模板
     * @param key           锁定key
     * @param expireTime    锁过期时间
     */
    public RedisLock(StringRedisTemplate redisTemplate, String key, int expireTime) {
        this.key = key;
        this.expireTime = expireTime;
        this.redisTemplate = redisTemplate;
    }

    /**
     * 构造方法(默认请求锁超时时间30秒,锁过期时间60秒)
     *
     * @param redisTemplate Redis管理模板
     * @param key           锁定key
     */
    public RedisLock(StringRedisTemplate redisTemplate, String key) {
        this.key = key;
        this.redisTemplate = redisTemplate;
    }

    public boolean lock() {
        // 系统当前时间,纳秒
        long nowTime = System.nanoTime();
        // 请求锁超时时间,纳秒
        long timeout = timeOut * 1000000;
        final Random random = new Random();

        // 不断循环向Master节点请求锁,当请求时间(System.nanoTime() - nano)超过设定的超时时间则放弃请求锁
        // 这个可以防止一个客户端在某个宕掉的master节点上阻塞过长时间
        // 如果一个master节点不可用了,应该尽快尝试下一个master节点
        while ((System.nanoTime() - nowTime) < timeout) {
            // 将锁作为key存储到redis缓存中,存储成功则获得锁
            if (redisTemplate.opsForValue().setIfAbsent(key, LOCKED)) {
                isLocked = true;
                // 设置锁的有效期,也是锁的自动释放时间,也是一个客户端在其他客户端能抢占锁之前可以执行任务的时间
                // 可以防止因异常情况无法释放锁而造成死锁情况的发生
                redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);

                // 上锁成功结束请求
                break;
            }
            // 获取锁失败时,应该在随机延时后进行重试,避免不同客户端同时重试导致谁都无法拿到锁的情况出现
            // 睡眠10毫秒后继续请求锁
            try {
                Thread.sleep(10, random.nextInt(50000));
            } catch (InterruptedException e) {
                logger.error("获取分布式锁休眠被中断:", e);
            }
        }
        return isLocked;

    }

    public boolean isLock() {
        redisTemplate.getConnectionFactory().getConnection().time();
        return redisTemplate.hasKey(key);
    }

    public void unlock() {
        // 释放锁
        // 不管请求锁是否成功,只要已经上锁,客户端都会进行释放锁的操作
        if (isLocked) {
            redisTemplate.delete(key);
        }
    }

}

调用锁:

public void redisLock(int i) {
        RedisLock redisLock = new RedisLock(redisTemplate, "redisLockKey:"+i % 10, 5*60 , 500);
        try {
            long now = System.currentTimeMillis();
            if (redisLock.lock()) {
                logger.info("=" + (System.currentTimeMillis() - now));
                // TODO 获取到锁要执行的代码块
                logger.info("j:" + j ++);
            } else {
                logger.info("k:" + k ++);
            }
        } catch (Exception e) {
            logger.info(e.getMessage(), e);
        } finally {
            // 一定要释放锁
            redisLock.unlock();
        }
    }

使用redis的setnx()、get()、getset()方法,用于分布式锁

原理

这个方案的背景主要是在setnx()和expire()的方案上针对可能存在的死锁问题,做了一版优化。

那么先说明一下这三个命令,对于setnx()和get()这两个命令,相信不用再多说什么。那么getset()命令?这个命令主要有两个参数 getset(key,newValue)。该方法是原子的,对key设置newValue这个值,并且返回key原来的旧值。假设key原来是不存在的,那么多次执行这个命令,会出现下边的效果:

  1. getset(key, "value1") 返回nil 此时key的值会被设置为value1
  2. getset(key, "value2") 返回value1 此时key的值会被设置为value2
  3. 依次类推!

介绍完要使用的命令后,具体的使用步骤如下:

  1. setnx(lockkey, 当前时间+过期超时时间) ,如果返回1,则获取锁成功;如果返回0则没有获取到锁,转向2。

  2. get(lockkey)获取值oldExpireTime ,并将这个value值与当前的系统时间进行比较,如果小于当前系统时间,则认为这个锁已经超时,可以允许别的请求重新获取,转向3。

  3. 计算newExpireTime=当前时间+过期超时时间,然后getset(lockkey, newExpireTime) 会返回当前lockkey的值currentExpireTime。

  4. 判断currentExpireTime与oldExpireTime 是否相等,如果相等,说明当前getset设置成功,获取到了锁。如果不相等,说明这个锁又被别的请求获取走了,那么当前请求可以直接返回失败,或者继续重试。

  5. 在获取到锁之后,当前线程可以开始自己的业务处理,当处理完毕后,比较自己的处理时间和对于锁设置的超时时间,如果小于锁设置的超时时间,则直接执行delete释放锁;如果大于锁设置的超时时间,则不需要再锁进行处理。

可能存在的问题

问题1: 在“get(lockkey)获取值oldExpireTime ”这个操作与“getset(lockkey, newExpireTime) ”这个操作之间,如果有N个线程在get操作获取到相同的oldExpireTime后,然后都去getset,会不会返回的newExpireTime都是一样的,都会是成功,进而都获取到锁???

我认为这套方案是不存在这个问题的。依据有两条: 第一,redis是单进程单线程模式,串行执行命令。 第二,在串行执行的前提条件下,getset之后会比较返回的currentExpireTime与oldExpireTime 是否相等。

问题2: 在“get(lockkey)获取值oldExpireTime ”这个操作与“getset(lockkey, newExpireTime) ”这个操作之间,如果有N个线程在get操作获取到相同的oldExpireTime后,然后都去getset,假设第1个线程获取锁成功,其他锁获取失败,但是获取锁失败的线程它发起的getset命令确实执行了,这样会不会造成第一个获取锁的线程设置的锁超时时间一直在延长???

我认为这套方案确实存在这个问题的可能。但我个人认为这个微笑的误差是可以忽略的,不过技术方案上存在缺陷,大家可以自行抉择哈。

问题3: 这个方案必须要保证分布式服务器的时间一定要同步,否则这个锁就会出问题。

具体实现

锁具体实现RedisLock:

package com.xiaolyuh.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Redis分布式锁(这种方式服务器时间一定要同步,否则会出问题)
 * 
 * @author yuhao.wangwang
 * @version 1.0
 * @date 2017年11月3日 上午10:21:27
 */
public class RedisLock2 {

    /**
     * 默认请求锁的超时时间(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 默认锁的有效时间(s)
     */
    public static final int EXPIRE = 60;

    private static Logger logger = LoggerFactory.getLogger(RedisLock2.class);

    private StringRedisTemplate redisTemplate;

    /**
     * 锁标志对应的key
     */
    private String lockKey;
    /**
     * 锁的有效时间(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 请求锁的超时时间(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 锁的有效时间
     */
    private long expires = 0;

    /**
     * 锁标记
     */
    private volatile boolean locked = false;

    final Random random = new Random();

    /**
     * 使用默认的锁过期时间和请求锁的超时时间
     *
     * @param redisTemplate
     * @param lockKey       锁的key(Redis的Key)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }

    /**
     * 使用默认的请求锁的超时时间,指定锁的过期时间
     *
     * @param redisTemplate
     * @param lockKey       锁的key(Redis的Key)
     * @param expireTime    锁的过期时间(单位:秒)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, int expireTime) {
        this(redisTemplate, lockKey);
        this.expireTime = expireTime;
    }

    /**
     * 使用默认的锁的过期时间,指定请求锁的超时时间
     *
     * @param redisTemplate
     * @param lockKey       锁的key(Redis的Key)
     * @param timeOut       请求锁的超时时间(单位:毫秒)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, long timeOut) {
        this(redisTemplate, lockKey);
        this.timeOut = timeOut;
    }

    /**
     * 锁的过期时间和请求锁的超时时间都是用指定的值
     *
     * @param redisTemplate
     * @param lockKey       锁的key(Redis的Key)
     * @param expireTime    锁的过期时间(单位:秒)
     * @param timeOut       请求锁的超时时间(单位:毫秒)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, int expireTime, long timeOut) {
        this(redisTemplate, lockKey, expireTime);
        this.timeOut = timeOut;
    }

    /**
     * @return 获取锁的key
     */
    public String getLockKey() {
        return lockKey;
    }

    /**
     * 获得 lock.
     * 实现思路: 主要是使用了redis 的setnx命令,缓存了锁.
     * reids缓存的key是锁的key,所有的共享, value是锁的到期时间(注意:这里把过期时间放在value了,没有时间上设置其超时时间)
     * 执行过程:
     * 1.通过setnx尝试设置某个key的值,成功(当前没有这个锁)则返回,成功获得锁
     * 2.锁已经存在则获取锁的到期时间,和当前时间比较,超时的话,则设置新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public boolean lock() {
        // 请求锁超时时间,纳秒
        long timeout = timeOut * 1000000;
        // 系统当前时间,纳秒
        long nowTime = System.nanoTime();

        while ((System.nanoTime() - nowTime) < timeout) {
            // 分布式服务器有时差,这里给1秒的误差值
            expires = System.currentTimeMillis() + expireTime + 1;
            String expiresStr = String.valueOf(expires); //锁到期时间

            if (redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr)) {
                locked = true;
                // 设置锁的有效期,也是锁的自动释放时间,也是一个客户端在其他客户端能抢占锁之前可以执行任务的时间
                // 可以防止因异常情况无法释放锁而造成死锁情况的发生
                redisTemplate.expire(lockKey, expireTime, TimeUnit.SECONDS);

                // 上锁成功结束请求
                return true;
            }

            String currentValueStr = redisTemplate.opsForValue().get(lockKey); //redis里的时间
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
                // lock is expired

                String oldValueStr = redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
                //获取上一个锁到期时间,并设置现在的锁到期时间,
                //只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止误删(覆盖,因为key是相同的)了他人的锁——这里达不到效果,这里值会被覆盖,但是因为什么相差了很少的时间,所以可以接受

                    //[分布式的情况下]:如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
                    // lock acquired
                    locked = true;
                    return true;
                }
            }

            /*
                延迟10 毫秒,  这里使用随机时间可能会好一点,可以防止饥饿进程的出现,即,当同时到达多个进程,
                只会有一个进程获得锁,其他的都用同样的频率进行尝试,后面有来了一些进行,也以同样的频率申请锁,这将可能导致前面来的锁得不到满足.
                使用随机的等待时间可以一定程度上保证公平性
             */
            try {
                Thread.sleep(10, random.nextInt(50000));
            } catch (InterruptedException e) {
                logger.error("获取分布式锁休眠被中断:", e);
            }

        }
        return locked;
    }


    /**
     * 解锁
     */
    public synchronized void unlock() {
        // 只有加锁成功并且锁还有效才去释放锁
        if (locked && expires > System.currentTimeMillis()) {
            redisTemplate.delete(lockKey);
            locked = false;
        }
    }

}

调用方式:

public void redisLock2(int i) {
    RedisLock2 redisLock2 = new RedisLock2(redisTemplate, "redisLock:" + i % 10, 5 * 60, 500);
    try {
        long now = System.currentTimeMillis();
        if (redisLock2.lock()) {
            logger.info("=" + (System.currentTimeMillis() - now));
            // TODO 获取到锁要执行的代码块
            logger.info("j:" + j++);
        } else {
            logger.info("k:" + k++);
        }
    } catch (Exception e) {
        logger.info(e.getMessage(), e);
    } finally {
        redisLock2.unlock();
    }
}

对于上面两种redis实现分布式锁的方案都有一个问题:

  • 就是你获取锁后执行业务逻辑的代码只能在redis锁的有效时间之内,因为,redis的key到期后会自动清除,这个锁就算释放了。所以这个锁的有效时间一定要结合业务做好评估。
  • 这两种方式解锁的时候是直接删除key,假如C1获取到了锁,这个时候redis挂了,并且数据没有持久化,等redis服务启动起来,C2请求过来获取到了锁。但是C1请求现在执行完了删除了key,这个时候就把C2的锁删掉了。(在下一篇文章中有解决方案)

使用redis的SET resource-name anystring NX EX max-lock-time方式来实现分布式锁

下一篇文章介绍
Spring-data-redis + redis 分布式锁(二)

源码: https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-data-redis-distributed-lock 工程

参考:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容

  • 目前实现分布式锁的方式主要有数据库、Redis和Zookeeper三种,本文主要阐述利用Redis的相关命令来实现...
    Aldeo阅读 2,072评论 0 6
  • 一、分布式锁的作用: redis写入时不带锁定功能,为防止多个进程同时进行一个操作,出现意想不到的结果,so......
    魔法师_阅读 2,053评论 0 6
  • SETNX命令简介 命令格式 SETNX key value 将 key 的值设为 value,当且仅当 key ...
    tangstream阅读 1,960评论 0 2
  • 方案一:数据库乐观锁 乐观锁通常实现基于数据版本(version)的记录机制实现的,比如有一张红包表(t_bonu...
    传达室马大爷阅读 24,411评论 11 146
  • 哇!今天好不容易第一次完成了一天走一万步的目标。自从听说最好的锻炼就是每天走上一万步后,觉得很容易实现就默默在心里...
    黄云龙阅读 311评论 0 1