基于Redis + Lua的令牌桶限流器的实现

开发环境

  • jdk 11.0.10
  • SpringBoot 2.6.2
  • Idea

主要依赖

<dependency>
      <groupId>redis.clients</groupId>
       <artifactId>jedis</artifactId>
</dependency>
<dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-pool2</artifactId>
</dependency>
 <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-aop</artifactId>
</dependency>
 <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
      <scope>runtime</scope>
      <optional>true</optional>
  </dependency>
  <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
      <optional>true</optional>
  </dependency>

核心代码

自定义注解
@Documented
@Retention(RUNTIME)
@Target(value = {ElementType.METHOD})
public @interface RateLimit {

    /**
     * 限流接口名称
     * @return 限流接口名称
     */
    String interfaceName();

    /**
     * 最大令牌数
     * @return 最大令牌数
     */
    long maxPermits();

    /**
     * 每秒生成的令牌数
     * @return
     */
    long tokensPerSeconds();
}
限流器抽象类
public abstract class RateLimiter {

    private static final Logger logger = LoggerFactory.getLogger(RateLimiter.class);

    /**
     * 是否开启限流
     */
    private boolean limited = true;

    /**
     * 开启限流功能
     */
    public void open() {
        if (!this.limited) {
            this.limited = true;
        } else {
            logger.info("the limiter has started...");
        }
    }

    /**
     * 关闭限流功能
     */
    public void close() {
        if (this.limited) {
            this.limited = false;
        } else {
            logger.info("the limiter has stopped...");
        }
    }

    /**
     * 获取令牌(指定接口限流)
     * @param interfaceName 需要限流的接口名
     * @param maxPermits 最大令牌数
     * @param tokensPerSeconds 每秒生成的令牌数
     * @return boolean 是否通过限流(获取到令牌)
     */
    protected abstract boolean acquire(String interfaceName, long maxPermits, long tokensPerSeconds);

    /**
     * 获取令牌(指定接口)
     * @param interfaceName 需要限流的接口名
     * @return boolean 是否通过限流(获取到令牌)
     */
    public boolean tryAcquire(String interfaceName, long maxPermits, long tokensPerSeconds) {
        if (this.limited) {
            return this.acquire(interfaceName, maxPermits, tokensPerSeconds);
        } else {
            return true;
        }
    }
}
令牌桶实现类
public class TokenBucketRateLimiter extends RateLimiter {

    private static final Logger logger = LoggerFactory.getLogger(TokenBucketRateLimiter.class);

    /**
     * redis的lua脚本
     */
    private DefaultRedisScript<Boolean> script;

    /**
     * redisTemplate
     */
    private RedisTemplate<String, Object> redisTemplate;

    public TokenBucketRateLimiter(DefaultRedisScript<Boolean> script, RedisTemplate<String, Object> redisTemplate) {
        this.script = script;
        this.redisTemplate = redisTemplate;
    }

    /**
     * 限流检测(单个接口)
     * @param interfaceName 需要限流的接口名
     * @param maxPermits 最大令牌数
     * @param tokensPerSeconds 每秒生成的令牌数
     * @return 是否通过限流 true: 通过
     */
    @Override
    protected boolean acquire(String interfaceName, long maxPermits, long tokensPerSeconds) {
        // 错误的参数将不起作用
        if (maxPermits <= 0 || tokensPerSeconds <= 0) {
            logger.warn("maxPermits and tokensPerSeconds can not be less than zero...");
            return true;
        }

        // 参数结构: KEYS = [限流的key]   ARGV = [最大令牌数, 每秒生成的令牌数, 本次请求的毫秒数]
        Boolean result = this.redisTemplate.execute(this.script, Collections.singletonList(interfaceName), maxPermits, tokensPerSeconds, System.currentTimeMillis());
        return result!=null && result;
    }
}
具体实现令牌桶的Lua脚本
-- LUA脚本会以单线程执行,不会有并发问题,一个脚本中的执行过程中如果报错,那么已执行的操作不会回滚
-- KEYS和ARGV是外部传入进来需要操作的redis数据库中的key,下标从1开始
-- 参数结构: KEYS = [限流的key]   ARGV = [最大令牌数, 每秒生成的令牌数, 本次请求的毫秒数]
local info = redis.pcall('HMGET', KEYS[1], 'last_time', 'stored_token_nums')
local last_time = info[1] --最后一次通过限流的时间
local stored_token_nums = tonumber(info[2]) -- 剩余的令牌数量
local max_token = tonumber(ARGV[1])
local token_rate = tonumber(ARGV[2])
local current_time = tonumber(ARGV[3])
local past_time = 0
local rateOfperMills = token_rate/1000 -- 每毫秒生产令牌速率

if stored_token_nums == nil then
    -- 第一次请求或者键已经过期
    stored_token_nums = max_token --令牌恢复至最大数量
    last_time = current_time --记录请求时间
else
    -- 处于流量中
    past_time = current_time - last_time --经过了多少时间

    if past_time <= 0 then
        --高并发下每个服务的时间可能不一致
        past_time = 0 -- 强制变成0 此处可能会出现少量误差
    end
    -- 两次请求期间内应该生成多少个token
    local generated_nums = math.floor(past_time * rateOfperMills)  -- 向下取整,多余的认为还没生成完
    stored_token_nums = math.min((stored_token_nums + generated_nums), max_token) -- 合并所有的令牌后不能超过设定的最大令牌数
end

local returnVal = 0 -- 返回值

if stored_token_nums > 0 then
    returnVal = 1 -- 通过限流
    stored_token_nums = stored_token_nums - 1 -- 减少令牌
    -- 必须要在获得令牌后才能重新记录时间。举例: 当每隔2ms请求一次时,只要第一次没有获取到token,那么后续会无法生产token,永远只过去了2ms
    last_time = last_time + past_time
