Redis实现的分布式锁和分布式限流

来源:https://www.cnblogs.com/huangqingshi/p/10290615.html


随着现在分布式越来越普遍,分布式锁也十分常用,上篇文章解释了使用zookeeper实现分布式锁,本次咱们说一下如何用Redis实现分布式锁和分布限流。

https://www.cnblogs.com/huangqingshi/p/9650837.html

Redis有个事务锁,就是如下的命令,这个命令的含义是将一个value设置到一个key中,如果不存在将会赋值并且设置超时时间为30秒,如何这个key已经存在了,则不进行设置。

SET key value NX PX 30000

这个事务锁很好的解决了两个单独的命令,一个设置set key value nx,即该key不存在的话将对其进行设置,另一个是expire key seconds,设置该key的超时时间。我们可以想一下,如果这两个命令用程序单独使用会存在什么问题:

如果一个set key的命令设置了key,然后程序异常了,expire时间没有设置,那么这个key会一直锁住。

如果一个set key时出现了异常,但是直接执行了expire,过了一会儿之后另一个进行set key,还没怎么执行代码,结果key过期了,别的线程也进入了锁。

还有很多出问题的可能点,这里我们就不讨论了,下面咱们来看看如何实现吧。

本文使用的Spring Boot 2.x + Spring data redis + Swagger +lombok + AOP + lua脚本。在实现的过程中遇到了很多问题,都一一解决实现了。

依赖的POM文件如下:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

   <modelVersion>4.0.0</modelVersion>

   <parent>

       <groupId>org.springframework.boot</groupId>

       <artifactId>spring-boot-starter-parent</artifactId>

       <version>2.1.2.RELEASE</version>

       <relativePath/> <!-- lookup parent from repository -->

   </parent>

   <groupId>com.hqs</groupId>

   <artifactId>distributedlock</artifactId>

   <version>0.0.1-SNAPSHOT</version>

   <name>distributedlock</name>

   <description>Demo project for Spring Boot</description>

   <properties>

       <java.version>1.8</java.version>

   </properties>

   <dependencies>

       <dependency>

           <groupId>org.springframework.boot</groupId>

           <artifactId>spring-boot-starter-aop</artifactId>

       </dependency>

       <dependency>

           <groupId>org.springframework.boot</groupId>

           <artifactId>spring-boot-starter-web</artifactId>

       </dependency>

       <dependency>

           <groupId>org.springframework.boot</groupId>

           <artifactId>spring-boot-starter-data-redis</artifactId>

       </dependency>

       <dependency>

           <groupId>org.springframework.boot</groupId>

           <artifactId>spring-boot-devtools</artifactId>

           <scope>runtime</scope>

       </dependency>

       <dependency>

           <groupId>org.projectlombok</groupId>

           <artifactId>lombok</artifactId>

           <optional>true</optional>

       </dependency>

       <dependency>

           <groupId>org.springframework.boot</groupId>

           <artifactId>spring-boot-starter-test</artifactId>

           <scope>test</scope>

       </dependency>

       <dependency>

           <groupId>io.springfox</groupId>

           <artifactId>springfox-swagger-ui</artifactId>

           <version>2.9.2</version>

       </dependency>

       <dependency>

           <groupId>io.springfox</groupId>

           <artifactId>springfox-swagger2</artifactId>

           <version>2.9.2</version>

           <scope>compile</scope>

       </dependency>

       <dependency>

           <groupId>redis.clients</groupId>

           <artifactId>jedis</artifactId>

           <version>2.9.0</version>

       </dependency>

   </dependencies>

   <build>

       <plugins>

           <plugin>

               <groupId>org.springframework.boot</groupId>

               <artifactId>spring-boot-maven-plugin</artifactId>

           </plugin>

       </plugins>

   </build>

</project>

使用了两个lua脚本,一个用于执行lock,另一个执行unlock。

咱们简单看一下,lock脚本就是采用Redis事务执行的set nx px命令,其实还有set nx ex命令,这个ex命令是采用秒的方式进行设置过期时间,这个px是采用毫秒的方式设置过期时间。

value需要使用一个唯一的值,这个值在解锁的时候需要判断是否一致,如果一致的话就进行解锁。这个也是官方推荐的方法。另外在lock的地方我设置了一个result,用于输出测试时的结果,这样就可以结合程序去进行debug了。

local expire = tonumber(ARGV[2])

local ret = redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', expire)

local strret = tostring(ret)

