十三、soul源码学习-http请求链路跟踪

前面我们从配置的修改是如何更新SoulAdmin本地缓存的,再到网关和SoulAdmin是如何同步数据等,讲解了数据同步的机制,是为了保证我们网关能够正确的处理请求,并针对配置的插件进行正确的处理,接下来我们从一个真正的用户请求http到网关以及如何最后到我们真正请求的整个链路做一下分析

SoulWebHandler,是网关请求的入口。

//org.dromara.soul.web.handler.SoulWebHandler
//实现了WebHandler
public final class SoulWebHandler implements WebHandler {

    private final List<SoulPlugin> plugins;

    private final Scheduler scheduler;

    /**
     * 初始化的时候注入所有的SoulPlugin插件
     */
    public SoulWebHandler(final List<SoulPlugin> plugins) {
        this.plugins = plugins;
        String schedulerType = System.getProperty("soul.scheduler.type", "fixed");
        if (Objects.equals(schedulerType, "fixed")) {
            int threads = Integer.parseInt(System.getProperty(
                    "soul.work.threads", "" + Math.max((Runtime.getRuntime().availableProcessors() << 1) + 1, 16)));
            scheduler = Schedulers.newParallel("soul-work-threads", threads);
        } else {
            scheduler = Schedulers.elastic();
        }
    }
}

在我们请求过来的时候,会走到handle

//org.dromara.soul.web.handler.SoulWebHandler#handle
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
  //监控相关
  MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());
  Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());
  //构造DefaultSoulPluginChain,默认的插件链进行处理
  return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
    .doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
}

DefaultSoulPluginChain 使用了责任链的设计模式,针对一个请求,对所有的插件进行过滤

//org.dromara.soul.web.handler.SoulWebHandler.DefaultSoulPluginChain
private static class DefaultSoulPluginChain implements SoulPluginChain {

  private int index;

  private final List<SoulPlugin> plugins;
  
  DefaultSoulPluginChain(final List<SoulPlugin> plugins) {
    this.plugins = plugins;
  }

  /**
         * Delegate to the next {@code WebFilter} in the chain.
         *
         * @param exchange the current server exchange
         * @return {@code Mono<Void>} to indicate when request handling is complete
         */
  @Override
  public Mono<Void> execute(final ServerWebExchange exchange) {
    return Mono.defer(() -> {
      if (this.index < plugins.size()) {
        SoulPlugin plugin = plugins.get(this.index++);
        Boolean skip = plugin.skip(exchange);
        if (skip) {
          return this.execute(exchange);
        }
        return plugin.execute(exchange, this);
      }
      return Mono.empty();
    });
  }
}

我们看到plugins是按照顺序循环处理的,而且每次的顺序是一致的,GlobalPlugin肯定在第一位,这是怎么实现的,我们看下GlobalPlugin插件

//org.dromara.soul.plugin.global.GlobalPlugin
public class GlobalPlugin implements SoulPlugin {
    
    private final SoulContextBuilder builder;
    
    public GlobalPlugin(final SoulContextBuilder builder) {
        this.builder = builder;
    }
  
  //通过getOrder保证初始化的顺序
    @Override
    public int getOrder() {
        return 0;
    }
    
}

通过看getOrder的调用方我们发现

//org.dromara.soul.web.configuration.SoulConfiguration
@Configuration
@ComponentScan("org.dromara.soul")
@Import(value = {ErrorHandlerConfiguration.class, SoulExtConfiguration.class, SpringExtConfiguration.class})
@Slf4j
public class SoulConfiguration {

   
    @Bean("webHandler")
    public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
        List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
      //在这里进行重排序
        final List<SoulPlugin> soulPlugins = pluginList.stream()
                .sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
        soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
        return new SoulWebHandler(soulPlugins);
    }
}

所以 插件的顺序是定义好的,每次请求的第一个肯定是GlobalPlugin。GlobalPlugin是最先执行的插件

//org.dromara.soul.plugin.global.GlobalPlugin
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  final ServerHttpRequest request = exchange.getRequest();
  final HttpHeaders headers = request.getHeaders();
  final String upgrade = headers.getFirst("Upgrade");
  SoulContext soulContext;
  //先忽略Upgrade,普通请求upgrade为空
  if (StringUtils.isBlank(upgrade) || !"websocket".equals(upgrade)) {
    soulContext = builder.build(exchange);
  } else {
    final MultiValueMap<String, String> queryParams = request.getQueryParams();
    soulContext = transformMap(queryParams);
  }
  exchange.getAttributes().put(Constants.CONTEXT, soulContext);
  return chain.execute(exchange);
}