end

-- 更新缓存
redis.call('HMSET', KEYS[1], 'last_time', last_time, 'stored_token_nums', stored_token_nums)
-- 设置超时时间
-- 令牌桶满额的时间(超时时间)(ms) = 空缺的令牌数 * 生成一枚令牌所需要的毫秒数(1 / 每毫秒生产令牌速率)
redis.call('PEXPIRE', KEYS[1], math.ceil((1/rateOfperMills) * (max_token - stored_token_nums)))

return returnVal
切面类
@Aspect
public class RateLimitAspect {

    private static final Logger logger = LoggerFactory.getLogger(RateLimitAspect.class);

    private RateLimiter rateLimiter;

    public RateLimitAspect(RateLimiter rateLimiter) {
        this.rateLimiter = rateLimiter;
    }

    /**
     * 标注切点-所有标识了RateLimit注解的方法
     */
    @Pointcut("@annotation(cn.t.redis.limiter.annotations.RateLimit)")
    public void pointCut(){};

    @Before("pointCut()")
    public void before(JoinPoint joinPoint) {
        Method method = ((MethodSignature)joinPoint.getSignature()).getMethod();
        RateLimit a = method.getAnnotation(RateLimit.class);
        if (a != null) {
            String name = a.interfaceName();
            long maxPermits = a.maxPermits();
            long tokensPerSeconds = a.tokensPerSeconds();
            // 执行限流判断
            var ret = this.rateLimiter.tryAcquire(name, maxPermits, tokensPerSeconds);
            if (!ret) {
                throw new RateLimitException("the interface can not be accessed in the meantime...");
            }
        }
    }
}
自定义异常
public class RateLimitException extends RuntimeException {

    public RateLimitException() {}

    public RateLimitException(String message) {
        super(message);
    }
}
自动配置类
@Configuration
@AutoConfigureBefore(RedisAutoConfiguration.class) // 高优先级,先于自动默认的自动配置生成RedisTemplate
public class LimiterAutoConfiguration {

    @Autowired
    private RedisConnectionFactory connectionFactory;

    /**
     * 配置redisTemplate
     * @return redisTemplate
     */
    @Bean
    @ConditionalOnMissingBean(RedisTemplate.class)
    public RedisTemplate<String, Object> redisTemplate() {

        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(this.connectionFactory);

        // 定义Jackson2JsonRedisSerializer序列化对象
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);

        ObjectMapper objectMapper = new ObjectMapper();
        // 指定要序列化的域,ALL:field,get和set等,ANY: 可见性,会将有private修饰符的字段也序列化
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        // 使用StringRedisSerializer来序列化和反序列化redis的key值
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        // 使用jackson2JsonRedisSerializer序列化和反序列化value
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        // 属性设置完成afterPropertiesSet就会被调用,可以对设置不成功的做一些默认处理
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * redis的lua脚本对象
     * @return lua脚本对象
     */
    @Bean
    public DefaultRedisScript<Boolean> redisScript() {
        DefaultRedisScript<Boolean> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("RateLimiter.lua")));
        redisScript.setResultType(Boolean.class);
        return redisScript;
    }

    /**
     * 默认限流器的实现-令牌桶
     * @return 默认限流器
     */
    @Bean
    @ConditionalOnMissingBean(RateLimiter.class)
    public RateLimiter rateLimiter() {
        return new TokenBucketRateLimiter(this.redisScript(), this.redisTemplate());
    }

    /**
     * 限流切面
     * @param rateLimiter
     * @return
     */
    @Bean
    @ConditionalOnBean(RateLimiter.class)
    public RateLimitAspect rateLimitAspect(RateLimiter rateLimiter) {
        return new RateLimitAspect(rateLimiter);
    }
}
自动配置类指示文件(src/main/resources/META-INF/spring.factories)
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.t.redis.limiter.configuration.LimiterAutoConfiguration

打包后在需要使用限流功能的模块中引入即可

使用方法

  1. 引入本jar包

    <dependency>
        <groupId>cn.t.redis.limiter</groupId>
        <artifactId>limiter-spring-boot-starter</artifactId>
        <version>1.0.0</version>
     </dependency>
    
  2. 配置Redis连接信息

spring:
  redis:
    #host: localhost  # 单点连接ip
    #port: 18379 # # 单点连接端口
    timeout: 6000 # 连接超时时间
    password: your password
    client-type: lettuce #指定连接工厂类型
    cluster:
      max-redirects: 3  # 获取失败 最大重定向次数
      nodes: # 集群节点
        - 127.0.0.1:7001
        - 127.0.0.1:7002
        - 127.0.0.1:7003
        - 127.0.0.1:7004
        - 127.0.0.1:7005
        - 127.0.0.1:7006
    lettuce: # lettuce连接池
      pool:
        max-active: 100  # 连接池最大连接数(使用负值表示没有限制)
        max-idle: 20 # 最大空闲连接数
        min-idle: 10  # 最小空闲连接数
        max-wait: 1500 # 连接池最大阻塞等待时间(ms)(使用负值表示没有限制)
  1. 在需要限流的接口处使用注解
@RequestMapping("/index")
@RateLimit(interfaceName = "limit", maxPermits = 5, tokensPerSeconds = 1)
public String ratelimit() {
    return "hello world";
}
  1. 未通过限流的访问会抛出异常,建议在全局异常处理器中捕获处理。

例如:

@RestControllerAdvice
public class GlobalErrorController {
    @ExceptionHandler(RateLimitException.class)
    public String ratelimiteHanler(RateLimitException e) {
        return e.getMessage();
    }
}

代码地址: 基于Redis + Lua的令牌桶限流器的实现

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容