//用于查看结果,我本机获取锁成功后程序返回随机结果"table: 0x7fb4b3700fe0",否则返回"false"

redis.call('set', 'result', strret)

if strret == 'false' then

   return false

else

   return true

end

redis.call('del', 'result')

if redis.call('get', KEYS[1]) == ARGV[1] then

   return redis.call('del', KEYS[1])

else

   return 0

end

来看下代码,主要写了两个方法,一个是用与锁另外一个是用于结解锁。这块需要注意的是使用RedisTemplate,这块意味着key和value一定都是String的,我在使用的过程中就出现了一些错误。首先初始化两个脚本到程序中,然后调用执行脚本。

package com.hqs.distributedlock.lock;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.core.script.RedisScript;

import org.springframework.stereotype.Component;

import java.util.Collections;

@Slf4j

@Component

public class DistributedLock {

   //注意RedisTemplate用的String,String,后续所有用到的key和value都是String的

   @Autowired

   private RedisTemplate<String, String> redisTemplate;

   @Autowired

   RedisScript<Boolean> lockScript;

   @Autowired

   RedisScript<Long> unlockScript;

   public Boolean distributedLock(String key, String uuid, String secondsToLock) {

       Boolean locked = false;

       try {

           String millSeconds = String.valueOf(Integer.parseInt(secondsToLock) * 1000);

           locked =redisTemplate.execute(lockScript, Collections.singletonList(key), uuid, millSeconds);

           log.info("distributedLock.key{}: - uuid:{}: - timeToLock:{} - locked:{} - millSeconds:{}",

                   key, uuid, secondsToLock, locked, millSeconds);

       } catch (Exception e) {

           log.error("error", e);

       }

       return locked;

   }

   public void distributedUnlock(String key, String uuid) {

       Long unlocked = redisTemplate.execute(unlockScript, Collections.singletonList(key),

               uuid);

       log.info("distributedLock.key{}: - uuid:{}: - unlocked:{}", key, uuid, unlocked);

   }

}

还有一个就是脚本定义的地方需要注意,返回的结果集一定是Long, Boolean,List, 一个反序列化的值。这块要注意。

package com.hqs.distributedlock.config;

import com.sun.org.apache.xpath.internal.operations.Bool;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.core.io.ClassPathResource;

import org.springframework.data.redis.core.script.DefaultRedisScript;

import org.springframework.data.redis.core.script.RedisScript;

import org.springframework.scripting.ScriptSource;

import org.springframework.scripting.support.ResourceScriptSource;

@Configuration

@Slf4j

public class BeanConfiguration {

   /**

    * The script resultType should be one of

    * Long, Boolean, List, or a deserialized value type. It can also be null if the script returns

    * a throw-away status (specifically, OK).

    * @return

    */

   @Bean

   public RedisScript<Long> limitScript() {

       RedisScript redisScript = null;

       try {

           ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/limit.lua"));

//            log.info("script:{}", scriptSource.getScriptAsString());

           redisScript = RedisScript.of(scriptSource.getScriptAsString(), Long.class);

       } catch (Exception e) {

           log.error("error", e);

       }

       return redisScript;

   }

   @Bean

   public RedisScript<Boolean> lockScript() {

       RedisScript<Boolean> redisScript = null;

       try {

           ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/lock.lua"));

           redisScript = RedisScript.of(scriptSource.getScriptAsString(), Boolean.class);

       } catch (Exception e) {

           log.error("error" , e);

       }

       return redisScript;

   }

   @Bean

   public RedisScript<Long> unlockScript() {

       RedisScript<Long> redisScript = null;

       try {

           ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/unlock.lua"));

           redisScript = RedisScript.of(scriptSource.getScriptAsString(), Long.class);

       } catch (Exception e) {

           log.error("error" , e);

       }

       return redisScript;

   }

   @Bean

   public RedisScript<Long> limitAnother() {

       DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();

       redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("/scripts/limit.lua")));

       redisScript.setResultType(Long.class);

       return redisScript;

   }

}

好了,这块就写好了,然后写好controller类准备测试。

@PostMapping("/distributedLock")

   @ResponseBody

   public String distributedLock(String key, String uuid, String secondsToLock, String userId) throws Exception{

//        String uuid = UUID.randomUUID().toString();

       Boolean locked = false;

       try {

           locked = lock.distributedLock(key, uuid, secondsToLock);

           if(locked) {

               log.info("userId:{} is locked - uuid:{}", userId, uuid);

               log.info("do business logic");

               TimeUnit.MICROSECONDS.sleep(3000);

           } else {

               log.info("userId:{} is not locked - uuid:{}", userId, uuid);

           }

       } catch (Exception e) {

           log.error("error", e);

       } finally {

           if(locked) {

               lock.distributedUnlock(key, uuid);

           }

       }

       return "ok";

   }

