滑动计数器实现算法对比

一:目标:实现任意滑动计数器的抽象,保证高性能

此抽象可以提供给限流 熔断 防刷等统计模块使用,进行任意时间的统计与计数,高性能,线程安全是基本要求

二:算法如下

  1. 数组模拟时间轮
  2. 当前使用的数据为当前时间对数组大小进行取模确定.
    累加逻辑为:当前数组元素往前n个大小进行累加
  3. 清理逻辑分两种实现
    3.1 数组大小=滑动窗口大小+未来n个时间单位的大小,定时任务提前清理未来n个时间单位大小
    3.2 使用时清理,不使用buffer.需要一个纪录最后一次使用位置的指针,当使用时清理最后一次位置到当前位置的数据包括当前位置,考虑并发调用的 情况,此清理过程只能一个线程进行清理,
    清理后把最后一次的指针指向当前位置,释放锁,其他线程才能进入再次执行次逻辑,通常其他线程进行后发现当前指针跟自己持有的指针相同, 不清理跳过。统计逻辑也类似
    3.3 3.1逻辑较为简单,3.2比较复杂,但是少了提前预留的n个大小
    可以用次抽象实现任意单位时间的n秒滑动计数器

三:实现

提前清理

public class TimeRollingScheduleCleanCounter {


  private static final ScheduledExecutorService service = Executors.newScheduledThreadPool(1);


  private static final ConcurrentLinkedQueue<TimeRollingScheduleCleanCounter> queue = new ConcurrentLinkedQueue<>();

  private static final int nextBufferSize=10;


  public static void register(TimeRollingScheduleCleanCounter timeRollingCounterScheduleClean) {
      queue.add(timeRollingCounterScheduleClean);
  }

  static {
      service.scheduleAtFixedRate(new Runnable() {
          @Override
          public void run() {

              for (TimeRollingScheduleCleanCounter clean : queue) {
                  int index = currentIndex(clean.getBucketsSize());
                  for(int i=0;i<nextBufferSize;i++){
                      clean.buckets[(index+i+clean.getBucketsSize())%clean.getBucketsSize()].set(0);
                  }
              }
          }
      }, 1, 1, TimeUnit.SECONDS);
  }


  /**
   * 滑动计数器对应的bucket
   */
  private AtomicLong[] buckets;

  private int bucketsSize;

  public TimeRollingScheduleCleanCounter(int size) {
      bucketsSize=size+nextBufferSize;
      buckets = new AtomicLong[bucketsSize];
      for (int i = 0; i < buckets.length; i++) {
          buckets[i] = new AtomicLong(0);
      }
      register(this);
  }

  public void add(Long count) {
      buckets[currentIndex(bucketsSize)].getAndAdd(count);
  }


  public long accumulative(int n) {
      if (n > bucketsSize-nextBufferSize) {
          throw new IllegalArgumentException("params is too large");
      }
      int start = currentIndex(bucketsSize);
      long total = 0;
      for (int i = 0; i < n; i++) {
          total += buckets[(start - i+bucketsSize) % bucketsSize].get();
      }
      return total;
  }

  public int getBucketsSize() {
      return bucketsSize;
  }

  private static int currentIndex(int bucketsSize) {
      long currentTime = System.currentTimeMillis() / 1000;
      return (int) (currentTime % bucketsSize);
  }

  public long increment() {
      return buckets[currentIndex(bucketsSize)].incrementAndGet();
  }
  }

使用时清理

public class TimeRollingCounter {

  private static Logger logger = LoggerFactory.getLogger(TimeRollingCounter.class);
   
 /**
  * 滑动计数器对应的bucket
  */
 private AtomicInteger[] buckets;

 //private LongAdder[] longAdders;

 /**
  * bucket对应的时间
  */
 private AtomicLong currentBucketTime;

 /**
  * 桶的默认大小
  */
 private  int bucketSize =1;

 private AtomicBoolean windowRolling = new AtomicBoolean(false);

 private static final long MILLIS_IN_ONE_SECOND=1000L;//1秒等于1000毫秒
  
 /**
  * 给当前时间的计数器增加1
  * 如果当前时间和桶的当前时间差大于BUCKET_SIZE,重置所有的bucket
  * @return
  */
 public int increment(){
    final long targetBucketTime = getNowSecond();//秒转换成毫秒
    final int targetBucketIndex = this.calIndex(targetBucketTime);
      //窗口滑动,重置计数器
      this.windowRolling(targetBucketTime,this.currentBucketTime.get());
    //增加当前计数
    buckets[targetBucketIndex].incrementAndGet();
    //返回当前计数值
    return buckets[targetBucketIndex].intValue();
 }

