应用层限流:令牌桶算法与RateLimiter

一、高并发处理思想
建议先看看高并发的处理思想,脑子里先有个全局概念。
高并发处理的5个思想

二、令牌桶算法
以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。令牌桶允许请求某种程度的突发传输。这是令牌桶区别于漏桶的地方。

令牌桶算法

三、代码示例
pom.xml依赖

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>28.1-jre</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

1、请求速率限制注解类

package com.hello;

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RequestRateLimitAnnotation {
    /**
     * 固定令牌个数
     * @return
     */
    double limitNum();
    /**
     * 获取令牌超时时间
     * @return
     */
    long timeout();
    /**
     * 单位-默认毫秒
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
    /**
     * 无法获取令牌时的错误信息
     * @return
     */
    String errMsg() default "请求太频繁!";
}

2、aop拦截器

package com.hello;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Objects;

@Slf4j
@Aspect
@Component
public class RequestRateLimitAspect {
    /**
     * 使用url做为key,存放令牌桶 防止每次重新创建令牌桶
     */
    private Map<String, RateLimiter> limitMap = Maps.newConcurrentMap();

    // 拦截带RequestRateLimitAnnotation注解的接口         
  @Pointcut("@annotation(com.hello.RequestRateLimitAnnotation)")
    public void execMethod() {}

    @Around("execMethod()")
    public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
        //获取请求uri
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        String reqUrl=request.getRequestURI();

        // 获取令牌
        RequestRateLimitAnnotation rateLimiter = this.getRequestRateLimiter(joinPoint);
        if (Objects.nonNull(rateLimiter)) {
            RateLimiter limiter = getRateLimiter(reqUrl, rateLimiter);
            // 请求获取令牌,参数为等待超时时间
            boolean acquire = limiter.tryAcquire(rateLimiter.timeout(), rateLimiter.timeUnit());
            if (!acquire) {
                return new Response(200, reqUrl.concat(rateLimiter.errMsg())).toString();
            }
        }

        //获得令牌,继续执行
        return joinPoint.proceed();
    }


    /**
     * 获取RateLimiter
     * @return
     */
    private RateLimiter getRateLimiter(String reqUrl, RequestRateLimitAnnotation rateLimiter) {
        RateLimiter limiter = limitMap.get(reqUrl);
        if (Objects.isNull(limiter)) {
            synchronized (this) {
                limiter = limitMap.get(reqUrl);
                if (Objects.isNull(limiter)) {
                    // 创建一个限流器,参数代表每秒生成的令牌数
                    limiter = RateLimiter.create(rateLimiter.limitNum());
                    limitMap.put(reqUrl, limiter);
                    log.info("RequestRateLimitAspect请求{},创建令牌桶,容量{} 成功!!!", reqUrl, rateLimiter.limitNum());
                }
            }
        }
        return limiter;
    }

    /**
     * 获取注解对象
     * @param joinPoint 对象
     * @return ten LogAnnotation
     */
    private RequestRateLimitAnnotation getRequestRateLimiter(final JoinPoint joinPoint) {
        Method[] methods = joinPoint.getTarget().getClass().getDeclaredMethods();
        String name = joinPoint.getSignature().getName();
        if (!StringUtils.isEmpty(name)) {
            for (Method method : methods) {
                RequestRateLimitAnnotation annotation = method.getAnnotation(RequestRateLimitAnnotation.class);
                if (!Objects.isNull(annotation) && name.equals(method.getName())) {
                    return annotation;
                }
            }
        }
        return null;
    }
}

3、控制器类

package com.hello;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {

    @RequestRateLimitAnnotation(limitNum = 2, timeout = 10)
    @GetMapping("/rate_limit")
    public String testRateLimit() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /**
         * 测试代码
         */
        return "success";
    }
}

4、测试类

package com.hello;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AccessClient {

    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

    /**
     * get请求
     * @param realUrl
     * @return
     */
    public static String sendGet(URL realUrl) {
        String result = "";
        BufferedReader in = null;
        try {
            // 打开和URL之间的连接
            URLConnection connection = realUrl.openConnection();
            // 设置通用的请求属性
            connection.setRequestProperty("accept", "*/*");
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("user-agent",
                    "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
            // 建立实际的连接
            connection.connect();

            // 定义 BufferedReader输入流来读取URL的响应
            in = new BufferedReader(new InputStreamReader(
                    connection.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                result += line;
            }
        } catch (Exception e) {
            System.out.println("发送GET请求出现异常!" + e);
            e.printStackTrace();
        }
        // 使用finally块来关闭输入流
        finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        return result;
    }

    public void access() throws Exception{
        final URL url = new URL("http://localhost:8080/test/rate_limit");

        for(int i=0;i<4;i++) {
            fixedThreadPool.submit(new Runnable() {
                public void run() {
                    System.out.println(sendGet(url));
                }
            });
        }

        fixedThreadPool.shutdown();
        fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception{
        AccessClient accessClient = new AccessClient();
        accessClient.access();
    }

}

测试结果:

Response(code=200, msg=/test/rate_limit请求太频繁!)
success
success
success

结果分析:
  部分请求由于获取的令牌可以成功执行,其余请求没有拿到令牌,我们可以根据实际业务来做区分处理。还有一点要注意,我们通过RateLimiter.create(2.0)配置的是每一秒2枚令牌,但是限流的时候发出的是3枚,改用其他值验证,也是实际的比配置的大1。

四、RateLimiter源码解析
待补充。

五、qps多大合适?
一直再说高并发,多少QPS才算高并发?
Web开发中,什么级别才算是高并发
总结我比较关心的几点:
1、如果某个系统的日pv在千万级别以上,他就可能是一个高并发的系统。
2、PV和QPS
比如微博每天1亿多pv的系统一般也就1500QPS,5000QPS峰值。
比如有人说:
2C4G机器单机一般1000QPS。
8C8G机器单机可承受7000QPS。
3、具体多少QPS跟业务强相关,只读接口读缓存,将压力给到缓存单机3000+没问题,写请求1000+也正常,也复杂些可能也就几百+QPS。
所以QPS和业务场景和设计相关性很大,比如可以通过浏览器本地缓存,用缓存做热点数据查询,写事务MQ异步处理等方式提升QPS。

参考资料

https://www.cnblogs.com/itfly8/p/12589212.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352