我也写了一个测试类,用于测试和输出结果, 使用100个线程,然后锁的时间设置10秒,controller里边需要休眠3秒模拟业务执行。

@Test

   public void distrubtedLock() {

       String url = "http://localhost:8080/distributedLock";

       String uuid = "abcdefg";

//        log.info("uuid:{}", uuid);

       String key = "redisLock";

       String secondsToLive = "10";

       for(int i = 0; i < 100; i++) {

           final int userId = i;

           new Thread(() -> {

               MultiValueMap<String, String> params = new LinkedMultiValueMap<>();

               params.add("uuid", uuid);

               params.add("key", key);

               params.add("secondsToLock", secondsToLive);

               params.add("userId", String.valueOf(userId));

               String result = testRestTemplate.postForObject(url, params, String.class);

               System.out.println("-------------" + result);

           }

           ).start();

       }

   }

获取锁的地方就会执行do business logic, 然后会有部分线程获取到锁并执行业务,执行完业务的就会释放锁。

分布式锁就实现好了,接下来实现分布式限流。先看一下limit的lua脚本,需要给脚本传两个值,一个值是限流的key,一个值是限流的数量。

获取当前key,然后判断其值是否为nil,如果为nil的话需要赋值为0,然后进行加1并且和limit进行比对,如果大于limt即返回0,说明限流了,如果小于limit则需要使用Redis的INCRBY key 1,就是将key进行加1命令。并且设置超时时间,超时时间是秒,并且如果有需要的话这个秒也是可以用参数进行设置。

//lua 下标从 1 开始

// 限流 key

local key = KEYS[1]

//限流大小

local limit = tonumber(ARGV[1])

// 获取当前流量大小

local curentLimit = tonumber(redis.call('get', key) or "0")

if curentLimit + 1 > limit then

   // 达到限流大小 返回

   return 0;

else

   // 没有达到阈值 value + 1

   redis.call("INCRBY", key, 1)

   // EXPIRE后边的单位是秒

   redis.call("EXPIRE", key, 10)

   return curentLimit + 1

end

执行limit的脚本和执行lock的脚本类似。

package com.hqs.distributedlock.limit;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.core.script.RedisScript;

import org.springframework.stereotype.Component;

import java.util.Collections;

/**

* @author huangqingshi

* @Date 2019-01-17

*/

@Slf4j

@Component

public class DistributedLimit {

   //注意RedisTemplate用的String,String,后续所有用到的key和value都是String的

   @Autowired

   private RedisTemplate<String, String> redisTemplate;

   @Autowired

   RedisScript<Long> limitScript;

   public Boolean distributedLimit(String key, String limit) {

       Long id = 0L;

       try {

           id = redisTemplate.execute(limitScript, Collections.singletonList(key),

                   limit);

           log.info("id:{}", id);

       } catch (Exception e) {

           log.error("error", e);

       }

       if(id == 0L) {

           return false;

       } else {

           return true;

       }

   }

}

接下来咱们写一个限流注解,并且设置注解的key和限流的大小:

package com.hqs.distributedlock.annotation;

import java.lang.annotation.ElementType;

import java.lang.annotation.Retention;

import java.lang.annotation.RetentionPolicy;

import java.lang.annotation.Target;

/**

* 自定义limit注解

* @author huangqingshi

* @Date 2019-01-17

*/

@Target(ElementType.METHOD)

@Retention(RetentionPolicy.RUNTIME)

public @interface DistriLimitAnno {

   public String limitKey() default "limit";

   public int limit() default 1;

}

然后对注解进行切面,在切面中判断是否超过limit,如果超过limit的时候就需要抛出异常exceeded limit,否则正常执行。

package com.hqs.distributedlock.aspect;

import com.hqs.distributedlock.annotation.DistriLimitAnno;

import com.hqs.distributedlock.limit.DistributedLimit;

import lombok.extern.slf4j.Slf4j;

import org.aspectj.lang.JoinPoint;

import org.aspectj.lang.annotation.Aspect;

import org.aspectj.lang.annotation.Before;

import org.aspectj.lang.annotation.Pointcut;

import org.aspectj.lang.reflect.MethodSignature;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.EnableAspectJAutoProxy;

