漏斗限流是最常用的限流方法之一
漏斗的剩余空间就代表着当前行为可以持续进行的数量,漏嘴的流水速率代表着系统允许该行为的最大频率。
public class FunnelRateLimiter {
static class Funnel {
//漏斗容量
int capacity;
//流出速率
float leakingRate;
//漏斗剩余空间
int leftQuota;
//上一次漏水时间
long leakingTs;
public Funnel(int capacity, float leakingRate) {
this.capacity = capacity;
this.leakingRate = leakingRate;
this.leftQuota = capacity;
this.leakingTs = System.currentTimeMillis();
}
/**
*核心方法,
*/
void makeSpace() {
long nowTs = System.currentTimeMillis();
//距离上一次漏水过去了多久
long deltaTs = nowTs - leakingTs;
//腾出的空间
int deltaQuota = (int) (deltaTs * leakingRate);
if (deltaQuota < 0) { // 腾的空间过小等待下一次
this.leftQuota = capacity;
this.leakingTs = nowTs;
return;
}
if (deltaQuota < 1) { // 腾出空间太小,最小单位是 1
return;
}
//增加剩余空间
this.leftQuota += deltaQuota;
//记录漏水时间
this.leakingTs = nowTs;
//剩余空间不得高于容量
if (this.leftQuota > this.capacity) {
this.leftQuota = this.capacity;
}
}
boolean watering(int quota) {
makeSpace();
//判断剩余空间是否足够
if (this.leftQuota >= quota) {
this.leftQuota -= quota;
return true;
}
return false;
}
}
private Map<String, Funnel> funnels = new HashMap<>();
public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
String key = String.format("%s:%s", userId, actionKey);
Funnel funnel = funnels.get(key);
if (funnel == null) {
funnel = new Funnel(capacity, leakingRate);
funnels.put(key, funnel);
}
return funnel.watering(1); // 需要 1 个 quota
}
}
分布式限流实现
Redis 4.0 提供了一个限流 Redis 模块,它叫 redis-cell。该模块也使用了漏斗算法,并提供了原子的限流指令。
该模块只有 1 条指令 cl.throttle。
是允许「用户老钱回复行为」的频率为每 60s 最多 30 次(漏水速率),漏斗的初始容量为 15,也就是说一开始可以连续回复 15 个帖子,然后才开始受漏水速率的影响。
> cl.throttle laoqian:reply 15 30 60
1) (integer) 0 # 0 表示允许,1 表示拒绝
2) (integer) 15 # 漏斗容量 capacity
3) (integer) 14 # 漏斗剩余空间 left_quota
4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)
在执行限流指令时,如果被拒绝了,就需要丢弃或重试。直接取返回结果数组的第四个值进行 sleep 即可,如果不想阻塞线程,也可以异步定时任务来重试。