Spring Cloud Gateway 结合配置中心限流

前言

假设你领导给你安排了一个任务,具体需求如下:

针对具体的接口做限流

不同接口限流的力度可以不同

可以动态调整限流配置,实时生效

如果你接到上面的任务,你会怎么去设计+实现呢?

每个人看待问题的角度不同,自然思考出来的方案也不同,正所谓条条大路通罗马,能到达目的地的路那就是一条好路。

如何分析需求

下面我给出我的实现方式,仅供各位参考。

具体问题具体分析,针对需求点,分别去做分析。

需求一 “如何针对具体的接口做限流” ,只需要让KeyResolver返回的是接口的URI即可,这样限流的维度那就是对这个接口进行限流。

需求二 “不同接口限流的力度可以不同” 这个通过配置的方式明显实现不了,配置中的replenishRate和burstCapacity都是配置死的,如果要做成动态的那么必须的自己通过扩展RedisRateLimiter来实现。

前提是必须有一个配置列表,这个配置列表就是每个接口对应的限流数值。有了这个配置我们就可以通过请求的接口获取这个接口对应的限流值。

需求三“可以动态调整限流配置,实时生效” 这个的话也比较容易,无论你是存文件,存数据库,存缓存只要每次都去读取,必然是实时生效的,但是性能问题我们不得不考虑啊。

存文件,读取文件,耗IO,主要是不方便修改 

存数据库,可以通过web界面去修改,也可以直接改数据库,每次都要查询,性能不行 

存分布式缓存(redis),性能比数据库有提高

对比下来肯定是缓存是最优的方案,还有更好的方案吗? 