import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

/**

* @author huangqingshi

* @Date 2019-01-17

*/

@Slf4j

@Aspect

@Component

@EnableAspectJAutoProxy(proxyTargetClass = true)

public class LimitAspect {

   @Autowired

   DistributedLimit distributedLimit;

   @Pointcut("@annotation(com.hqs.distributedlock.annotation.DistriLimitAnno)")

   public void limit() {};

   @Before("limit()")

   public void beforeLimit(JoinPoint joinPoint) throws Exception {

       MethodSignature signature = (MethodSignature) joinPoint.getSignature();

       Method method = signature.getMethod();

       DistriLimitAnno distriLimitAnno = method.getAnnotation(DistriLimitAnno.class);

       String key = distriLimitAnno.limitKey();

       int limit = distriLimitAnno.limit();

       Boolean exceededLimit = distributedLimit.distributedLimit(key, String.valueOf(limit));

       if(!exceededLimit) {

           throw new RuntimeException("exceeded limit");

       }

   }

}

因为有抛出异常,这里我弄了一个统一的controller错误处理,如果controller出现Exception的时候都需要走这块异常。如果是正常的RunTimeException的时候获取一下,否则将异常获取一下并且输出。

package com.hqs.distributedlock.util;

import lombok.extern.slf4j.Slf4j;

import org.springframework.http.HttpStatus;

import org.springframework.web.bind.annotation.ControllerAdvice;

import org.springframework.web.bind.annotation.ExceptionHandler;

import org.springframework.web.bind.annotation.ResponseBody;

import org.springframework.web.bind.annotation.ResponseStatus;

import org.springframework.web.context.request.NativeWebRequest;

import javax.servlet.http.HttpServletRequest;

import java.util.HashMap;

import java.util.Map;

/**

* @author huangqingshi

* @Date 2019-01-17

* 统一的controller错误处理

*/

@Slf4j

@ControllerAdvice

public class UnifiedErrorHandler {

   private static Map<String, String> res = new HashMap<>(2);

   @ExceptionHandler(value = Exception.class)

   @ResponseStatus(HttpStatus.OK)

   @ResponseBody

   public Object processException(HttpServletRequest req, Exception e) {

       res.put("url", req.getRequestURL().toString());

       if(e instanceof RuntimeException) {

           res.put("mess", e.getMessage());

       } else {

           res.put("mess", "sorry error happens");

       }

       return res;

   }

}

好了,接下来将注解写到自定义的controller上,limit的大小为10,也就是10秒钟内限制10次访问。

@PostMapping("/distributedLimit")

   @ResponseBody

   @DistriLimitAnno(limitKey="limit", limit = 10)

   public String distributedLimit(String userId) {

       log.info(userId);

       return "ok";

   }

也是来一段Test方法来跑,老方式100个线程开始跑,只有10次,其他的都是limit。没有问题。

总结一下,这次实现采用了使用lua脚本和Redis实现了锁和限流,但是真实使用的时候还需要多测试,另外如果此次Redis也是采用的单机实现方法,使用集群的时候可能需要改造一下。

关于锁这块其实Reids自己也实现了RedLock, java实现的版本Redission。也有很多公司使用了,功能非常强大。各种场景下都用到了。

扩展阅读

利用Redis实现分布式锁

从分布式一致性谈到CAP理论、BASE理论

Redis 分布式锁:乐观锁的实现,以秒杀系统为例

从构建分布式秒杀系统聊聊限流特技

大型网站限流算法的实现和改造

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350

推荐阅读更多精彩内容

  • 今天是星期五,老师说下午有树叶贴纸课。老师让我们准备胶棒剪刀A4纸还有树叶上午一放学我就坐公交回家了。一到家我就去...
    刘爱青家长阅读 292评论 0 0
  • 半夏1998_2005阅读 163评论 0 0
  • 好多朋友还是会问一些问题,大多问题,都集结成文字,看不懂可以交流,但,别一上来就倒一堆,仙人真的时间不够,探讨的前...
    纵情嬉戏天地间阅读 191评论 0 0
  • 待的久了,总渴望离开的日子快快到来。 将要离开,却希望时间即刻静止。 心有不舍,但知道不离开只会更难受。 我想,这...
    lqh_4a4a阅读 221评论 1 0
  • 张书云 有这样一个老太太,近八十岁。身才不高,白白胖胖,说起话来总是符合人的心理。 她家就像个聚乐部,她总是乐和和...
    qiushui__lianli阅读 292评论 4 7