深度解析 Soul 网关——数据同步

引子

从官网 clone 代码下来后,依次启动 soul-admin、soul-bootstrap 和 soul-examples-http 模块,启动成功后查看 soul-admin 会发现 soul-examples-http 模块里的代理配置和路由规则已经自动同步到了 soul-admin 上,通过 postman 调用发现配置也自动生效。

soul-examples-http 核心配置

soul:
  http: 
    adminUrl: http://localhost:9095
    port: 8188
    contextPath: /http
    appName: http
    full: false

soul-examples-http 主要接口

接口类

soul-admin 自动更新到的配置

devide 配置全局


更新到的配置

选择器详情


自动更新到的选择器配置

可以看到选择器已经读取了 soul-examples-http 配置文件中的信息,并且 soul-admin 自动获取到了soul-examples-http 服务的 IP 和端口信息。

postman 验证

直接请求 soul-examples-http 服务


在这里插入图片描述

通过网关转发到 soul-examples-http 服务


在这里插入图片描述

结果符合预期,证明 soul-examples-http 的配置自动同步给了 soul-admin 和 soul-bootstrap,接下来就研究一下 soul 是怎么实现这一块逻辑的。

客户端向 soul-admin 同步

配置分析

通过查看 soul-examples-http 的配置文件,可以看到有配置 soul-admin 的地址

soul:
  http:
    adminUrl: http://localhost:9095

跟进这个配置映射 Java 实体,找到了soul-client-springmvc 下的SoulSpringMvcConfig类,这个类定值了配置相关的选择

public class SoulSpringMvcConfig {
    /**
     * soul-admin 地址
     */
    private String adminUrl;
    /**
     * 代理服务上下文地址
     */
    private String contextPath;
    /**
     * 服务名
     */
    private String appName;    
    /**
     * 是否全局代理
     */
    private boolean full;    
    /**
     * 服务 host 地址
     */
    private String host;    
    /**
     * 服务端口
     */
    private Integer port;
}

客户端源码解析

查看配置类的调用方,追踪到两个类,分别是 ContextRegisterListener 和 SpringMvcClientBeanPostProcessor,分别在项目启动完成后执行和 bean 扫描后执行。具体代码和解析如下:

public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent> {
    private final AtomicBoolean registered = new AtomicBoolean(false);

    private final String url;

    private final SoulSpringMvcConfig soulSpringMvcConfig;

    /**
     * Instantiates a new Context register listener.
     *
     * @param soulSpringMvcConfig the soul spring mvc config
     */
    public ContextRegisterListener(final SoulSpringMvcConfig soulSpringMvcConfig) {
        ValidateUtils.validate(soulSpringMvcConfig);
        this.soulSpringMvcConfig = soulSpringMvcConfig;
        url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
    }

    @Override
    public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
        if (!registered.compareAndSet(false, true)) {
            return;
        }
        if (soulSpringMvcConfig.isFull()) {
            RegisterUtils.doRegister(buildJsonParams(), url, RpcTypeEnum.HTTP);
        }
    }
    private String buildJsonParams() {
        String contextPath = soulSpringMvcConfig.getContextPath();
        String appName = soulSpringMvcConfig.getAppName();
        Integer port = soulSpringMvcConfig.getPort();
        String path = contextPath + "/**";
        String configHost = soulSpringMvcConfig.getHost();
        String host = StringUtils.isBlank(configHost) ? IpUtils.getHost() : configHost;
        SpringMvcRegisterDTO registerDTO = SpringMvcRegisterDTO.builder()
                .context(contextPath)
                .host(host)
                .port(port)
                .appName(appName)
                .path(path)
                .rpcType(RpcTypeEnum.HTTP.getName())
                .enabled(true)
                .ruleName(path)
                .build();
        return OkHttpTools.getInstance().getGson().toJson(registerDTO);
    }
}

核心代码逻辑是当配置为全局代理的时候,调用 soul-admin 的/soul-client/springmvc-register 接口,将配置信息同步给 soul-admin。

package org.dromara.soul.client.springmvc.init;
/**
 * The type Soul spring mvc client bean post processor.
 *
 * @author xiaoyu(Myth)
 */
