/**
 * Registers the configured permit count of every rate-limited method at startup.
 *
 * <p>After this bean's properties are set, all beans annotated with {@link UseLimiter} are
 * looked up; for each of their public methods annotated with {@link Limiter}, a signature key
 * is built and stored in {@link RateLimitContextHolder} together with {@link Limiter#permits()}.
 *
 * <p>NOTE(review): the key is made of the method name, parameter types and return type only —
 * the declaring class is not part of it, so identically-signed methods in different beans map
 * to the same entry. Confirm whether that sharing is intended.
 */
@Component
public class RateLimiterInitializer implements InitializingBean {

    @Resource
    private ApplicationContext applicationContext;

    /**
     * Scans the {@link UseLimiter} beans and records the permits of each {@link Limiter} method.
     */
    @Override
    public void afterPropertiesSet() {
        Map<String, Object> limitedBeans = applicationContext.getBeansWithAnnotation(UseLimiter.class);
        for (Object bean : limitedBeans.values()) {
            // Inspect the class behind a possible AOP proxy, not the proxy class itself.
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            for (Method candidate : targetClass.getMethods()) {
                Limiter limiter = candidate.getAnnotation(Limiter.class);
                if (limiter != null) {
                    RateLimitContextHolder.put(signatureKey(candidate), limiter.permits());
                }
            }
        }
    }

    /**
     * Builds the registry key for a method: {@code name;paramType1;...;paramTypeN;returnType}.
     *
     * @param method the annotated method
     * @return the key under which the method's permits are stored
     */
    private static String signatureKey(Method method) {
        StringBuilder key = new StringBuilder(method.getName()).append(";");
        for (Type parameterType : method.getGenericParameterTypes()) {
            key.append(parameterType.getTypeName()).append(";");
        }
        // The return type is appended through its toString() form, not getTypeName().
        key.append(method.getAnnotatedReturnType().getType());
        return key.toString();
    }
}
```java
// aop处理
/**
* <h2>限流处理</h2>
* <pre>
* 当相应的参数大于0则进入到相应的处理流程中;
*
* </pre>
**/
@Component
@Aspect
public class RateLimitAspect {
// 拦截方法上的@Limiter
@Pointcut("@annotation(.Limiter)")
public void point() {
//
}
@Around("point()")
public Object limit(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
MethodSignature methodSignature = (MethodSignature) proceedingJoinPoint.getSignature();
Limiter limit = methodSignature.getMethod().getDeclaredAnnotation(Limiter.class);
if (limit.timeout() > 0) {
return circuitBreaker(proceedingJoinPoint, limit.timeout());
} else if (limit.permits() > 0) {
StringBuilder key = new StringBuilder();
key.append(methodSignature.getMethod().getName()).append(";");
for (Type type : methodSignature.getMethod().getGenericParameterTypes()) {
key.append(type.getTypeName()).append(";");
}
key.append(methodSignature.getMethod().getAnnotatedReturnType().getType());
Semaphore limiter = RateLimitContextHolder.get(key.toString());
try {
log.info("限流执行中,剩余可用令牌数:{}, 参数:{}, 方法:{}",
limiter == null ? "empty" : limiter.availablePermits(), key, methodSignature.getMethod());
if (limiter != null && !limiter.tryAcquire()) {
throw new BizException("请求频率超过限制,请稍后再试!");
}
return proceedingJoinPoint.proceed();
} finally {
// 释放
if(limiter != null){
limiter.release();
}
}
}
return null;
}
/**
* 熔断器
*
* @param proceedingJoinPoint point
* @param timeout timeout
* @return 处理结果
*/
private Object circuitBreaker(ProceedingJoinPoint proceedingJoinPoint, int timeout) {
ExecutorService es = Executors.newFixedThreadPool(2);
Future<Object> future = es.submit(() -> {
try {
return proceedingJoinPoint.proceed();
} catch (Throwable throwable) {
throw new BizException("处理异常,请稍后再试!");
}
});
final Object obj;
try {
obj = future.get(timeout, TimeUnit.MILLISECONDS);
return obj;
} catch (Throwable e) {
future.cancel(true);
throw new BizException("处理异常,请稍后再试!");
}
}
}