描述
限流是指将处理请求数限定在单位时间的阀值内。常用的限流算法固定时间窗口限流算法和滑动时间窗口限流算法。固定时间窗口限流算法实现有计数器、漏桶、令牌桶。
比较
计数器是在缓存中增加计数器,每个请求加1,如果计数器大于阀值则拒绝请求。比如API元数据100每秒100次请求。第一个时间窗口的最后10ms有90次,第二个时间窗口的最前10ms有90次。那这20ms中的请求数已经超过100次,而这些请求限流失败。这种现象称为突刺现象。
漏桶是创建请求阀值大小的容器,请求过来后都放入容器中。如果容器满了则拒绝请求。另一端匀速的从容器中取出请求并处理。如果请求数波动大时。第一个时间窗口中大量请求过来,出现大量拒绝请求,而第二个时间窗口少量请求,就会对资源利用率少。
令牌桶创建一个盛放令牌的容器,以固定速率生成令牌。请求过来后去容器中取出令牌。如果令牌没有则请求拒绝。这个方法比较常用,可以解决流量波动情况。
简单实现
计数器算法实现简单分布式流控。使用redis做全局流控,jvm缓存做流控计数器。其中redis工具类使用Jedis。
思路:流控阀值初始化加载到jvm缓存,jvm进程中判断是否达到流控阀值,没有就将请求数加一。异步从redis中减去jvm缓存中请求数,这时请求数不清零。jvm缓存异步更新时将请求数置为零。
缺陷:
1.异步更新全局流控,不能精准控制流控。
2.时间窗口中无法避免突刺现象。
public class FlowControl {
private static final ExecutorService refreshPool = Executors.newFixedThreadPool(1);
private static final ScheduledExecutorService initWriteBackPool = Executors.newScheduledThreadPool(1);
private static final ScheduledExecutorService initPool = Executors.newScheduledThreadPool(1);
private static final LoadingCache<String, Pair<LongAdder, LongAdder>> cache = CacheBuilder.newBuilder()
.refreshAfterWrite(200, TimeUnit.MILLISECONDS)
.expireAfterAccess(10, TimeUnit.MINUTES)
.build(new CacheLoader<String, Pair<LongAdder, LongAdder>>() {
@Override
public Pair<LongAdder, LongAdder> load(String key) throws Exception {
String value = RedisCli.command(jedis -> jedis.get(key));
LongAdder threshold = new LongAdder();
threshold.add(Long.parseLong(value));
return new Pair<>(threshold, new LongAdder());
}
@Override
public ListenableFuture<Pair<LongAdder, LongAdder>> reload(String key, Pair<LongAdder, LongAdder> oldValue) throws Exception {
//return super.reload(key, oldValue);
System.out.println(key);
ListenableFutureTask<Pair<LongAdder, LongAdder>> task = ListenableFutureTask.create(() -> load(key));
refreshPool.execute(task);
return task;
}
});
static {
initWriteBackPool.scheduleWithFixedDelay(() -> {
System.out.println("write back");
Jedis jedis = RedisCli.getJedis();
Pipeline pipelined = jedis.pipelined();
try {
ConcurrentMap<String, Pair<LongAdder, LongAdder>> map = cache.asMap();
Iterator<String> iterator = map.keySet().iterator();
while (iterator.hasNext()) {
String next = iterator.next();
Pair<LongAdder, LongAdder> pair = cache.get(next);
if (pair.getValue().longValue() > 0) {
pipelined.decrBy(next, pair.getValue().longValue());
}
}
pipelined.close();
RedisCli.returnJedis(jedis);
} catch (ExecutionException e) {
e.printStackTrace();
}
}, -1, 100, TimeUnit.MILLISECONDS);
initPool.scheduleAtFixedRate(() -> {
RedisCli.command(jedis -> {
return jedis.set("sayHello", "50");
});
}, -1, 1, TimeUnit.SECONDS);
}
public static void flowControl(String name, int i) {
try {
Pair<LongAdder, LongAdder> pair = cache.get(name);
if (pair != null) {
LongAdder threshold = pair.getKey();
LongAdder flowTime = pair.getValue();
if (threshold.longValue() > flowTime.longValue()) {
flowTime.increment();
System.out.println(Thread.currentThread().getName() + "_流量控制_" + "request_" + cache.get(name).getValue());
}
}
} catch (ExecutionException e) {
e.printStackTrace();
}
}
测试代码
public static void main(String[] args) {
String name = "sayHello";
ScheduledExecutorService testPool1 = Executors.newScheduledThreadPool(10);
testPool1.scheduleAtFixedRate(() -> {
for (int i = 0; i < 100; i++) {
final int a = i;
FlowControl.flowControl(name, a);
}
}, -1, 1, TimeUnit.SECONDS);
ScheduledExecutorService testPool2 = Executors.newScheduledThreadPool(10);
testPool2.scheduleAtFixedRate(() -> {
for (int i = 0; i < 100; i++) {
final int a = i;
FlowControl.flowControl(name, a);
}
}, -1, 1, TimeUnit.SECONDS);
}
引用
https://www.iteye.com/blog/jinnianshilongnian-2305117
https://tech.youzan.com/api-gateway-in-practice/
浅析如何设计一个亿级网关
https://www.jianshu.com/p/76cc8ba5ca91
https://wetest.qq.com/lab/view/320.html?from=content_qcloud
https://github.com/abbshr/abbshr.github.io/issues/52
https://www.changping.me/2019/03/30/distributed-servicegovernance-flowcontrol-2/