这里会走到DefaultSoulContextBuilder

//org.dromara.soul.plugin.global.DefaultSoulContextBuilder
@Override
public SoulContext build(final ServerWebExchange exchange) {
  final ServerHttpRequest request = exchange.getRequest();
  //获取到请求的path
  String path = request.getURI().getPath();
  //http先不关注metaData
  MetaData metaData = MetaDataCache.getInstance().obtain(path);
  if (Objects.nonNull(metaData) && metaData.getEnabled()) {
    exchange.getAttributes().put(Constants.META_DATA, metaData);
  }
  //将请求和元数据转换成SoulContext
  return transform(request, metaData);
}
//org.dromara.soul.plugin.global.DefaultSoulContextBuilder#transform
//构造Soul的上下文信息
private SoulContext transform(final ServerHttpRequest request, final MetaData metaData) {
  //Constants.APP_KEY = appKey
  final String appKey = request.getHeaders().getFirst(Constants.APP_KEY);
  //Constants.SIGN = sign
  final String sign = request.getHeaders().getFirst(Constants.SIGN);
  //Constants.TIMESTAMP = timestamp
  final String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);
  //从header获取信息
  SoulContext soulContext = new SoulContext();
  String path = request.getURI().getPath();
  soulContext.setPath(path);
  //判断元数据信息,通过元数据来拍断当前的请求是属于什么类型
  if (Objects.nonNull(metaData) && metaData.getEnabled()) {
    if (RpcTypeEnum.SPRING_CLOUD.getName().equals(metaData.getRpcType())) {
      setSoulContextByHttp(soulContext, path);
      soulContext.setRpcType(metaData.getRpcType());
    } else if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
      setSoulContextByDubbo(soulContext, metaData);
    } else if (RpcTypeEnum.SOFA.getName().equals(metaData.getRpcType())) {
      setSoulContextBySofa(soulContext, metaData);
    } else if (RpcTypeEnum.TARS.getName().equals(metaData.getRpcType())) {
      setSoulContextByTars(soulContext, metaData);
    } else {
      setSoulContextByHttp(soulContext, path);
      soulContext.setRpcType(RpcTypeEnum.HTTP.getName());
    }
    //默认当成http处理
  } else {
    setSoulContextByHttp(soulContext, path);
    soulContext.setRpcType(RpcTypeEnum.HTTP.getName());
  }
  //注入必要信息
  soulContext.setAppKey(appKey);
  soulContext.setSign(sign);
  soulContext.setTimestamp(timestamp);
  soulContext.setStartDateTime(LocalDateTime.now());
  Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> soulContext.setHttpMethod(httpMethod.name()));
  return soulContext;
}

GlobalPlugin是最关键的插件,通过上面流程,构造Soul的上下文,从而使得后面的插件判断才有依据来决定是走哪一个插件

通过责任链,依次循环调用所有的插件,直到中间某个插件匹配调用生效为止。我们来看下http请求,最终会命中Divide插件,我们来看下DividePlugin插件

//org.dromara.soul.plugin.divide.DividePlugin#skip
@Override
public Boolean skip(final ServerWebExchange exchange) {
  //GlobalPlugin构造的上下文
  final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
  //DividePlugin会判断当前的SoulContext的RpcType是否是Http
  return !Objects.equals(Objects.requireNonNull(soulContext).getRpcType(), RpcTypeEnum.HTTP.getName());
}

发现不需要跳过后,会进入AbstractSoulPlugin的execute。包括刚才的GlobalPlugin也会经过这里

