一:目标:实现任意滑动计数器的抽象,保证高性能
此抽象可以提供给限流 熔断 防刷等统计模块使用,进行任意时间的统计与计数,高性能,线程安全是基本要求
二:算法如下
- 数组模拟时间轮
- 当前使用的数据为当前时间对数组大小进行取模确定.
累加逻辑为:当前数组元素往前n个大小进行累加 - 清理逻辑分两种实现
3.1 数组大小=滑动窗口大小+未来n个时间单位的大小,定时任务提前清理未来n个时间单位大小
3.2 使用时清理,不使用buffer.需要一个纪录最后一次使用位置的指针,当使用时清理最后一次位置到当前位置的数据包括当前位置,考虑并发调用的 情况,此清理过程只能一个线程进行清理,
清理后把最后一次的指针指向当前位置,释放锁,其他线程才能进入再次执行次逻辑,通常其他线程进行后发现当前指针跟自己持有的指针相同, 不清理跳过。统计逻辑也类似
3.3 3.1逻辑较为简单,3.2比较复杂,但是少了提前预留的n个大小
可以用次抽象实现任意单位时间的n秒滑动计数器
三:实现
提前清理
public class TimeRollingScheduleCleanCounter {
private static final ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
private static final ConcurrentLinkedQueue<TimeRollingScheduleCleanCounter> queue = new ConcurrentLinkedQueue<>();
private static final int nextBufferSize=10;
public static void register(TimeRollingScheduleCleanCounter timeRollingCounterScheduleClean) {
queue.add(timeRollingCounterScheduleClean);
}
static {
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (TimeRollingScheduleCleanCounter clean : queue) {
int index = currentIndex(clean.getBucketsSize());
for(int i=0;i<nextBufferSize;i++){
clean.buckets[(index+i+clean.getBucketsSize())%clean.getBucketsSize()].set(0);
}
}
}
}, 1, 1, TimeUnit.SECONDS);
}
/**
* 滑动计数器对应的bucket
*/
private AtomicLong[] buckets;
private int bucketsSize;
public TimeRollingScheduleCleanCounter(int size) {
bucketsSize=size+nextBufferSize;
buckets = new AtomicLong[bucketsSize];
for (int i = 0; i < buckets.length; i++) {
buckets[i] = new AtomicLong(0);
}
register(this);
}
public void add(Long count) {
buckets[currentIndex(bucketsSize)].getAndAdd(count);
}
public long accumulative(int n) {
if (n > bucketsSize-nextBufferSize) {
throw new IllegalArgumentException("params is too large");
}
int start = currentIndex(bucketsSize);
long total = 0;
for (int i = 0; i < n; i++) {
total += buckets[(start - i+bucketsSize) % bucketsSize].get();
}
return total;
}
public int getBucketsSize() {
return bucketsSize;
}
private static int currentIndex(int bucketsSize) {
long currentTime = System.currentTimeMillis() / 1000;
return (int) (currentTime % bucketsSize);
}
public long increment() {
return buckets[currentIndex(bucketsSize)].incrementAndGet();
}
}
使用时清理
public class TimeRollingCounter {
private static Logger logger = LoggerFactory.getLogger(TimeRollingCounter.class);
/**
* 滑动计数器对应的bucket
*/
private AtomicInteger[] buckets;
//private LongAdder[] longAdders;
/**
* bucket对应的时间
*/
private AtomicLong currentBucketTime;
/**
* 桶的默认大小
*/
private int bucketSize =1;
private AtomicBoolean windowRolling = new AtomicBoolean(false);
private static final long MILLIS_IN_ONE_SECOND=1000L;//1秒等于1000毫秒
/**
* 给当前时间的计数器增加1
* 如果当前时间和桶的当前时间差大于BUCKET_SIZE,重置所有的bucket
* @return
*/
public int increment(){
final long targetBucketTime = getNowSecond();//秒转换成毫秒
final int targetBucketIndex = this.calIndex(targetBucketTime);
//窗口滑动,重置计数器
this.windowRolling(targetBucketTime,this.currentBucketTime.get());
//增加当前计数
buckets[targetBucketIndex].incrementAndGet();
//返回当前计数值
return buckets[targetBucketIndex].intValue();
}
/**
* 获取之前N秒的计数之和
* @param n
* @return
*/
public int accumulative(int n){
if(n> bucketSize){
n= bucketSize;
}
final long targetBucketTime = getNowSecond();
final int targetBucketIndex = this.calIndex(targetBucketTime);
//窗口滑动,重置计数器
this.windowRolling(targetBucketTime,this.currentBucketTime.get());
AtomicInteger beforeSum = new AtomicInteger(0);
for(int i=0;i<n;i++){
beforeSum.addAndGet(buckets[(targetBucketIndex+ bucketSize -i)% bucketSize].intValue());
}
return beforeSum.intValue();
}
/**
* 初始化一个当前时间的全0计数器,并可以设置bucket大小
*/
public TimeRollingCounter(int size, TimeUnit timeUnit){
final long currentTime = getNowSecond();
bucketSize =size;
buckets=new AtomicInteger[bucketSize];
for(int i = 0; i< bucketSize; i++){
buckets[i]=new AtomicInteger(0);
}
currentBucketTime=new AtomicLong(currentTime);
}
/**
* 重置计数器
*/
public void reset(){
final long currentTime = getNowSecond();;
for(AtomicInteger counter:buckets){
counter.set(0);
}
currentBucketTime.set(currentTime);
}
/**
* 窗口滑动,使用自旋滑动
* @param targetBucketTime
* @param currentBucketTime
*/
private void windowRolling(final long targetBucketTime,Long currentBucketTime){
if(targetBucketTime > currentBucketTime){
while(!windowRolling.compareAndSet(false,true)){
}
try {
//重新获取值
currentBucketTime = this.currentBucketTime.get();
//计数器已经被其它线程滑动了,不再滑动,防止误操作数据
if (currentBucketTime >= targetBucketTime) {
return ;
}
final int sep = this.calSep(currentBucketTime, targetBucketTime);
final int currentIndex = this.calIndex(currentBucketTime);
//重置计数器
this.resetBucket(currentIndex, sep);
//重新设置当前时间
this.currentBucketTime.set(targetBucketTime);
} catch(Exception e){
logger.error("Window rolling error",e);
}finally {
windowRolling.set(false);
}
}
}
/**
* 时间差大于BUCKET_SIZE,重置bucket
* 否则重置两个窗口区间的脏数据,注意这是一个循环数组
* @param currentBucketIndex
* @param sep
*/
private void resetBucket(final int currentBucketIndex,int sep){
//时间差大于buck_size,重置所有的bucket
if(sep>= bucketSize){
for(AtomicInteger counter:buckets){
counter.set(0);
}
}else {
//循环队列
if(sep<0){
sep= bucketSize +sep;
}
//时间差在1到sep+1秒之间,重置之间的区间,包括新使用的本身
for(int i=1;i<sep+1;i++){
buckets[(currentBucketIndex+i)% bucketSize].set(0);
}
}
}
/**
* 计算指定时间对应的bucket索引
* @param currentTime
* @return
*/
private int calIndex(Long currentTime){
int currIndex = Long.valueOf(currentTime% bucketSize).intValue();
return currIndex;
}
/**
* 计算两个指定时间差,以秒为单位,可能为负数
* @param beforeTime
* @param currentTime
* @return
*/
private int calSep(Long beforeTime,Long currentTime){
return Long.valueOf(currentTime-beforeTime).intValue();
}
public int getBucketSize() {
return bucketSize;
}
/**
* 获取当前的秒数
* @return
*/
public static long getNowSecond(){
return System.currentTimeMillis()/MILLIS_IN_ONE_SECOND;
}
}
三:选择哪种
1.理解成本
代码更多的时间是在做维护,上面复杂的写法很多时候在做解释,review更是耗费巨大时间,修改一行代码也设计成本很高,以前的实现也有bug,参考我之前的分析
[janus本地防刷代码问题分析]
2.gc成本
创建是注册到队列上,无法回收掉,因为无法判断何时移除。当然可以换一种实现,提供一个可以拿到当前使用所有计数器的接口。
我们目前场景用完后都会放缓存队列,暂时不存在该问题
3.清理损耗
定时器一秒清理未来buffer 数量,但我们场景存在百万级别的数量,是否能快速清理掉,需要测试1s的清理速度,理论上两次清理完成间隔相差十秒就满足条件
根据我们场景,大概会有将近百万的计数器(统计ip session的防刷)本地测试能在1s内清理掉,但cpu却一致高负载,此方案在百万级别的情况下会损耗不少cpu资源
数量增大到5百万,有其他使用cpu时,清理时间不稳定,但在预期内可以清理掉。
测试过程遇到卡住,为old区满掉,频繁gc但无空间所致。
综上,3的cpu消耗不能容忍,只能选择较为复杂的方案实现