接口限流
简述
有时候,接口对外提供服务的时候,需要保护我们的接口,避免并发过大导致系统瘫痪。
限流算法
常用的限流算法存在两种:漏桶算法和令牌桶算法。
漏桶算法:故名思意,就是桶下面有个洞,以恒定的速率处理请求,请求过来放入桶中,请求量大于桶的容量益处,则拒绝服务。
令牌桶算法:令牌桶算法是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。令牌桶算法的描述如下: 假设限制2r/s,则按照500毫秒的固定速率往桶中添加令牌; 桶中最多存放b个令牌,当桶满时,新添加的令牌被丢弃或拒绝; 当一个n个字节大小的数据包到达,将从桶中删除n个令牌,接着数据包被发送到网络上; 如果桶中的令牌不足n个,则不会删除令牌,且该数据包将被限流(要么丢弃,要么缓冲区等待)
限流工具类RateLimiter
Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法来完成限流,非常易于使用。
方法摘要
修饰符和类型 | 方法和描述 |
---|---|
double | acquire()从RateLimiter获取一个许可,该方法会被阻塞直到获取到请求 |
double | acquire(int permits)从RateLimiter获取指定许可数,该方法会被阻塞直到获取到请求 |
static RateLimiter | create(double permitsPerSecond)根据指定的稳定吞吐率创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少查询) |
static RateLimiter | create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)根据指定的稳定吞吐率和预热期来创建RateLimiter,这里的吞吐率是指每秒多少许可数(通常是指QPS,每秒多少个请求量),在这段预热时间内RateLimiter每秒分配的许可数会平稳地增长直到预热期结束时达到其最大速率。(只要存在足够请求数来使其饱和) |
double | getRate()返回RateLimiter 配置中的稳定速率,该速率单位是每秒多少许可数 |
void | setRate(double permitsPerSecond)更新RateLimite的稳定速率,参数permitsPerSecond 由构造RateLimiter的工厂方法提供。 |
String | toString()返回对象的字符表现形式 |
boolean | tryAcquire()从RateLimiter 获取许可,如果该许可可以在无延迟下的情况下立即获取得到的话 |
boolean | tryAcquire(int permits)从RateLimiter 获取许可数,如果该许可数可以在无延迟下的情况下立即获取得到的话 |
boolean | tryAcquire(int permits, long timeout, TimeUnit unit)从RateLimiter 获取指定许可数如果该许可数可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可数的话,那么立即返回false (无需等待) |
boolean | tryAcquire(long timeout, TimeUnit unit)从RateLimiter 获取许可如果该许可可以在不超过timeout的时间内获取得到的话,或者如果无法在timeout 过期之前获取得到许可的话,那么立即返回false(无需等待) |
举例
一个秒杀活动限流例子。每秒限流10个请求
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author td
* @date 2017/09/13
*/
public class TokenBucket {
private AtomicInteger phoneNumbers = new AtomicInteger(0);
private final static int LIMIT = 100;
private RateLimiter rateLimiter = RateLimiter.create(10);
private final int saleLimit;
public TokenBucket() {
this(LIMIT);
}
public TokenBucket(int saleLimit) {
this.saleLimit = saleLimit;
}
public int buy() {
Stopwatch stopwatch = Stopwatch.createStarted();
boolean success = rateLimiter.tryAcquire(10, TimeUnit.MILLISECONDS);
if (success) {
int phoneNum = phoneNumbers.getAndIncrement();
if (phoneNum>=saleLimit) {
throw new IllegalStateException("not any phone can be sale,please wait to next time.");
}
handleOrder();
System.out.println(Thread.currentThread()+"user get the phone "+phoneNum+",ELT:"+stopwatch.stop());
return phoneNum;
}else {
stopwatch.stop();
throw new RuntimeException("Sorry,occur excepiton when buy phone");
}
}
private void handleOrder() {
try {
TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* @author td
* @date 2017/09/13
*/
public class TokenBucketExample {
public static void main(String[] args) {
final TokenBucket tokenBucket = new TokenBucket();
for (int i=0;i< 200;i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(tokenBucket::buy).start();
}
}
}