1. 基于数组实现
package top.algorithm.mall.utils;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demo of a sliding-time-window rate limiter backed by a fixed array of counters.
 *
 * <p>One second is divided into {@link #slot} buckets of {@link #interval} milliseconds.
 * Worker threads call {@link #addCount()}, which counts an admitted request in the bucket
 * that matches the time elapsed since {@link #lastTime}. A single checker thread runs
 * {@link #doCheck()}, which clears each bucket as the round enters it, sums all buckets and
 * publishes the admit/reject decision through {@code isLimit}.
 *
 * <p>The decision is published asynchronously, so a few requests may still be admitted
 * between the moment the limit is reached and the next pass of the checker.
 *
 * @author zhff
 * @version 1.0
 * @date 2020/6/14 8:31
 */
public class SlidingTimeWindowTest {
    /** Guards the read-modify-write updates of the boxed counters below. */
    private static final Object COUNTER_LOCK = new Object();

    /** Number of requests rejected so far. */
    public static volatile Integer limitCount = 0;
    /** Number of requests admitted so far. */
    public static volatile Integer passCount = 0;
    /** Number of buckets one second is split into. */
    public static Integer slot = 2;
    /** Maximum number of admitted requests summed over all buckets. */
    public static Integer limit = 50;
    /** Per-bucket counters of admitted requests. */
    public static AtomicInteger[] array = new AtomicInteger[slot];
    /** Length of one bucket in milliseconds. */
    public static int interval = 1000 / slot;
    /** Start (epoch milliseconds) of the current one-second round. */
    public static volatile Long lastTime = System.currentTimeMillis();
    /** Index of the next bucket the checker has to clear in the current round. */
    public static volatile Integer initIndex = 0;
    /** Becomes true once the first full round has started; no bucket is cleared before that. */
    public static volatile boolean initFlag = false;

    static {
        for (int i = 0; i < slot; i++) {
            array[i] = new AtomicInteger(0);
        }
    }

    /** Latest decision of the checker: true while new requests must be rejected. */
    static volatile boolean isLimit = false;

    /**
     * Starts the checker thread, submits 10000 requests (one every 3 ms) to a pool of 100
     * workers, waits for them to finish and prints the resulting statistics.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while pacing the
     *     submissions or while waiting for the pool to drain
     */
    public static void main(String[] args) throws InterruptedException {
        Thread checker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new SlidingTimeWindowTest().doCheck();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        // Daemon so that the endless checker loop cannot keep the JVM alive after main ends.
        checker.setDaemon(true);
        checker.start();

        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(10000);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100,
                10, TimeUnit.SECONDS, blockingQueue);
        int m = 10000;
        long l1 = System.currentTimeMillis();
        try {
            while (m > 0) {
                threadPoolExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            new SlidingTimeWindowTest().addCount();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
                Thread.sleep(3);
                m--;
            }
        } finally {
            threadPoolExecutor.shutdown();
        }
        // Wait for in-flight requests so the statistics below cover every submitted task.
        threadPoolExecutor.awaitTermination(1, TimeUnit.MINUTES);
        long l2 = System.currentTimeMillis();
        checker.interrupt();

        System.out.println("实际通过总数" + passCount + "====" + limitCount);
        System.out.println("预估总数" + limit * (l2 - l1) / 1000);
        System.out.println("限制" + limit);
        System.out.println("实际限流每秒" + passCount * 1000 / (l2 - l1));
    }

    /**
     * Checker loop, intended to run on a single dedicated thread.
     *
     * <p>Each pass starts a new round once a full second has elapsed, clears every bucket the
     * current round has entered, then sums all buckets and publishes whether the limit is
     * reached. The loop pauses 1 ms between passes and returns, with the interrupt flag
     * restored, as soon as the thread is interrupted.
     */
    public void doCheck() {
        while (true) {
            long now = System.currentTimeMillis();
            long elapsed = now - lastTime;
            if (elapsed >= 1000) {
                // A new round begins: bucket 0 is entered right away.
                lastTime = now;
                initFlag = true;
                array[0].set(0);
                initIndex = 1;
            } else if (initFlag) {
                // Clear each bucket as soon as the round reaches it (>=, not >), so stale
                // counts from the previous round never share a bucket with fresh ones.
                int next = initIndex;
                while (next < slot && elapsed / interval >= next) {
                    array[next].set(0);
                    next++;
                }
                initIndex = next;
            }

            int admitted = 0;
            for (AtomicInteger bucket : array) {
                admitted += bucket.get();
            }
            isLimit = limit <= admitted;

            try {
                // Short pause instead of a busy spin that would occupy a whole core.
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Request entry point: rejects the request while the checker reports the limit as
     * reached, otherwise counts it in the current bucket and simulates 50 ms of work.
     *
     * @throws InterruptedException if the thread is interrupted during the simulated work
     */
    public void addCount() throws InterruptedException {
        if (isLimit) {
            System.out.println("限流了");
            synchronized (COUNTER_LOCK) {
                limitCount++;
            }
            return;
        }
        System.out.println("正在执行");
        long elapsed = System.currentTimeMillis() - lastTime;
        // Bucket = number of whole intervals elapsed in this round, folded into [0, slot).
        // elapsed can be negative when the checker refreshes lastTime concurrently.
        int index = (int) ((elapsed / interval) % slot);
        if (index < 0) {
            index += slot;
        }
        array[index].addAndGet(1);
        synchronized (COUNTER_LOCK) {
            passCount++;
        }
        // Simulated cost of handling one request.
        Thread.sleep(50);
    }
}
2. 基于链表实现
package top.algorithm.mall.utils;
import java.util.LinkedList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demo of a sliding-time-window rate limiter backed by a linked list of counters.
 *
 * <p>The list always holds {@link #slot} buckets of {@link #interval} milliseconds each; the
 * last element is the bucket currently being filled. Worker threads call
 * {@link #addCount()}, which counts an admitted request in the last bucket. A single checker
 * thread runs {@link #doCheck()}, which drops the oldest bucket and appends a fresh one every
 * {@code interval} milliseconds, sums all buckets and publishes the admit/reject decision
 * through {@code isLimit}.
 *
 * <p>{@link LinkedList} is not thread-safe, so every access to the bucket list made by this
 * class happens while holding one private lock.
 *
 * @author zhff
 * @version 1.0
 * @date 2020/6/14 15:59
 */
public class SlidingTimeWindowLink {
    /** Guards the bucket list and the boxed counters. */
    private static final Object LOCK = new Object();

    /** Number of requests rejected so far. */
    public static volatile Integer limitCount = 0;
    /** Number of requests admitted so far. */
    public static volatile Integer passCount = 0;
    /** Number of buckets one second is split into. */
    public static Integer slot = 2;
    /** Maximum number of admitted requests summed over all buckets. */
    public static Integer limit = 20;
    /** Buckets ordered oldest first; the last element is the one currently being filled. */
    public static LinkedList<AtomicInteger> linkedList = new LinkedList<>();
    /** Length of one bucket in milliseconds. */
    public static int interval = 1000 / slot;
    /** Time (epoch milliseconds) at which the newest bucket was opened. */
    public static volatile Long lastTime = System.currentTimeMillis();
    /** Not used by this class; kept for compatibility with existing references. */
    public static Long lastIntervalTime = System.currentTimeMillis();
    /** Not used by this class; kept for compatibility with existing references. */
    public static Integer initIndex = 0;
    /** Not used by this class; kept for compatibility with existing references. */
    public static boolean initFlag = false;

    static {
        for (int i = 0; i < slot; i++) {
            linkedList.add(new AtomicInteger(0));
        }
    }

    /** Latest decision of the checker: true while new requests must be rejected. */
    static volatile boolean isLimit = false;

    /**
     * Starts the checker thread, submits 10000 requests (one every 3 ms) to a thread pool,
     * waits for them to finish and prints the elapsed time in milliseconds.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while pacing the
     *     submissions or while waiting for the pool to drain
     */
    public static void main(String[] args) throws InterruptedException {
        Thread checker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    new SlidingTimeWindowLink().doCheck();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        // Daemon so that the endless checker loop cannot keep the JVM alive after main ends.
        checker.setDaemon(true);
        checker.start();

        BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(10000);
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(30, 50,
                10, TimeUnit.SECONDS, blockingQueue);
        int m = 10000;
        long l1 = System.currentTimeMillis();
        try {
            while (m > 0) {
                threadPoolExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            new SlidingTimeWindowLink().addCount();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
                Thread.sleep(3);
                m--;
            }
        } finally {
            threadPoolExecutor.shutdown();
        }
        // Wait for in-flight requests so the measured time covers every submitted task.
        threadPoolExecutor.awaitTermination(1, TimeUnit.MINUTES);
        long l2 = System.currentTimeMillis();
        checker.interrupt();
        System.out.println((l2 - l1) + "===========");
    }

    /**
     * Checker loop, intended to run on a single dedicated thread.
     *
     * <p>Each pass rotates the bucket list once {@code interval} milliseconds have elapsed
     * since the newest bucket was opened, then sums all buckets and publishes whether the
     * limit is reached. The loop pauses 1 ms between passes and returns, with the interrupt
     * flag restored, as soon as the thread is interrupted.
     */
    public void doCheck() {
        while (true) {
            long now = System.currentTimeMillis();
            int admitted = 0;
            synchronized (LOCK) {
                // Slide by one bucket per elapsed interval (not a hard-coded 100 ms), so the
                // list as a whole keeps covering slot * interval milliseconds.
                if (now - lastTime >= interval) {
                    lastTime = now;
                    linkedList.removeFirst();
                    linkedList.addLast(new AtomicInteger());
                }
                for (AtomicInteger bucket : linkedList) {
                    admitted += bucket.get();
                }
            }
            isLimit = limit <= admitted;

            try {
                // Short pause instead of a busy spin that would occupy a whole core.
                TimeUnit.MILLISECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Request entry point: rejects the request while the checker reports the limit as
     * reached, otherwise counts it in the newest bucket and simulates 40 ms of work. Prints
     * the running admitted/rejected totals afterwards.
     *
     * @throws InterruptedException if the thread is interrupted during the simulated work
     */
    public void addCount() throws InterruptedException {
        if (isLimit) {
            System.out.println("限流了");
            synchronized (LOCK) {
                limitCount++;
            }
        } else {
            System.out.println("正在执行");
            synchronized (LOCK) {
                // The newest bucket is always the last element; the checker rotates the list.
                linkedList.getLast().incrementAndGet();
                passCount++;
            }
            // Simulated cost of handling one request.
            Thread.sleep(40);
        }
        System.out.println(passCount + "===" + limitCount);
    }
}