//org.dromara.soul.plugin.base.AbstractSoulPlugin#execute
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  String pluginName = named();
  //从本地缓存中获取PluginData数据,这里的本地缓存就是之前我们讲的数据同步所维护的缓存
  final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
  //判断plugin是否为空并且开启
  if (pluginData != null && pluginData.getEnabled()) {
    //在获取本地Selector数据缓存。获取SelectorData
    final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
    //如果该插件selector为空会走这里,里面逻辑实际上就是调用下一个插件
    if (CollectionUtils.isEmpty(selectors)) {
      return handleSelectorIsNull(pluginName, exchange, chain);
    }
    //如果Selectors不为空,则去看是否匹配,匹配逻辑暂时先不展开讲,之后再看
    final SelectorData selectorData = matchSelector(exchange, selectors);
    //如果匹配为空则继续调用下一个插件
    if (Objects.isNull(selectorData)) {
      return handleSelectorIsNull(pluginName, exchange, chain);
    }
    selectorLog(selectorData, pluginName);
    //在获取规则数据
    final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
    //规则为空则继续执行下一个插件
    if (CollectionUtils.isEmpty(rules)) {
      return handleRuleIsNull(pluginName, exchange, chain);
    }
    RuleData rule;
    //如果是全流量
    if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
      //则获取最后一个规则
      rule = rules.get(rules.size() - 1);
    } else {
      //判断是否有匹配规则,具体匹配校验之后再说
      rule = matchRule(exchange, rules);
    }
    //如果规则为空,继续执行下一个插件
    if (Objects.isNull(rule)) {
      return handleRuleIsNull(pluginName, exchange, chain);
    }
    ruleLog(rule, pluginName);
    //如果规则不为空,则调用doExecute。对应插件的具体实现
    return doExecute(exchange, chain, selectorData, rule);
  }
  return chain.execute(exchange);
}
//org.dromara.soul.plugin.divide.DividePlugin
public class DividePlugin extends AbstractSoulPlugin {

  
    @Override
    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
      //这里拿到之前GlobalPlugin的上下文
        final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
        assert soulContext != null;
      //获取Divide规则处理器,DivideRuleHandle包含了负载均衡策略以及重试次数和超时时间信息
        final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
      //这里根据之前的探活机制,获取到对应选择器的上游服务列表
        final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
      //如果为空则抛异常,并直接返回WebFlux结果
        if (CollectionUtils.isEmpty(upstreamList)) {
            log.error("divide upstream configuration error: {}", rule.toString());
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
      //获取当前调用方IP
        final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
      //通过负载均衡获取对应的上游
        DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
      //如果上游为空则返回异常信息
        if (Objects.isNull(divideUpstream)) {
            log.error("divide has no upstream");
            Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
            return WebFluxResultUtils.result(exchange, error);
        }
        // 注入必要信息
        String domain = buildDomain(divideUpstream);
        String realURL = buildRealURL(domain, soulContext, exchange);
        exchange.getAttributes().put(Constants.HTTP_URL, realURL);
      //在调用下一个插件
        exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
        exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
        return chain.execute(exchange);
    }
}

我们发现Divide插件并没有真正的去调用,而是主要做一些获取上游服务器列表以及根据负载均衡选择一个有效的远端服务,并注入到对应的属性中,供后面使用,真正调用远端的插件式WebClientPlugin插件

//org.dromara.soul.plugin.httpclient.WebClientPlugin#execute
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
  //获取上下文
  final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
  assert soulContext != null;
  //获取url地址
  String urlPath = exchange.getAttribute(Constants.HTTP_URL);
  if (StringUtils.isEmpty(urlPath)) {
    Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
    return WebFluxResultUtils.result(exchange, error);
  }
  long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
  int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
  log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
  HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
  WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
  //调用远端服务
  return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
}
//org.dromara.soul.plugin.httpclient.WebClientPlugin#handleRequestBody
private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
                                         final ServerWebExchange exchange,
                                         final long timeout,
                                         final int retryTimes,
                                         final SoulPluginChain chain) {
  //使用异步编程方式调用远端服务并返回结果
  return requestBodySpec.headers(httpHeaders -> {
    httpHeaders.addAll(exchange.getRequest().getHeaders());
    httpHeaders.remove(HttpHeaders.HOST);
  })
    .contentType(buildMediaType(exchange))
    .body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
    .exchange()
    .doOnError(e -> log.error(e.getMessage()))
    .timeout(Duration.ofMillis(timeout))
    .retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
               .retryMax(retryTimes)
               .backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
    .flatMap(e -> doNext(e, exchange, chain));

}

到这里我们就完整走了一遍http的链路跟踪,中间还有很多其他的细节需要之后在讲解

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,692评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,482评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,995评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,223评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,245评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,208评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,091评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,929评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,346评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,570评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,739评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,437评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,037评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,677评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,833评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,760评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,647评论 2 354

推荐阅读更多精彩内容