项目需要使用限流措施,查阅后主要使用令牌桶算法实现,为了更灵活的实现限流,就自己实现了一个简单的基于令牌桶算法的限流实现。
令牌桶算法描述
令牌桶这种控制机制基于令牌桶中是否存在令牌来指示什么时候可以发送流量。令牌桶中的每一个令牌都代表一个字节。如果令牌桶中存在令牌,则允许发送流量;而如果令牌桶中不存在令牌,则不允许发送流量。因此,如果突发门限被合理地配置并且令牌桶中有足够的令牌,那么流量就可以以峰值速率发送。
简单的说就是,一边请求时会消耗桶内的令牌,另一边会以固定速率往桶内放令牌。当消耗的请求大于放入的速率时,进行相应的措施,比如等待,或者拒绝等。
Java的简单实现
为了更灵活的定制限流措施,自己实现了限流的部分代码,如下:
/**
* @author xiezhengchao
* @since 18/1/3 上午9:45.
* 限流器
*/
public class RateLimiter{
private volatile int token;
private final int originToken;
private static Unsafe unsafe = null;
private static final long valueOffset;
private final Object lock = new Object();
static {
try {
// 应用开发中使用unsafe对象必须通过反射获取
Class<?> clazz = Unsafe.class;
Field f = clazz.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe) f.get(clazz);
valueOffset = unsafe.objectFieldOffset(RateLimiter.class.getDeclaredField("token"));
} catch (Exception ex) {throw new Error(ex);}
}
public RateLimiter(int token){
this.originToken = token;
this.token = token;
}
/**
* 获取一个令牌
*/
public boolean acquire(){
int current = token;
if(current<=0){
// 保证在token已经用光的情况下依然有获取竞争的能力
current = originToken;
}
long expect = 1000;// max wait 1s
long future = System.currentTimeMillis()+expect;
while(current>0){
if(compareAndSet(current, current-1)){
return true;
}
current = token;
if(current<=0 && expect>0){
// 在有效期内等待通知
synchronized (lock){
try {
lock.wait(expect);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
current = token;
if(current<=0){
current = originToken;
}
expect = future - System.currentTimeMillis();
}
}
return false;
}
private boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* 补充令牌
*/
public void supplement(final ExecutorService executorService){
this.token = originToken;
executorService.execute(() -> {
synchronized (lock){
lock.notifyAll();
}
});
}
}
核心代码在acquire方法部分主要思路就是,优先采用CAS进行自旋操作获取令牌,一直尝试令牌消耗光,那么会进入等待,在定时线程调用supplement方法时,会唤醒所有等待线程,此时进入CAS进行尝试消耗令牌,以此循环一直到设置的最大等待时间(代码中的expect)消耗光,如果还没获得令牌,那么会返回false
这段代码如果自己起一个线程进行限流,然后自己开个定时线程进行补充也可以,但是实际运用中往往需要一个限流管理器来分配限流器,然后通过限流管理器统一的进行定时触发,这样可以不用开很多的定时线程,同时通过线程池也避免了在定时线程竞争锁时引发的过长等待造成定时线程不准的情况。
下面贴出限流管理器部分代码
/**
* @author xiezhengchao
* @since 18/1/3 上午9:43.
* 限流管理器
*/
@Component
public class ConfineManager{
// 定时线程
private final ScheduledThreadPoolExecutor scheduledCheck = new ScheduledThreadPoolExecutor(2);
// 执行补充线程池
private final ExecutorService executorService = new ThreadPoolExecutor(5, 200,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
new NamedThreadFactory("supplement",true,false));
// 限流器容器
private Map<String,RateLimiter> rateLimiterMap = new ConcurrentHashMap<>();
@PostConstruct
public void init(){
scheduledCheck.scheduleAtFixedRate(new SupplementRateLimiter(), 1, 1, TimeUnit.SECONDS);
}
@PreDestroy
public void destroy(){
scheduledCheck.shutdown();
}
/**
* 通过key获取相应的限流器
*/
public void acquire(String key,int tokenCount){
RateLimiter rateLimiter = rateLimiterMap.get(key);
// 双检锁确保安全创建
if(rateLimiter==null){
synchronized (this){
// init RateLimiter
rateLimiter = rateLimiterMap.computeIfAbsent(key, k -> new RateLimiter(tokenCount));
}
}
// 尝试获取令牌
if(!rateLimiter.acquire()){
// 获取失败,根据实际情况进行处理,这里直接抛异常了
Assert.throwBizException(ErrorCode.API_CONFINE_RATE_LIMITER);
}
}
/**
* 补充相应的令牌数
*/
private class SupplementRateLimiter implements Runnable{
@Override
public void run(){
rateLimiterMap.values().forEach(rateLimiter -> rateLimiter.supplement(executorService));
}
}
}
代码中主要是创建了定时线程,补充令牌线程池。
部分代码不是开源的,需要调整下,不影响主流程,代码使用了一些spring的注解。
其中的不足
在集群的环境下,没有考虑分布式的情况,也就是如果一个应用部署的限流是1s产生10个令牌,假设部署了5个应用,那么实际1s可以产生50个令牌。如果需要考虑这部分,那么在CAS操作可以替换为通过redis的setnx来进行获取锁操作然后更新redis存储对应的令牌,补充则直接设置更新redis对应的令牌数即可,这样效率肯定比现在基于CAS操作低。
总结
实际上实现这个限流器更多的考虑是可以自行定义等待的最大时间,超时措施,定时补充令牌时间间隔等,同时也温习了一下之前的并发知识。