有,结合配置中心来做,我这边用自己的配置中心(https://github.com/yinjihuan/smconf)来讲解,换成其他的配置中心也是一样的思路

配置中心的优点在于它本来就是用来存储配置的,配置在项目启动时加载完毕,当有修改时推送更新,每次读取都在本地对象中,性能好。

具体方案有了之后我们就可以开始撸代码了,但是你有想过这么多接口的限流值怎么初始化吗?手动一个个去加?

不同的服务维护的小组不同,当然也有可能是一个小组维护,从设计者的角度来思考,应该把设置的权利交给用户,交给我们的接口开发者,每个接口能够承受多少并发让用户来定,你的职责就是在网关进行限流。当然在公司中具体的限制量也不一定会由开发人员来定哈,这个得根据压测结果,做最好的调整。

话不多说-开始撸码

首先我们定义自己的RedisRateLimiter,复制源码稍微改造下即可, 这边只贴核心代码。

public class CustomRedisRateLimiter extends AbstractRateLimiter

implements ApplicationContextAware {

publicstaticfinalString CONFIGURATION_PROPERTY_NAME ="custom-redis-rate-limiter";publicstaticfinalString REDIS_SCRIPT_NAME ="redisRequestRateLimiterScript";publicstaticfinalString REMAINING_HEADER ="X-RateLimit-Remaining";publicstaticfinalString REPLENISH_RATE_HEADER ="X-RateLimit-Replenish-Rate";publicstaticfinalString BURST_CAPACITY_HEADER ="X-RateLimit-Burst-Capacity";publicCustomRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> script,

        Validator validator){super(Config.class, CONFIGURATION_PROPERTY_NAME, validator);this.redisTemplate = redisTemplate;this.script = script;    initialized.compareAndSet(false,true);}publicCustomRedisRateLimiter(intdefaultReplenishRate,intdefaultBurstCapacity){super(Config.class, CONFIGURATION_PROPERTY_NAME,null);this.defaultConfig =newConfig().setReplenishRate(defaultReplenishRate).setBurstCapacity(defaultBurstCapacity);}

// 限流配置

private RateLimitConf rateLimitConf;

@Override@SuppressWarnings("unchecked")publicvoidsetApplicationContext(ApplicationContext context) throws BeansException{  **// 加载配置**this.rateLimitConf = context.getBean(RateLimitConf.class);  }/**

* This uses a basic token bucket algorithm and relies on the fact that

* Redis scripts execute atomically. No other operations can run between

* fetching the count and writing the new count.

*/@Override@SuppressWarnings("unchecked")publicMonoisAllowed(String routeId, String id){if(!this.initialized.get()) {thrownewIllegalStateException("RedisRateLimiter is not initialized");    }//Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);if(rateLimitConf ==null) {thrownewIllegalArgumentException("No Configuration found for route "+ routeId);    }    Map routeConfig = rateLimitConf.getLimitMap();// Key的格式:服务名称.接口URI.类型String replenishRateKey = routeId +"."+ id +".replenishRate";intreplenishRate = routeConfig.get(replenishRateKey) ==null? routeConfig.get("default.replenishRate") : routeConfig.get(replenishRateKey);    String burstCapacityKey = routeId +"."+ id +".burstCapacity";intburstCapacity = routeConfig.get(burstCapacityKey) ==null? routeConfig.get("default.burstCapacity") : routeConfig.get(burstCapacityKey);try{        List keys = getKeys(id);// The arguments to the LUA script. time() returns unixtime in// seconds.List scriptArgs = Arrays.asList(replenishRate +"", burstCapacity +"",                Instant.now().getEpochSecond() +"","1");// allowed, tokens_left = redis.eval(SCRIPT, keys, args)Flux> flux =this.redisTemplate.execute(this.script, keys, scriptArgs);// .log("redisratelimiter", Level.FINER);returnflux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L,-1L)))                .reduce(newArrayList(), (longs, l) -> {                    longs.addAll(l);returnlongs;                }).map(results -> {                    boolean allowed = results.get(0) ==1L;                    Long tokensLeft = results.get(1);                    Response response =newResponse(allowed, getHeaders(replenishRate, burstCapacity, tokensLeft));if(log.isDebugEnabled()) {                        log.debug("response: "+ response);                    }returnresponse;                });    }catch(Exception e) {/*

        * We don't want a hard dependency on Redis to allow traffic. Make

        * sure to set an alert so you know if this is happening too much.

        * Stripe's observed failure rate is 0.01%.

        */log.error("Error determining if user allowed from redis", e);    }returnMono.just(newResponse(true, getHeaders(replenishRate, burstCapacity,-1L)));}publicHashMapgetHeaders(Integer replenishRate, Integer burstCapacity, Long tokensLeft){    HashMap headers =newHashMap<>();    headers.put(this.remainingHeader, tokensLeft.toString());    headers.put(this.replenishRateHeader, String.valueOf(replenishRate));    headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));returnheaders;}

}

需要在setApplicationContext中加载我们的配置类,配置类的定义如下:

@CxytianDiConf(system="fangjia-gateway")

public class RateLimitConf {

// 限流配置

@ConfField(value = "limitMap")

private Map limitMap = new HashMap(){{

put("default.replenishRate", 100);

put("default.burstCapacity", 1000);

}};

public void setLimitMap(Map limitMap) {

this.limitMap = limitMap;

}

public Map getLimitMap() {

return limitMap;

}

}

所有的接口对应的限流信息都在map中,有默认值,如果没有对应的配置就用默认的值对接口进行限流。

isAllowed方法中通过‘服务名称.接口URI.类型’组成一个Key, 通过这个Key去Map中获取对应的值。

类型的作用主要是用来区分replenishRate和burstCapacity两个值。

接下来就是配置CustomRedisRateLimiter:

@Beanbr/>@Primary

