开篇
一周一篇技术博文又来了,这周我们讲点什么呢?看标题就知道了,那就是分布式下的限流策略(实在不知道写些什么好呢),至于限流的用处,好处,和处理场景就不这里赘述了(Google全是)。ok,凌云小课堂正式开始啦,今天要介绍的主要是三种限流策略
1. 计数限流
2. 漏桶限流
3. 令牌桶限流
基本上述三种限流策略可以涵盖大多数需要限流的场景。为了和广大的技术博主不一样(555),我们今天只讲分布式下的限流实现,至于什么是漏桶和令牌桶Google喽。。。
计数限流(时间窗)
计数限流主要有两种,一个是计数器限流,简单粗暴不值一提,另一个就是本文要讲的滑动时间窗限流法,我们粗暴的贴代码。。。
/**
* 简单时间窗限流策略:
* 每X秒允许行为发生Y次
*
* @param key
* @param period
* @param maxCount
* @return
*/
public boolean isActionAllowed(String key, int period, int maxCount) {
ShardedJedis jedis = pool.getResource();
long nowTs = System.currentTimeMillis();
try {
ShardedJedisPipeline pipe = jedis.pipelined();
// 移除时间窗之前所有集合
pipe.zremrangeByScore(key, 0f, new Double(nowTs - period * 1000));
// 获取窗口内的行为数量
Response<Long> count = pipe.zcard(key);
pipe.sync();
// 是否超过限制
if (count.get() <= maxCount) {
// 记录行为
pipe.zadd(key, nowTs, "" + nowTs);
// 设置zset过期时间,避免冷用户持续占用内存,过期时间等于时间窗口,再多宽限 1s
pipe.expire(key, period + 1);
pipe.sync();
return true;
}
} catch (Exception e) {
log.error("redis timeLimit:Error", e);
} finally {
jedis.close();
}
return false;
}
这个限流策略是基于Redis实现的。主要精髓就是利用zset的score数学特性,将时间戳存于score中。处理逻辑如下
1.每次进件先移除时间窗(给定的时间周期)外的所有key
2.对比当前zset中key的总量与MaxCount确定是否放弃本次进件
3.如果同意zset中新增value
4.设置过期时间
这个实现方式在并发量不大的应用中是完全可以应付的,但是一旦并发量过大在第2步和第3步之间因为不是原子操作,所以可能出现key的总量突破最大限流数。所以一定要用在合适的场景中。为什么不用lua实现原子操作呢?因为ShardedJedis不支持。。。默默哭泣中。
漏桶限流-令牌桶限流
漏桶和令牌桶是一对孪生兄弟,它两都是流量整形,限流中的常用算法,唯一区别呢就是令牌桶是允许突发流量的而漏桶严格限制流量速率。他们的实现甚至可能一摸一样。。。
我们继续暴力贴代码。。555
/**
* 漏斗限流
*
* @author Lingyun
* @Date 2018-12-08 21:48
*/
public class FunnelRateLimiter {
private Map<String, Funnel> funnels = new HashMap<>();
public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
String key = String.format("%s:%s", userId, actionKey);
Funnel funnel = funnels.get(key);
if (funnel == null) {
funnel = new Funnel(capacity, leakingRate);
funnels.put(key, funnel);
}
return funnel.watering(1); // 需要1个quota
}
static class Funnel {
/**
* 漏斗容量
*/
int capacity;
/**
* 漏嘴流水速率
*/
float leakingRate;
/**
* 漏斗剩余空间
*/
int leftQuota;
/**
* 上一次进水时间
*/
long leakingTs;
public Funnel(int capacity, float leakingRate) {
this.capacity = capacity;
this.leakingRate = leakingRate;
this.leftQuota = capacity;
this.leakingTs = System.currentTimeMillis();
}
/**
* 空间修正
*/
void makeSpace() {
long nowTs = System.currentTimeMillis();
long deltaTs = nowTs - leakingTs;//距离上次进水时间差
int deltaQuota = (int) (deltaTs * leakingRate);//可腾出空间
if (deltaQuota < 0) { // 间隔时间过长,整数数字过大溢出
this.leftQuota = capacity;
this.leakingTs = nowTs;
return;
}
if (deltaQuota < 1) { // 可腾出空间过小,最小单位是1
return;
}
this.leftQuota += deltaQuota;
this.leakingTs = nowTs;
//判断剩余空间是否超过总容量
if (this.leftQuota > this.capacity) {
this.leftQuota = this.capacity;
}
}
boolean watering(int quota) {
makeSpace();
if (this.leftQuota >= quota) {//判断剩余空间是否足够
this.leftQuota -= quota;
return true;
}
return false;
}
}
}
以上就是漏桶限流的java实现,至于令牌桶限流代码,Google的Guava中有该算法的实现(RateLimiter)使用还是非常简单的(意思就是自行Google)。
说好的分布式呢。。。
漏桶算法的分布式实现思路
将Funnel 对象的内容按字段存储到一个 hash 结构中,进水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。
但是有个问题,我们无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里运算,再回填到 hash 结构,这三个过程无法原子化,意味着需要进行适当的加锁控制。而一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。
如果重试的话,就会导致性能下降。如果放弃的话,就会影响用户体验。同时,代码的复杂度也跟着升高很多。这真是个艰难的选择,我们该如何解决这个问题呢?救星来了!我们可以使用Redis-Cell,可惜的是Redis-Cell是一个模块,而模块特效是Redis4.0中带来的。。。而公司的Redis还是可怜的2.X版本。。。
令牌桶分布式实现
我们公司使用的接口限流策略就是令牌桶限流。实现方式是Guava的RateLimit+zookeeper,zookeeper中存储了网关中心的服务数量,各个服务均分接口流量,并对网关中心最高QPS也在每个服务器上做均分。这样的好处是限流令牌在本地保存,不通过网络传输,每台服务器绝对的均分限流,当集群中有服务下线也不会影响其他服务器的限流阈值,不会因为一台服务的下线流量集中导致其他服务的限流阈值上升,继而出现雪崩。缺点就是不灵活,在高并发时负载不均衡的情况肯定会出现,但是因为每台服务器的限流阈值是一个定值,这就导致某些压力较大的服务不能灵活的根据整个集群的限流情况,调整限流阈值,只能拒绝服务。
暴力贴代码又来了。。。
/**
* 限流校验
*
* @param key 限流缓存对象key
* @return 校验通过返回true 反之返回false
*/
private boolean checkRateLimit(String key, Integer limiterConcurrency, Integer limiterTimeOut) {
boolean valid = true;
try {
long startTime = System.currentTimeMillis();
int tmpCount = serverCount == 0 ? DEFAUL_SERVER_COUNT : serverCount;
double permitsPerSecond = BigDecimal.valueOf(limiterConcurrency).divide(BigDecimal.valueOf(tmpCount), 2, RoundingMode.FLOOR).doubleValue();
RateLimiter rateLimiter = getRateLimiter(key, permitsPerSecond);
if (limiterTimeOut == null || limiterTimeOut == 0) {
valid = rateLimiter.tryAcquire();
} else {
valid = rateLimiter.tryAcquire(limiterTimeOut, TimeUnit.MILLISECONDS);
}
long endTime = System.currentTimeMillis();
LOGGER.info(String.format("api接口限流校验,key=%s,valid=%s,serverCount=%s,耗时=%s", key, valid, tmpCount, endTime - startTime));
} catch (Throwable e) {
LOGGER.error(String.format("api接口限流校验异常,key=%s", key), e);
}
return valid;
}
/**
* 获取限流对象
*
* @param key 限流对象缓存KEY
* @param permitsPerSecond
* @return
*/
private RateLimiter getRateLimiter(String key, double permitsPerSecond) {
RateLimiter rateLimiter = rateLimiterMap.get(key);
if (rateLimiter == null) {
synchronized (this) {
rateLimiter = rateLimiterMap.get(key);
if (rateLimiter == null) {
rateLimiter = RateLimiter.create(permitsPerSecond);
rateLimiterMap.put(key, rateLimiter);
return rateLimiter;
}
}
}
double diff = permitsPerSecond - rateLimiter.getRate();
if (diff >= 0.01 || diff < 0) {
LOGGER.info(String.format("api接口限流-并发量更新,key=%s,permitsPerSecond=%s", key, permitsPerSecond));
rateLimiter.setRate(permitsPerSecond);
}
return rateLimiter;
}
又要快乐的结束了.....
写到这里时,分布式限流终于要结束了,开开心心的看了看(>﹏<)左耳朵耗子的限流设计博文,嗯,我真是渣.....