1. 先谈谈Nginx分流
对,要考虑限流先得假设访问量达到了一定得程度,再高并发得前提下,请求过多很有可能导致某天服务器承受不了导致死机。
在这个前提下,我相信你最先想到的一定是nginx,使用nginx分流让所有的请求不要直接到达某一个服务器,当并发量继续上升的时候,我提供更多的服务器视乎就能解决问题了,这想法看上去很对,而且很多行业就是这么做的,比如下面的nginx配置文件,很简单,我相信你一看就能懂
#user nobody;
worker_processes 1;
events {
worker_connections 1024;
}
http {
upstream enjoy{
server 127.0.0.1:8080;
server 127.0.0.1:8081;
}
server {
listen 80;
location / {
proxy_pass http://enjoy;
}
}
}
在上面案例中我配置了个代理,当高并发的请求过来的时候,会把请求分发给8080与8081两个端口的服务器。
但现在我们假设一个极端的情况,这个时候由于业务有个秒杀要求,请求过大一下就让这两个服务器爆了,这个时候在我继续增加服务器之前这整个秒杀业务几乎都处在瘫痪状态,而这突然的访问量是你始料未及的,也就是说你根本就没法事先准备好足够的服务器来解决这种情况。
这个时候我相信你已经看出了如果仅仅做分流依然会出现很严重的问题,怎么办呢?这个时候你就需要限流了。
2. 再说下限流
限流依然是再高并发的前提下,如果某个服务器承受不了数目过多的请求量的一种限制机制,不同的是分流是让请求分发的其他服务器,而限流是达到某个阈值后直接不让你访问了
如果想完成限流的功能其实是有一些解决方案的(算法),比如来说,基于令牌桶的,程序计数器以及漏桶算法;
今天挑一个最简单的计数器方式来讲讲限流
3. 解决方案
计数器的解决方式是最简单最容易实现的一种解决方案,假设有一个接口,要求1分钟的访问量不能超过10次
这样当有任何请求过来,我可以让计数器+1;如果这个计数器的值大于10,而且和第一次的请求相比,时间间隔在1分钟以内,那么久能说明该请求访问过多。
如果这个请求与第一次请求的访问时间之间的间隔超过了1分钟,那么该计数器的值久还是限流范围之内,接下来久只要重置计数器就好;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class EnjoyCountLimit {
private int limtCount = 60;// 限制最大访问的容量
AtomicInteger atomicInteger = new AtomicInteger(0); // 每秒钟 实际请求的数量
private long start = System.currentTimeMillis();// 获取当前系统时间
private int interval = 60*1000;// 间隔时间60秒
public boolean acquire() {
long newTime = System.currentTimeMillis();
if (newTime > (start + interval)) {
// 判断是否是一个周期
start = newTime;
atomicInteger.set(0); // 清理为0
return true;
}
atomicInteger.incrementAndGet();// i++;
return atomicInteger.get() <= limtCount;
}
static EnjoyCountLimit limitService = new EnjoyCountLimit();
public static void main(String[] args) {
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
for (int i = 1; i < 100; i++) {
final int tempI = i;
newCachedThreadPool.execute(new Runnable() {
public void run() {
if (limitService.acquire()) {
System.out.println("你没有被限流,可以正常访问逻辑 i:" + tempI);
} else {
System.out.println("你已经被限流呢 i:" + tempI);
}
}
});
}
}
}
这个计数器的限流方式很简单吧,但这样问题吗?好好想想……
还是以60运行访问10次请求为例,在第一次0-58秒之内,没有访问请求,在59秒之内突然来了10次请求,这个时候会做什么,由于已经到了1分钟计数器会重置。
这个时候第二次的1秒内(1分0秒)又了10请求,这个时候是不是就在2秒之内有20个请求被放行了呢?(59秒,1分0秒),如果某个服务器的访问量只能是10次请求,那这种限流方式已经导致服务器挂了;
4. 滑动窗口计数器
前面已经知道简单的计数器的实现方式,也知道他会出现的一些问题,虽然这些问题举得有些极端,但还是有更好得解决方案,这方案就是使用滑动窗口计数器
滑动窗口计数器得原理是在没错请求过来得时候,先判断前面N个单位内得总访问量是否操过得阈值,并且在当前得时间单位得请求数上+1
举例来说,要求1分钟的访问量不能超过10次
可以把1分钟看成是6个10秒钟的时间,0-9秒的访问数记录到第一个格子,10-19秒的访问数记录数记录到第二个格子以此内推,每次统计将6个格子里面的数据求和,如果超过了10次就不允许访问。
import java.util.concurrent.atomic.AtomicInteger;
public class EnjoySlidingWindow {
private AtomicInteger[] timeSlices;
/* 队列的总长度 */
private final int timeSliceSize;
/* 每个时间片的时长 */
private final long timeMillisPerSlice;
/* 窗口长度 */
private final int windowSize;
/* 当前所使用的时间片位置 */
private AtomicInteger cursor = new AtomicInteger(0);
public static enum Time {
MILLISECONDS(1),
SECONDS(1000),
MINUTES(SECONDS.getMillis() * 60),
HOURS(MINUTES.getMillis() * 60),
DAYS(HOURS.getMillis() * 24),
WEEKS(DAYS.getMillis() * 7);
private long millis;
Time(long millis) {
this.millis = millis;
}
public long getMillis() {
return millis;
}
}
public EnjoySlidingWindow(int windowSize, Time timeSlice) {
this.timeMillisPerSlice = timeSlice.millis;
this.windowSize = windowSize;
// 保证存储在至少两个window
this.timeSliceSize = windowSize * 2 + 1;
init();
}
/**
* 初始化
*/
private void init() {
AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
for (int i = 0; i < timeSliceSize; i++) {
localTimeSlices[i] = new AtomicInteger(0);
}
timeSlices = localTimeSlices;
}
private int locationIndex() {
long time = System.currentTimeMillis();
return (int) ((time / timeMillisPerSlice) % timeSliceSize);
}
/**
* <p>对时间片计数+1,并返回窗口中所有的计数总和
* <p>该方法只要调用就一定会对某个时间片进行+1
* @return
*/
public int incrementAndSum() {
int index = locationIndex();
int sum = 0;
// cursor等于index,返回true
// cursor不等于index,返回false,并会将cursor设置为index
int oldCursor = cursor.getAndSet(index);
if (oldCursor == index) {
// 在当前时间片里继续+1
sum += timeSlices[index].incrementAndGet();
} else {
//轮到新的时间片,置0,可能有其它线程也置了该值,容许
timeSlices[index].set(0);
// 清零,访问量不大时会有时间片跳跃的情况
clearBetween(oldCursor, index);
sum += timeSlices[index].incrementAndGet();
}
for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
return sum;
}
/**
* 判断是否允许进行访问,未超过阈值的话才会对某个时间片+1
* @param threshold
* @return
*/
public boolean allow(int threshold) {
int index = locationIndex();
int sum = 0;
int oldCursor = cursor.getAndSet(index);
if (oldCursor != index) {
timeSlices[index].set(0);
clearBetween(oldCursor, index);
}
for (int i = 0; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
// 阈值判断
if (sum < threshold) {
// 未超过阈值才+1
timeSlices[index].incrementAndGet();
return true;
}
return false;
}
/**
* <p>将fromIndex~toIndex之间的时间片计数都清零
* <p>极端情况下,当循环队列已经走了超过1个timeSliceSize以上,这里的清零并不能如期望的进行
* @param fromIndex 不包含
* @param toIndex 不包含
*/
private void clearBetween(int fromIndex, int toIndex) {
for (int index = (fromIndex + 1) % timeSliceSize; index != toIndex; index = (index + 1) % timeSliceSize) {
timeSlices[index].set(0);
}
}
public static void main(String[] args) {
EnjoySlidingWindow window = new EnjoySlidingWindow(5, Time.MILLISECONDS);
for (int i = 0; i < 10; i++) {
System.out.println(window.allow(7));
}
}
}