引子
从官网 clone 代码下来后,依次启动 soul-admin、soul-bootstrap 和 soul-examples-http 模块,启动成功后查看 soul-admin 会发现 soul-examples-http 模块里的代理配置和路由规则已经自动同步到了 soul-admin 上,通过 postman 调用发现配置也自动生效。
soul-examples-http 核心配置
soul:
http:
adminUrl: http://localhost:9095
port: 8188
contextPath: /http
appName: http
full: false
soul-examples-http 主要接口
soul-admin 自动更新到的配置
devide 配置全局
选择器详情
可以看到选择器已经读取了 soul-examples-http 配置文件中的信息,并且 soul-admin 自动获取到了soul-examples-http 服务的 IP 和端口信息。
postman 验证
直接请求 soul-examples-http 服务
通过网关转发到 soul-examples-http 服务
结果符合预期,证明 soul-examples-http 的配置自动同步给了 soul-admin 和 soul-bootstrap,接下来就研究一下 soul 是怎么实现这一块逻辑的。
客户端向 soul-admin 同步
配置分析
通过查看 soul-examples-http 的配置文件,可以看到有配置 soul-admin 的地址
soul:
http:
adminUrl: http://localhost:9095
跟进这个配置映射 Java 实体,找到了soul-client-springmvc 下的SoulSpringMvcConfig类,这个类定值了配置相关的选择
public class SoulSpringMvcConfig {
/**
* soul-admin 地址
*/
private String adminUrl;
/**
* 代理服务上下文地址
*/
private String contextPath;
/**
* 服务名
*/
private String appName;
/**
* 是否全局代理
*/
private boolean full;
/**
* 服务 host 地址
*/
private String host;
/**
* 服务端口
*/
private Integer port;
}
客户端源码解析
查看配置类的调用方,追踪到两个类,分别是 ContextRegisterListener 和 SpringMvcClientBeanPostProcessor,分别在项目启动完成后执行和 bean 扫描后执行。具体代码和解析如下:
public class ContextRegisterListener implements ApplicationListener<ContextRefreshedEvent> {
private final AtomicBoolean registered = new AtomicBoolean(false);
private final String url;
private final SoulSpringMvcConfig soulSpringMvcConfig;
/**
* Instantiates a new Context register listener.
*
* @param soulSpringMvcConfig the soul spring mvc config
*/
public ContextRegisterListener(final SoulSpringMvcConfig soulSpringMvcConfig) {
ValidateUtils.validate(soulSpringMvcConfig);
this.soulSpringMvcConfig = soulSpringMvcConfig;
url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
}
@Override
public void onApplicationEvent(final ContextRefreshedEvent contextRefreshedEvent) {
if (!registered.compareAndSet(false, true)) {
return;
}
if (soulSpringMvcConfig.isFull()) {
RegisterUtils.doRegister(buildJsonParams(), url, RpcTypeEnum.HTTP);
}
}
private String buildJsonParams() {
String contextPath = soulSpringMvcConfig.getContextPath();
String appName = soulSpringMvcConfig.getAppName();
Integer port = soulSpringMvcConfig.getPort();
String path = contextPath + "/**";
String configHost = soulSpringMvcConfig.getHost();
String host = StringUtils.isBlank(configHost) ? IpUtils.getHost() : configHost;
SpringMvcRegisterDTO registerDTO = SpringMvcRegisterDTO.builder()
.context(contextPath)
.host(host)
.port(port)
.appName(appName)
.path(path)
.rpcType(RpcTypeEnum.HTTP.getName())
.enabled(true)
.ruleName(path)
.build();
return OkHttpTools.getInstance().getGson().toJson(registerDTO);
}
}
核心代码逻辑是当配置为全局代理的时候,调用 soul-admin 的/soul-client/springmvc-register 接口,将配置信息同步给 soul-admin。
package org.dromara.soul.client.springmvc.init;
/**
* The type Soul spring mvc client bean post processor.
*
* @author xiaoyu(Myth)
*/
@Slf4j
public class SpringMvcClientBeanPostProcessor implements BeanPostProcessor {
private final ThreadPoolExecutor executorService;
private final String url;
private final SoulSpringMvcConfig soulSpringMvcConfig;
public SpringMvcClientBeanPostProcessor(final SoulSpringMvcConfig soulSpringMvcConfig) {
ValidateUtils.validate(soulSpringMvcConfig);
this.soulSpringMvcConfig = soulSpringMvcConfig;
url = soulSpringMvcConfig.getAdminUrl() + "/soul-client/springmvc-register";
executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
}
@Override
public Object postProcessAfterInitialization(@NonNull final Object bean, @NonNull final String beanName) throws BeansException {
if (soulSpringMvcConfig.isFull()) {
return bean;
}
Controller controller = AnnotationUtils.findAnnotation(bean.getClass(), Controller.class);
RestController restController = AnnotationUtils.findAnnotation(bean.getClass(), RestController.class);
RequestMapping requestMapping = AnnotationUtils.findAnnotation(bean.getClass(), RequestMapping.class);
if (controller != null || restController != null || requestMapping != null) {
SoulSpringMvcClient clazzAnnotation = AnnotationUtils.findAnnotation(bean.getClass(), SoulSpringMvcClient.class);
String prePath = "";
if (Objects.nonNull(clazzAnnotation)) {
if (clazzAnnotation.path().indexOf("*") > 1) {
String finalPrePath = prePath;
executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(clazzAnnotation, finalPrePath), url,
RpcTypeEnum.HTTP));
return bean;
}
prePath = clazzAnnotation.path();
}
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(bean.getClass());
for (Method method : methods) {
SoulSpringMvcClient soulSpringMvcClient = AnnotationUtils.findAnnotation(method, SoulSpringMvcClient.class);
if (Objects.nonNull(soulSpringMvcClient)) {
String finalPrePath = prePath;
executorService.execute(() -> RegisterUtils.doRegister(buildJsonParams(soulSpringMvcClient, finalPrePath), url,
RpcTypeEnum.HTTP));
}
}
}
return bean;
}
private String buildJsonParams(final SoulSpringMvcClient soulSpringMvcClient, final String prePath) {
String contextPath = soulSpringMvcConfig.getContextPath();
String appName = soulSpringMvcConfig.getAppName();
Integer port = soulSpringMvcConfig.getPort();
String path = contextPath + prePath + soulSpringMvcClient.path();
String desc = soulSpringMvcClient.desc();
String configHost = soulSpringMvcConfig.getHost();
String host = StringUtils.isBlank(configHost) ? IpUtils.getHost() : configHost;
String configRuleName = soulSpringMvcClient.ruleName();
String ruleName = StringUtils.isBlank(configRuleName) ? path : configRuleName;
SpringMvcRegisterDTO registerDTO = SpringMvcRegisterDTO.builder()
.context(contextPath)
.host(host)
.port(port)
.appName(appName)
.path(path)
.pathDesc(desc)
.rpcType(soulSpringMvcClient.rpcType())
.enabled(soulSpringMvcClient.enabled())
.ruleName(ruleName)
.registerMetaData(soulSpringMvcClient.registerMetaData())
.build();
return OkHttpTools.getInstance().getGson().toJson(registerDTO);
}
}
代码核心逻辑是
- 校验代码是否有 @Controller 或者 @RestController 或者 @RequestMapping 注解
- 如果有,则校验是否有 @SoulSpringMvcClient 注解
- 检查路径配置是否有 1 个以上的 * 号,如果有,则直接发送配置信息到 soul-admin
- 如果不包含 1 个以上 * 号,则遍历类中每一个方法,找到有 @SoulSpringMvcClient 注解的方法,发送配置到 soul-admin
soul-admin 源码解析
研究服务端注册接口 /soul-client/springmvc-register ,其实现类为 SoulClientController 。
具体实现代码是下面部分
@Override
@Transactional
public String registerSpringMvc(final SpringMvcRegisterDTO dto) {
if (dto.isRegisterMetaData()) {
MetaDataDO exist = metaDataMapper.findByPath(dto.getPath());
if (Objects.isNull(exist)) {
saveSpringMvcMetaData(dto);
}
}
String selectorId = handlerSpringMvcSelector(dto);
handlerSpringMvcRule(selectorId, dto);
return SoulResultMessage.SUCCESS;
}
- 根据配置地址查库校验是否已存在
- 如果不存在,则根据收到的配置保存元数据
- 校验 Selector 是否存在,不存在则保存
- 校验 Rule 是否存在,不存在则保存
小结
至此,客户端向 soul-admin 同步代码的逻辑就梳理清楚了,其核心配置是 yml 文件中的配置文件和 @SoulSpringMvcClient 注解,发送配置实现的核心代码为 ContextRegisterListener 类和 SpringMvcClientBeanPostProcessor 类,接收配置核心类为 SoulClientController。
soul-admin 向 soul-bootstrap 同步
数据同步流程图
阅读官方文档,发现数据同步流程图如下
核心依赖
从图中可以看到,数据同步核心为粉色框部分,四种方式的 SPI,然后 soul-gateway 去pull/watch 数据同步到本地。
查看 soul-gateway 的依赖,找到依赖模块为
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
<!--soul data sync start use websocket-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
<version>${project.version}</version>
</dependency>
<!--soul data sync start use http-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-http</artifactId>
<version>${project.version}</version>
</dependency>
<!-- soul data sync start use nacos -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId>
<version>${project.version}</version>
</dependency>
查看他们的 SPI 定义,代码位置在下图
代码解析
根据 SpringBoot 的 SPI 规范,查看模块对应的 spring.factories 描述文件,以 websocket 为例,配置了同步的配置文件如下
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.dromara.soul.spring.boot.starter.sync.data.websocket.WebsocketSyncDataConfiguration
找到其代码实现为
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
主要是 new 了 WebsocketSyncDataService 服务,说明核心服务在 WebsocketSyncDataService 类中。继续跟踪代码研究具体实现逻辑。
public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
for (String url : urls) {
try {
clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
} catch (URISyntaxException e) {
log.error("websocket url({}) is error", url, e);
}
}
try {
for (WebSocketClient client : clients) {
boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
if (success) {
log.info("websocket connection is successful.....");
} else {
log.error("websocket connection is error.....");
}
executor.scheduleAtFixedRate(() -> {
try {
if (client.isClosed()) {
boolean reconnectSuccess = client.reconnectBlocking();
if (reconnectSuccess) {
log.info("websocket reconnect is successful.....");
} else {
log.error("websocket reconnection is error.....");
}
}
} catch (InterruptedException e) {
log.error("websocket connect is error :{}", e.getMessage());
}
}, 10, 30, TimeUnit.SECONDS);
}
/* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
} catch (InterruptedException e) {
log.info("websocket connection...exception....", e);
}
}
- 读取配置中的同步地址列表,根据地址数量创建定时调度连接池
- 每个同步地址都发起一个 websocket 连接,核心为放入了3个订阅者
- 每个 socket 连接每 30 秒检测一次健康度,并打印日志
- 线程池的线程发起一个每 30 秒触发一次的请求,检查连接是否成功,并打印日志
很明显,核心为第二步的3个订阅者,这里采用了订阅者模式,发布者一定是在 soul-admin 中进行的。
数据发布实现
观察 soul-admin 代码,找到如下代码
四种不同的同步方式,都是对 DataChangedListener 的不同实现,发送的自定义事件为DataChangedEvent。
DataChangedListener 定义了5种响应时间,分别是 AppAuth 变更、Plugin 变更、Selector 变更、MetaData 变更和 Rule 变更。
DataChangedEvent 定义了 2 中枚举类型,分别是操作类型(新增、变更等)和数据类型(Plugin、Rule等)。
一次发送事件的实例代码
代码源于org.dromara.soul.admin.listener.zookeeper.HttpServiceDiscovery:line171
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
Collections.singletonList(selectorData)));
小结
soul-admin 向 soul-gateway 同步核心原理为 SPI 机制,SPI 的定义在soul-spring-boot-starter-sync-data-center 模块下的spring.factories描述文件中。
SPI 具体实现在 soul-sync-data-center 模块下,本文以 websocket 方式为例解析了代码,其它方式同步在具体实现方式上略有不同。
网关获取数据采用发布订阅模式,订阅者为 soul-gateway 网关,发布者为 soul-admin,发布者的实现逻辑在 org.dromara.soul.admin.listener 包下面。