Hystrix插件图
Bootstrap依赖模块
网关插件启动关键段
// @see org.dromara.soul.web.configuration.SoulConfiguration
@Bean("webHandler")
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
// 将所有的网关插件,挂载到webHandler中;且按照顺序挂载
List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
final List<SoulPlugin> soulPlugins = pluginList.stream()
.sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
return new SoulWebHandler(soulPlugins);
}
- 推测
webHandler
会将传入的插件列表,生成插件链,在处理流程中依次处理
// @see org.dromara.soul.web.handler.SoulWebHandler
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());
Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());
// 直接将插件列表生成一个插件链DefaultSoulPluginChain,责任链的execute方法会遍历所有插件列表,依次处理
// Mono的subscribeOn没看明白???【就是讲线程池绑定到Mono请求流程中】;难道是经过Mono的请求,都会提交给这个线程池执行?Mono本身是有默认线程池吧,这里为什么要自用线程池?应该是自行创建的Mono对象
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
.doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
}
- 我们看下网关插件链
DefaultSoulPluginChain.execute
的逻辑实现
// 1. 执行方法,遍历插件列表,并调用插件的执行逻辑
// 2. 返回Mono对象,能方便的使用Reactive的事件驱动编程的机制
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
SoulPlugin plugin = plugins.get(this.index++);
// 先判断当前请求是否满足插件处理的逻辑,如果不满足,则跳过不处理;否则进行处理
Boolean skip = plugin.skip(exchange);
if (skip) {
// 跳过,直接执行下一个插件
return this.execute(exchange);
}
// 执行当前插件
return plugin.execute(exchange, this);
}
// 所有插件执行完毕
return Mono.empty();
});
}
- 我们看下插件
HystrixPlugin
的执行逻辑,从上面的类图可以得知是执行的AbstractSoulPlugin
的execute
方法
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
String pluginName = named();
// 获取插件的配置数据
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
// 获取插件选择器
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
// 插件选择器为空的处理
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
// 匹配选择器
final SelectorData selectorData = matchSelector(exchange, selectors);
if (Objects.isNull(selectorData)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
// 获取选择器的规则
final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
// 规则为空处理
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
// 匹配规则
RuleData rule;
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//get last
rule = rules.get(rules.size() - 1);
} else {
rule = matchRule(exchange, rules);
}
// 未匹配到规则的处理
if (Objects.isNull(rule)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
// 将匹配到插件配置数据,传递给插件本身处理自身的业务逻辑
return doExecute(exchange, chain, selectorData, rule);
}
// 插件数据没有,则直接进入到下一个插件
return chain.execute(exchange);
}
- 再看
HystrixPlugin
的doExecute
方法
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
// 构造hystrix处理
final HystrixHandle hystrixHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), HystrixHandle.class);
if (StringUtils.isBlank(hystrixHandle.getGroupKey())) {
hystrixHandle.setGroupKey(Objects.requireNonNull(soulContext).getModule());
}
if (StringUtils.isBlank(hystrixHandle.getCommandKey())) {
hystrixHandle.setCommandKey(Objects.requireNonNull(soulContext).getMethod());
}
// 生成hystrix command
Command command = fetchCommand(hystrixHandle, exchange, chain);
// 这里就木有看懂了,需额外学习RxJava
return Mono.create(s -> {
Subscription sub = command.fetchObservable().subscribe(s::success,
s::error, s::success);
s.onCancel(sub::unsubscribe);
if (command.isCircuitBreakerOpen()) {
log.error("hystrix execute have circuitBreaker is Open! groupKey:{},commandKey:{}", hystrixHandle.getGroupKey(), hystrixHandle.getCommandKey());
}
}).doOnError(throwable -> {
log.error("hystrix execute exception:", throwable);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName());
chain.execute(exchange);
}).then();
}
TODO
- soul网关如何调用hystrix进行控制的?
- Hystrix原理学习
- RxJava学习