序
双向队列(Deque),是Queue的一个子接口,双向队列是指该队列两端的元素既能入队(offer)也能出队(poll)。使用场景比如工作窃取,比如限流。
限流实例
使用deque来限流,其中timeIntervalInMs为事件窗口,maxLimit为该事件窗口的最大值。
public class MyRateLimiter {
private static final Logger LOGGER = LoggerFactory.getLogger(DemoRateLimiter.class);
private final Deque<Long> queue;
private long timeIntervalInMs;
public MyRateLimiter(long timeIntervalInMs, int maxLimit) {
this.timeIntervalInMs = timeIntervalInMs;
this.queue = new LinkedBlockingDeque<Long>(maxLimit);
}
public boolean incrAndReachLimit(){
long currentTimeMillis = System.currentTimeMillis();
boolean success = queue.offerFirst(currentTimeMillis);
if(success){
//没有超过maxLimit
return false;
}
synchronized (this){
//queue is full
long last = queue.getLast();
//还在时间窗口内,超过maxLimit
if (currentTimeMillis - last < timeIntervalInMs) {
return true;
}
LOGGER.info("time window expired,current:{},last:{}",currentTimeMillis,last);
//超过时间窗口了,超过maxLimit的情况下,重置时间窗口
queue.removeLast();
queue.addFirst(currentTimeMillis);
return false;
}
}
}
测试
@Test
public void testDeque() throws InterruptedException {
DemoRateLimiter limiter = new DemoRateLimiter(5*1000,3);
Callable<Void> test = new Callable<Void>(){
@Override
public Void call() throws Exception {
for(int i=0;i<1000;i++){
LOGGER.info("result:{}",limiter.incrAndReachLimit());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return null;
}
};
ExecutorService pool = Executors.newFixedThreadPool(10);
pool.invokeAll(Arrays.asList(test,test,test,test,test));
Thread.sleep(100000);
}
小结
这里使用了Deque的容量来作为时间窗口的限流大小,利用两端来判断时间窗口,相对来讲有点巧妙。