 /**
  * 获取之前N秒的计数之和
  * @param n
  * @return
  */
 public int accumulative(int n){
    if(n> bucketSize){
       n= bucketSize;
    }
    final long targetBucketTime = getNowSecond();
    final int targetBucketIndex = this.calIndex(targetBucketTime);
      //窗口滑动,重置计数器
      this.windowRolling(targetBucketTime,this.currentBucketTime.get());
    AtomicInteger beforeSum = new AtomicInteger(0);
    for(int i=0;i<n;i++){
       beforeSum.addAndGet(buckets[(targetBucketIndex+ bucketSize -i)% bucketSize].intValue());
    }
    return beforeSum.intValue();
 }


 /**
  * 初始化一个当前时间的全0计数器,并可以设置bucket大小
  */
 public TimeRollingCounter(int size, TimeUnit timeUnit){
    final long currentTime = getNowSecond();
    bucketSize =size;
    buckets=new AtomicInteger[bucketSize];
    for(int i = 0; i< bucketSize; i++){
       buckets[i]=new AtomicInteger(0);
    }
    currentBucketTime=new AtomicLong(currentTime);
 }

 /**
  * 重置计数器
  */
 public void reset(){
    final long currentTime = getNowSecond();;
    for(AtomicInteger counter:buckets){
       counter.set(0);
    }
    currentBucketTime.set(currentTime);
 }

  /**
   * 窗口滑动,使用自旋滑动
   * @param targetBucketTime
   * @param currentBucketTime
   */
  private void windowRolling(final long targetBucketTime,Long currentBucketTime){
      if(targetBucketTime > currentBucketTime){

          while(!windowRolling.compareAndSet(false,true)){
       }
       try {
          //重新获取值
          currentBucketTime = this.currentBucketTime.get();
          //计数器已经被其它线程滑动了,不再滑动,防止误操作数据
          if (currentBucketTime >= targetBucketTime) {
             return ;
          }
          final int sep = this.calSep(currentBucketTime, targetBucketTime);
          final int currentIndex = this.calIndex(currentBucketTime);
          //重置计数器
          this.resetBucket(currentIndex, sep);
          //重新设置当前时间
          this.currentBucketTime.set(targetBucketTime);

       } catch(Exception e){
          logger.error("Window rolling error",e);
       }finally {
          windowRolling.set(false);
       }
      }
  }

 /**
  * 时间差大于BUCKET_SIZE,重置bucket
  * 否则重置两个窗口区间的脏数据,注意这是一个循环数组
  * @param currentBucketIndex
  * @param sep
  */
 private void resetBucket(final int currentBucketIndex,int sep){
    //时间差大于buck_size,重置所有的bucket
    if(sep>= bucketSize){
       for(AtomicInteger counter:buckets){
          counter.set(0);
       }
    }else {
       //循环队列
       if(sep<0){
          sep= bucketSize +sep;
       }
       //时间差在1到sep+1秒之间,重置之间的区间,包括新使用的本身
       for(int i=1;i<sep+1;i++){
          buckets[(currentBucketIndex+i)% bucketSize].set(0);
       }
    }
     
 }

 /**
  * 计算指定时间对应的bucket索引
  * @param currentTime
  * @return
  */
 private int calIndex(Long currentTime){
    int currIndex = Long.valueOf(currentTime% bucketSize).intValue();
    return currIndex;
 }

 /**
  * 计算两个指定时间差,以秒为单位,可能为负数
  * @param beforeTime
  * @param currentTime
  * @return
  */
 private int calSep(Long beforeTime,Long currentTime){
    return Long.valueOf(currentTime-beforeTime).intValue();
 }


 public int getBucketSize() {
    return bucketSize;
 }

 /**
  * 获取当前的秒数
  * @return
  */
 public static long getNowSecond(){
    return System.currentTimeMillis()/MILLIS_IN_ONE_SECOND;
 }
  }

三:选择哪种

1.理解成本

代码更多的时间是在做维护,上面复杂的写法很多时候在做解释,review更是耗费巨大时间,修改一行代码也设计成本很高,以前的实现也有bug,参考我之前的分析

[janus本地防刷代码问题分析]

2.gc成本

创建是注册到队列上,无法回收掉,因为无法判断何时移除。当然可以换一种实现,提供一个可以拿到当前使用所有计数器的接口。

我们目前场景用完后都会放缓存队列,暂时不存在该问题

3.清理损耗

定时器一秒清理未来buffer 数量,但我们场景存在百万级别的数量,是否能快速清理掉,需要测试1s的清理速度,理论上两次清理完成间隔相差十秒就满足条件

根据我们场景,大概会有将近百万的计数器(统计ip session的防刷)本地测试能在1s内清理掉,但cpu却一致高负载,此方案在百万级别的情况下会损耗不少cpu资源

数量增大到5百万,有其他使用cpu时,清理时间不稳定,但在预期内可以清理掉。

测试过程遇到卡住,为old区满掉,频繁gc但无空间所致。

综上,3的cpu消耗不能容忍,只能选择较为复杂的方案实现

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容