public CustomRedisRateLimiter customRedisRateLimiter(

ReactiveRedisTemplate redisTemplate, 

@Qualifier(CustomRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript> redisScript,

Validator validator) {

return new CustomRedisRateLimiter(redisTemplate, redisScript, validator);

}

网关这边的逻辑已经实现好了,接下来就是需要在具体的服务中自定义注解,然后将限流的参数初始化到我们的配置中心就可以了。

定义注解

@Target(ElementType.METHOD)br/>@Retention(RetentionPolicy.RUNTIME)

@Documented

public @interface ApiRateLimit {

/**

* 速率

* @return

*/intreplenishRate()default100;/**

* 容积

* @return

*/intburstCapacity()default1000;

}

启动监听器,读取注解,初始化配置

/**

初始化API网关需要进行并发限制的API

@author yinjihuan

*/

public class InitGatewayApiLimitRateListener implements ApplicationListener {

// Controller包路径

private String controllerPath;

private RateLimitConf rateLimitConf;

private ConfInit confInit;

private String applicationName;

public InitGatewayApiLimitRateListener(String controllerPath) {

this.controllerPath = controllerPath;

}

@Override

public void onApplicationEvent(ApplicationReadyEvent event) {

this.rateLimitConf = event.getApplicationContext().getBean(RateLimitConf.class);

this.confInit = event.getApplicationContext().getBean(ConfInit.class);

this.applicationName = event.getApplicationContext().getEnvironment().getProperty("spring.application.name");

try {

initLimitRateAPI();

} catch (Exception e) {

throw new RuntimeException("初始化需要进行并发限制的API异常", e);

}

}

/**

初始化需要进行并发限制的API

@throws IOException

@throws ClassNotFoundException

*/

private void initLimitRateAPI() throws IOException, ClassNotFoundException {

Map limitMap = rateLimitConf.getLimitMap();

ClasspathPackageScannerUtils scan = new ClasspathPackageScannerUtils(this.controllerPath);

List classList = scan.getFullyQualifiedClassNameList();

for (String clazz : classList) {

Class clz = Class.forName(clazz);

if (!clz.isAnnotationPresent(RestController.class)) {

continue;

}

Method[] methods = clz.getDeclaredMethods();

for (Method method : methods) {

if (method.isAnnotationPresent(ApiRateLimit.class)) {

ApiRateLimit apiRateLimit = method.getAnnotation(ApiRateLimit.class);

String replenishRateKey = applicationName + "." + getApiUri(clz, method) + ".replenishRate";

String burstCapacityKey = applicationName + "." + getApiUri(clz, method) + ".burstCapacity";

limitMap.put(replenishRateKey, apiRateLimit.replenishRate());

limitMap.put(burstCapacityKey, apiRateLimit.burstCapacity());

}

}

}

rateLimitConf.setLimitMap(limitMap);

// 初始化值到配置中心

confInit.init(rateLimitConf);

}

private String getApiUri(Class clz, Method method) {

StringBuilder uri = new StringBuilder();

uri.append(clz.getAnnotation(RequestMapping.class).value()[0]);

if (method.isAnnotationPresent(GetMapping.class)) {

uri.append(method.getAnnotation(GetMapping.class).value()[0]);

} else if (method.isAnnotationPresent(PostMapping.class)) {

uri.append(method.getAnnotation(PostMapping.class).value()[0]);

} else if (method.isAnnotationPresent(RequestMapping.class)) {

uri.append(method.getAnnotation(RequestMapping.class).value()[0]);

}

return uri.toString();

}

}

配置监听器

SpringApplication application = new SpringApplication(FshHouseServiceApplication.class);

application.addListeners(new InitGatewayApiLimitRateListener("com.fangjia.fsh.house.controller"));

context = application.run(args);

最后使用就很简单了,只需要增加注解就可以了

@ApiRateLimit(replenishRate=10, burstCapacity=100)br/>@GetMapping("/data")

public HouseInfo getData(@RequestParam("name") String name) {

return new HouseInfo(1L, "上海", "虹口", "东体小区");

}

总结

我这边只是给大家提供一种去实现的思路,也许大家还有更好的方案。

我觉得只要不让每个开发都去关心这种非业务性质的功能,那就可以了,都在框架层面处理掉。当然实现原理可以跟大家分享下,会用很好,既会用又了解原理那就更好了。

欢迎工作一到五年的Java工程师朋友们加入Java技术交流:659270626

本群提供免费的学习指导 架构资料 以及免费的解答

不懂得问题都可以在本群提出来 之后还会有职业生涯规划以及面试指导

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容