@Slf4j
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {

    private final ThreadPoolExecutor executorService;

    private final String url;

    private final SoulSpringMvcConfig soulSpringMvcConfig;

    public SpringMvcClientBeanPostProcessor(final SoulSpringMvcConfig soulSpringMvcConfig) {
        ValidateUtils.validate(soulSpringMvcConfig);
        this.soulSpringMvcConfig = soulSpringMvcConfig;
        url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
        executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
        if (soulSpringMvcConfig.isFull()) {
            return bean;
        }
        Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
        RestController restController = AnnotationUtils.findAnnotation(bean.getClass(), RestController.class);
        RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
        if (controller != null || restController != null || requestMapping != null) {
            SoulSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), SoulSpringMvcClient.class);
            String prePath = "";
            if (Objects.nonNull(clazzAnnotation)) {
                if (clazzAnnotation.path().indexOf("*") > 1) {
                    String finalPrePath = prePath;
                    executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(clazzAnnotation, finalPrePath), url,
                            RpcTypeEnum.HTTP));
                    return bean;
                }
                prePath = clazzAnnotation.path();
            }
            final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
            for (Method method : methods) {
                SoulSpringMvcClient soulSpringMvcClient = AnnotationUtils.findAnnotation(method, SoulSpringMvcClient.class);
                if (Objects.nonNull(soulSpringMvcClient)) {
                    String finalPrePath = prePath;
                    executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(soulSpringMvcClient, finalPrePath), url,
                            RpcTypeEnum.HTTP));
                }
            }
        }
        return bean;
    }

    private String buildJsonParams(final SoulSpringMvcClient soulSpringMvcClient, final String prePath) {
        String contextPath = soulSpringMvcConfig.getContextPath();
        String appName = soulSpringMvcConfig.getAppName();
        Integer port = soulSpringMvcConfig.getPort();
        String path = contextPath + prePath + soulSpringMvcClient.path();
        String desc = soulSpringMvcClient.desc();
        String configHost = soulSpringMvcConfig.getHost();
        String host = StringUtils.isBlank(configHost) ? IpUtils.getHost() : configHost;
        String configRuleName = soulSpringMvcClient.ruleName();
        String ruleName = StringUtils.isBlank(configRuleName) ? path : configRuleName;
        SpringMvcRegisterDTO registerDTO = SpringMvcRegisterDTO.builder()
                .context(contextPath)
                .host(host)
                .port(port)
                .appName(appName)
                .path(path)
                .pathDesc(desc)
                .rpcType(soulSpringMvcClient.rpcType())
                .enabled(soulSpringMvcClient.enabled())
                .ruleName(ruleName)
                .registerMetaData(soulSpringMvcClient.registerMetaData())
                .build();
        return OkHttpTools.getInstance().getGson().toJson(registerDTO);
    }
}

代码核心逻辑是

  1. 校验代码是否有 @Controller 或者 @RestController 或者 @RequestMapping 注解
  2. 如果有,则校验是否有 @SoulSpringMvcClient 注解
  3. 检查路径配置是否有 1 个以上的 * 号,如果有,则直接发送配置信息到 soul-admin
  4. 如果不包含 1 个以上 * 号,则遍历类中每一个方法,找到有 @SoulSpringMvcClient 注解的方法,发送配置到 soul-admin

soul-admin 源码解析

研究服务端注册接口 /soul-client/springmvc-register ,其实现类为 SoulClientController 。
具体实现代码是下面部分

    @Override
    @Transactional
    public String registerSpringMvc(final SpringMvcRegisterDTO dto) {
        if (dto.isRegisterMetaData()) {
            MetaDataDO exist = metaDataMapper.findByPath(dto.getPath());
            if (Objects.isNull(exist)) {
                saveSpringMvcMetaData(dto);
            }
        }
        String selectorId = handlerSpringMvcSelector(dto);
        handlerSpringMvcRule(selectorId, dto);
        return SoulResultMessage.SUCCESS;
    }
  1. 根据配置地址查库校验是否已存在
  2. 如果不存在,则根据收到的配置保存元数据
  3. 校验 Selector 是否存在,不存在则保存
  4. 校验 Rule 是否存在,不存在则保存

小结

至此,客户端向 soul-admin 同步代码的逻辑就梳理清楚了,其核心配置是 yml 文件中的配置文件和 @SoulSpringMvcClient 注解,发送配置实现的核心代码为 ContextRegisterListener 类和 SpringMvcClientBeanPostProcessor 类,接收配置核心类为 SoulClientController。

