实现一个简单的限流器

/**
 * 限流器初始化
 * 
 **/
@Component
public class RateLimiterInitializer implements InitializingBean {
    @Resource
    private ApplicationContext applicationContext;

    @Override
    public void afterPropertiesSet() {
        Map<String, Object> map = applicationContext.getBeansWithAnnotation(UseLimiter.class);
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            Object val = entry.getValue();
            Class<?> clazz = AopUtils.getTargetClass(val);
            Method[] methods = clazz.getMethods();
            for (Method m : methods) {
                if (!m.isAnnotationPresent(Limiter.class)) {
                    continue;
                }

                StringBuilder params = new StringBuilder();
                params.append(m.getName()).append(";");
                for (Type type : m.getGenericParameterTypes()) {
                    params.append(type.getTypeName()).append(";");
                }
                params.append(m.getAnnotatedReturnType().getType());
                Limiter limiter = m.getAnnotation(Limiter.class);
                RateLimitContextHolder.put(params.toString(), limiter.permits());
            }

        }

    }

}

```java
//  aop处理

/**
 * <h2>限流处理</h2>
 * <pre>
 *     当相应的参数大于0则进入到相应的处理流程中;
 *   
 * </pre>
 **/
@Component
@Aspect
public class RateLimitAspect {

   // 拦截方法上的@Limiter
    @Pointcut("@annotation(.Limiter)")
    public void point() {
        //
    }

    @Around("point()")
    public Object limit(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
        Limiter limit = methodSignature.getMethod().getDeclaredAnnotation(Limiter.class);
        if (limit.timeout() > 0) {
            return circuitBreaker(proceedingJoinPoint, limit.timeout());
        } else if (limit.permits() > 0) {
            StringBuilder key = new StringBuilder();
            key.append(methodSignature.getMethod().getName()).append(";");
            for (Type type : methodSignature.getMethod().getGenericParameterTypes()) {
                key.append(type.getTypeName()).append(";");
            }
            key.append(methodSignature.getMethod().getAnnotatedReturnType().getType());

            Semaphore limiter = RateLimitContextHolder.get(key.toString());
            try {
                log.info("限流执行中,剩余可用令牌数:{}, 参数:{}, 方法:{}",
                        limiter == null ? "empty" : limiter.availablePermits(), key, methodSignature.getMethod());

                if (limiter != null && !limiter.tryAcquire()) {
                    throw new BizException("请求频率超过限制,请稍后再试!");
                }
                return proceedingJoinPoint.proceed();
            } finally {
                // 释放
                if(limiter != null){
                    limiter.release();
                }
            }

        }
        return null;
    }


    /**
     * 熔断器
     *
     * @param proceedingJoinPoint point
     * @param timeout             timeout
     * @return 处理结果
     */
    private Object circuitBreaker(ProceedingJoinPoint proceedingJoinPoint, int timeout) {
        ExecutorService es = Executors.newFixedThreadPool(2);
        Future<Object> future = es.submit(() -> {
            try {
                return proceedingJoinPoint.proceed();
            } catch (Throwable throwable) {
                throw new BizException("处理异常,请稍后再试!");
            }
        });

        final Object obj;
        try {
            obj = future.get(timeout, TimeUnit.MILLISECONDS);
            return obj;
        } catch (Throwable e) {
            future.cancel(true);
            throw new BizException("处理异常,请稍后再试!");
        }
    }

}





最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容