本文将从以下几个方面分析限流策略:
- 什么是限流
- 限流算法
- 限流算法的应用
什么是限流
在开发高并发系统时,有很多手段来防止系统过载:缓存、降级、限流。缓存的目的是提升系统访问速度和增大系统的吞吐量,降级和限流的目的如下:
降级
降级是当服务出问题或者影响到核心流程的性能时需要暂时屏蔽掉某些功能,等高峰或者问题解决后再打开。降级一般有几种实现手段,自动降级和人工降级:
1、通过配置降级开关,实现对流程的控制
2、前置化降级开关, 基于 OpenResty+配置中心实现降级
3、业务降低,比如在大促的时候,会优先保证核心业务的流程可用
限流
限流是对资源访问做控制,防止恶意请求流量、恶意攻击、或者防止流量超过系统峰值。它有两个核心概念:
资源:被流量控制的对象,比如接口
策略:由限流算法和可调节的参数两部分组成
限流算法
漏桶算法
桶本身具有一个恒定的速率往下漏水,而上方时快时慢的会有水进入桶内。当桶还未满时,上方的水可以加入。一旦水满,上方的水就无法加入。桶满正是算法中的一个关键的触发条件(即流量异常判断成立的条件)。而此条件下如何处理上方流下来的水,有两种方式:
1、暂时拦截住上方水的向下流动,等待桶中的一部分水漏走后,再放行上方水
2、溢出的上方水直接抛弃
令牌桶算法(能够解决突发流量)
令牌桶算法是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。
令牌桶是一个存放固定容量令牌(token)的桶,按照固定速率往桶里添加令牌; 令牌桶算法实际上由三部分组成:两个流和一个桶,分别是令牌流、数据流和令牌桶
限流算法的应用
Guava 的 RateLimiter 实现
在 Guava 中 RateLimiter 的实现有两种: Bursty 和 WarmUp
bursty
bursty 基于令牌桶的算法实现。
RateLimiter rateLimiter=RateLimiter.create(permitPerSecond); //创建一个 bursty实例。
rateLimiter.acquire(); //获取 1 个 permit,当令牌数量不够时会阻塞直到获取为止
WarmingUp
WarmingUp基于漏桶的算法实现,QPS 是固定的,使用于需要预热时间的使用场景。
RateLimiter rateLimiter =RateLimiter.create(permitsPerSecond,warmupPeriod,timeUnit);//warmupPeriod 是指预热的时间
rateLimiter.acquire();//获取 1 个 permit
public class TokenDemo {
private int qps;
private int countOfReq;
private RateLimiter rateLimiter;
public TokenDemo(int qps, int countOfReq) {
this.qps = qps;
this.countOfReq = countOfReq;
}
public TokenDemo processWithTokenBucket(){
rateLimiter=RateLimiter.create(qps);
return this;
}
public TokenDemo processWithLeakyBucket(){
rateLimiter=RateLimiter.create(qps,00,TimeUnit.MILLISECONDS);
return this;
}
private void processRequest(){
System.out.println("RateLimiter:"+rateLimiter.getClass());
long start=System.currentTimeMillis();
for(int i=0;i<countOfReq;i++){
rateLimiter.acquire();
}
long end=System.currentTimeMillis()-start;
System.out.println("处理请求数量:"+countOfReq+"," +
"耗时:"+end+"," +
"qps:"+rateLimiter.getRate()+"," +
"实际 qps:"+Math.ceil(countOfReq / (end / 1000.00)));
}
public void doProcess() throws InterruptedException {
for(int i=0;i<20;i=i+5){
TimeUnit.SECONDS.sleep(i);
processRequest();
}
}
public static void main(String[] args) throws InterruptedException
{
new TokenDemo(50,100).processWithTokenBucket().doProcess();
new TokenDemo(50,100).processWithLeakyBucket().doProcess();
}
}