soul-admin 向 soul-bootstrap 同步

数据同步流程图

阅读官方文档,发现数据同步流程图如下


数据同步流程

核心依赖

从图中可以看到,数据同步核心为粉色框部分,四种方式的 SPI,然后 soul-gateway 去pull/watch 数据同步到本地。
查看 soul-gateway 的依赖,找到依赖模块为

        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!--soul data sync start use websocket-->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!--soul data sync start use http-->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-http</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!-- soul data sync start use nacos -->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId>
            <version>${project.version}</version>
        </dependency>

查看他们的 SPI 定义,代码位置在下图

spi 定义

代码解析

根据 SpringBoot 的 SPI 规范,查看模块对应的 spring.factories 描述文件,以 websocket 为例,配置了同步的配置文件如下

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.dromara.soul.spring.boot.starter.sync.data.websocket.WebsocketSyncDataConfiguration

找到其代码实现为

    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use websocket sync soul data.......");
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }

主要是 new 了 WebsocketSyncDataService 服务,说明核心服务在 WebsocketSyncDataService 类中。继续跟踪代码研究具体实现逻辑。

    public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                    final PluginDataSubscriber pluginDataSubscriber,
                                    final List<MetaDataSubscriber> metaDataSubscribers,
                                    final List<AuthDataSubscriber> authDataSubscribers) {
        String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
        executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
        for (String url : urls) {
            try {
                clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
            } catch (URISyntaxException e) {
                log.error("websocket url({}) is error", url, e);
            }
        }
        try {
            for (WebSocketClient client : clients) {
                boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                if (success) {
                    log.info("websocket connection is successful.....");
                } else {
                    log.error("websocket connection is error.....");
                }
                executor.scheduleAtFixedRate(() -> {
                    try {
                        if (client.isClosed()) {
                            boolean reconnectSuccess = client.reconnectBlocking();
                            if (reconnectSuccess) {
                                log.info("websocket reconnect is successful.....");
                            } else {
                                log.error("websocket reconnection is error.....");
                            }
                        }
                    } catch (InterruptedException e) {
                        log.error("websocket connect is error :{}", e.getMessage());
                    }
                }, 10, 30, TimeUnit.SECONDS);
            }
            /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
        } catch (InterruptedException e) {
            log.info("websocket connection...exception....", e);
        }

    }
  1. 读取配置中的同步地址列表,根据地址数量创建定时调度连接池
  2. 每个同步地址都发起一个 websocket 连接,核心为放入了3个订阅者
  3. 每个 socket 连接每 30 秒检测一次健康度,并打印日志
  4. 线程池的线程发起一个每 30 秒触发一次的请求,检查连接是否成功,并打印日志

很明显,核心为第二步的3个订阅者,这里采用了订阅者模式,发布者一定是在 soul-admin 中进行的。

数据发布实现

观察 soul-admin 代码,找到如下代码


发布者实现

四种不同的同步方式,都是对 DataChangedListener 的不同实现,发送的自定义事件为DataChangedEvent。

DataChangedListener 定义了5种响应时间,分别是 AppAuth 变更、Plugin 变更、Selector 变更、MetaData 变更和 Rule 变更。
DataChangedEvent 定义了 2 中枚举类型,分别是操作类型(新增、变更等)和数据类型(Plugin、Rule等)。

一次发送事件的实例代码
代码源于org.dromara.soul.admin.listener.zookeeper.HttpServiceDiscovery:line171

eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
        Collections.singletonList(selectorData)));

小结

soul-admin 向 soul-gateway 同步核心原理为 SPI 机制,SPI 的定义在soul-spring-boot-starter-sync-data-center 模块下的spring.factories描述文件中。

SPI 具体实现在 soul-sync-data-center 模块下,本文以 websocket 方式为例解析了代码,其它方式同步在具体实现方式上略有不同。

网关获取数据采用发布订阅模式,订阅者为 soul-gateway 网关,发布者为 soul-admin,发布者的实现逻辑在 org.dromara.soul.admin.listener 包下面。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,874评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,102评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,676评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,911评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,937评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,935评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,860评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,660评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,113评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,363评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,506评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,238评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,861评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,486评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,674评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,513评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,426评论 2 352

推荐阅读更多精彩内容