前言
假设你领导给你安排了一个任务,具体需求如下:
针对具体的接口做限流
不同接口限流的力度可以不同
可以动态调整限流配置,实时生效
如果你接到上面的任务,你会怎么去设计+实现呢?
每个人看待问题的角度不同,自然思考出来的方案也不同,正所谓条条大路通罗马,能到达目的地的路那就是一条好路。
如何分析需求
下面我给出我的实现方式,仅供各位参考。
具体问题具体分析,针对需求点,分别去做分析。
需求一 “如何针对具体的接口做限流” ,只需要让KeyResolver返回的是接口的URI即可,这样限流的维度那就是对这个接口进行限流。
需求二 “不同接口限流的力度可以不同” 这个通过配置的方式明显实现不了,配置中的replenishRate和burstCapacity都是配置死的,如果要做成动态的那么必须的自己通过扩展RedisRateLimiter来实现。
前提是必须有一个配置列表,这个配置列表就是每个接口对应的限流数值。有了这个配置我们就可以通过请求的接口获取这个接口对应的限流值。
需求三“可以动态调整限流配置,实时生效” 这个的话也比较容易,无论你是存文件,存数据库,存缓存只要每次都去读取,必然是实时生效的,但是性能问题我们不得不考虑啊。
存文件,读取文件,耗IO,主要是不方便修改
存数据库,可以通过web界面去修改,也可以直接改数据库,每次都要查询,性能不行
存分布式缓存(redis),性能比数据库有提高
对比下来肯定是缓存是最优的方案,还有更好的方案吗?
有,结合配置中心来做,我这边用自己的配置中心(https://github.com/yinjihuan/smconf)来讲解,换成其他的配置中心也是一样的思路。
配置中心的优点在于它本来就是用来存储配置的,配置在项目启动时加载完毕,当有修改时推送更新,每次读取都在本地对象中,性能好。
具体方案有了之后我们就可以开始撸代码了,但是你有想过这么多接口的限流值怎么初始化吗?手动一个个去加?
不同的服务维护的小组不同,当然也有可能是一个小组维护,从设计者的角度来思考,应该把设置的权利交给用户,交给我们的接口开发者,每个接口能够承受多少并发让用户来定,你的职责就是在网关进行限流。当然在公司中具体的限制量也不一定会由开发人员来定哈,这个得根据压测结果,做最好的调整。
话不多说-开始撸码
首先我们定义自己的RedisRateLimiter,复制源码稍微改造下即可, 这边只贴核心代码。
public class CustomRedisRateLimiter extends AbstractRateLimiter
implements ApplicationContextAware {
publicstaticfinalString CONFIGURATION_PROPERTY_NAME ="custom-redis-rate-limiter";publicstaticfinalString REDIS_SCRIPT_NAME ="redisRequestRateLimiterScript";publicstaticfinalString REMAINING_HEADER ="X-RateLimit-Remaining";publicstaticfinalString REPLENISH_RATE_HEADER ="X-RateLimit-Replenish-Rate";publicstaticfinalString BURST_CAPACITY_HEADER ="X-RateLimit-Burst-Capacity";publicCustomRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> script,
Validator validator){super(Config.class, CONFIGURATION_PROPERTY_NAME, validator);this.redisTemplate = redisTemplate;this.script = script; initialized.compareAndSet(false,true);}publicCustomRedisRateLimiter(intdefaultReplenishRate,intdefaultBurstCapacity){super(Config.class, CONFIGURATION_PROPERTY_NAME,null);this.defaultConfig =newConfig().setReplenishRate(defaultReplenishRate).setBurstCapacity(defaultBurstCapacity);}
// 限流配置
private RateLimitConf rateLimitConf;
@Override@SuppressWarnings("unchecked")publicvoidsetApplicationContext(ApplicationContext context) throws BeansException{ **// 加载配置**this.rateLimitConf = context.getBean(RateLimitConf.class); }/**
* This uses a basic token bucket algorithm and relies on the fact that
* Redis scripts execute atomically. No other operations can run between
* fetching the count and writing the new count.
*/@Override@SuppressWarnings("unchecked")publicMonoisAllowed(String routeId, String id){if(!this.initialized.get()) {thrownewIllegalStateException("RedisRateLimiter is not initialized"); }//Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);if(rateLimitConf ==null) {thrownewIllegalArgumentException("No Configuration found for route "+ routeId); } Map routeConfig = rateLimitConf.getLimitMap();// Key的格式:服务名称.接口URI.类型String replenishRateKey = routeId +"."+ id +".replenishRate";intreplenishRate = routeConfig.get(replenishRateKey) ==null? routeConfig.get("default.replenishRate") : routeConfig.get(replenishRateKey); String burstCapacityKey = routeId +"."+ id +".burstCapacity";intburstCapacity = routeConfig.get(burstCapacityKey) ==null? routeConfig.get("default.burstCapacity") : routeConfig.get(burstCapacityKey);try{ List keys = getKeys(id);// The arguments to the LUA script. time() returns unixtime in// seconds.List scriptArgs = Arrays.asList(replenishRate +"", burstCapacity +"", Instant.now().getEpochSecond() +"","1");// allowed, tokens_left = redis.eval(SCRIPT, keys, args)Flux> flux =this.redisTemplate.execute(this.script, keys, scriptArgs);// .log("redisratelimiter", Level.FINER);returnflux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L,-1L))) .reduce(newArrayList(), (longs, l) -> { longs.addAll(l);returnlongs; }).map(results -> { boolean allowed = results.get(0) ==1L; Long tokensLeft = results.get(1); Response response =newResponse(allowed, getHeaders(replenishRate, burstCapacity, tokensLeft));if(log.isDebugEnabled()) { log.debug("response: "+ response); }returnresponse; }); }catch(Exception e) {/*
* We don't want a hard dependency on Redis to allow traffic. Make
* sure to set an alert so you know if this is happening too much.
* Stripe's observed failure rate is 0.01%.
*/log.error("Error determining if user allowed from redis", e); }returnMono.just(newResponse(true, getHeaders(replenishRate, burstCapacity,-1L)));}publicHashMapgetHeaders(Integer replenishRate, Integer burstCapacity, Long tokensLeft){ HashMap headers =newHashMap<>(); headers.put(this.remainingHeader, tokensLeft.toString()); headers.put(this.replenishRateHeader, String.valueOf(replenishRate)); headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));returnheaders;}
}
需要在setApplicationContext中加载我们的配置类,配置类的定义如下:
@CxytianDiConf(system="fangjia-gateway")
public class RateLimitConf {
// 限流配置
@ConfField(value = "limitMap")
private Map limitMap = new HashMap(){{
put("default.replenishRate", 100);
put("default.burstCapacity", 1000);
}};
public void setLimitMap(Map limitMap) {
this.limitMap = limitMap;
}
public Map getLimitMap() {
return limitMap;
}
}
所有的接口对应的限流信息都在map中,有默认值,如果没有对应的配置就用默认的值对接口进行限流。
isAllowed方法中通过‘服务名称.接口URI.类型’组成一个Key, 通过这个Key去Map中获取对应的值。
类型的作用主要是用来区分replenishRate和burstCapacity两个值。
接下来就是配置CustomRedisRateLimiter:
@Beanbr/>@Primary
public CustomRedisRateLimiter customRedisRateLimiter(
ReactiveRedisTemplate redisTemplate,
@Qualifier(CustomRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript> redisScript,
Validator validator) {
return new CustomRedisRateLimiter(redisTemplate, redisScript, validator);
}
网关这边的逻辑已经实现好了,接下来就是需要在具体的服务中自定义注解,然后将限流的参数初始化到我们的配置中心就可以了。
@Target(ElementType.METHOD)br/>@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ApiRateLimit {
* 速率
* @return
*/intreplenishRate()default100;/**
* 容积
* @return
*/intburstCapacity()default1000;
}
启动监听器,读取注解,初始化配置
/**
初始化API网关需要进行并发限制的API
@author yinjihuan
*/
public class InitGatewayApiLimitRateListener implements ApplicationListener {
// Controller包路径
private String controllerPath;
private RateLimitConf rateLimitConf;
private ConfInit confInit;
private String applicationName;
public InitGatewayApiLimitRateListener(String controllerPath) {
this.controllerPath = controllerPath;
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
this.rateLimitConf = event.getApplicationContext().getBean(RateLimitConf.class);
this.confInit = event.getApplicationContext().getBean(ConfInit.class);
this.applicationName = event.getApplicationContext().getEnvironment().getProperty("spring.application.name");
try {
initLimitRateAPI();
} catch (Exception e) {
throw new RuntimeException("初始化需要进行并发限制的API异常", e);
}
}
/**
初始化需要进行并发限制的API
@throws IOException
@throws ClassNotFoundException
*/
private void initLimitRateAPI() throws IOException, ClassNotFoundException {
Map limitMap = rateLimitConf.getLimitMap();
ClasspathPackageScannerUtils scan = new ClasspathPackageScannerUtils(this.controllerPath);
List classList = scan.getFullyQualifiedClassNameList();
for (String clazz : classList) {
Class clz = Class.forName(clazz);
if (!clz.isAnnotationPresent(RestController.class)) {
continue;
}
Method[] methods = clz.getDeclaredMethods();
for (Method method : methods) {
if (method.isAnnotationPresent(ApiRateLimit.class)) {
ApiRateLimit apiRateLimit = method.getAnnotation(ApiRateLimit.class);
String replenishRateKey = applicationName + "." + getApiUri(clz, method) + ".replenishRate";
String burstCapacityKey = applicationName + "." + getApiUri(clz, method) + ".burstCapacity";
limitMap.put(replenishRateKey, apiRateLimit.replenishRate());
limitMap.put(burstCapacityKey, apiRateLimit.burstCapacity());
}
}
}
rateLimitConf.setLimitMap(limitMap);
// 初始化值到配置中心
confInit.init(rateLimitConf);
}
private String getApiUri(Class clz, Method method) {
StringBuilder uri = new StringBuilder();
uri.append(clz.getAnnotation(RequestMapping.class).value()[0]);
if (method.isAnnotationPresent(GetMapping.class)) {
uri.append(method.getAnnotation(GetMapping.class).value()[0]);
} else if (method.isAnnotationPresent(PostMapping.class)) {
uri.append(method.getAnnotation(PostMapping.class).value()[0]);
} else if (method.isAnnotationPresent(RequestMapping.class)) {
uri.append(method.getAnnotation(RequestMapping.class).value()[0]);
}
return uri.toString();
}
}
配置监听器
SpringApplication application = new SpringApplication(FshHouseServiceApplication.class);
application.addListeners(new InitGatewayApiLimitRateListener("com.fangjia.fsh.house.controller"));
context = application.run(args);
最后使用就很简单了,只需要增加注解就可以了
@ApiRateLimit(replenishRate=10, burstCapacity=100)br/>@GetMapping("/data")
public HouseInfo getData(@RequestParam("name") String name) {
return new HouseInfo(1L, "上海", "虹口", "东体小区");
}
我这边只是给大家提供一种去实现的思路,也许大家还有更好的方案。
我觉得只要不让每个开发都去关心这种非业务性质的功能,那就可以了,都在框架层面处理掉。当然实现原理可以跟大家分享下,会用很好,既会用又了解原理那就更好了。
欢迎工作一到五年的Java工程师朋友们加入Java技术交流:659270626
本群提供免费的学习指导 架构资料 以及免费的解答
不懂得问题都可以在本群提出来 之后还会有职业生涯规划以及面试指导