目标
- 研究Soul网关是如何和Soul Admin进行数据同步的。
数据同步原理
- Soul网关和Soul Admin进行数据同步有以下四种的方式:
- 基于websocket
- 基于http长连接
- 基于zookeeper
- 基于nacos
主要研究Soul网关是怎么通过websocket协议来获取数据并将这些数据写入到缓存中(内存)。下面是以Dubbo的MetaData数据为例进行源码分析。
核心类
- SoulWebsocketClient
- WebsocketDataHandler
- DataHandler
- MetaDataHandler
- ApacheDubboMetaDataSubscriber
- ApplicationConfigCache
源码分析
SoulWebsocketClient
SoulWebsocketClient类的主要功能是在Soul网关启动时获取相关配置信息,和相关配置信息变更的时候,实时同步到Soul网关。我们看一下关键的代码部分:
@Override
public void onOpen(final ServerHandshake serverHandshake) {
// 是否已同步过
if (!alreadySync) {
// 向server端发送一条DataEventTypeEnum类型为MYSELF的数据到server端也就是Soul Admin。
// Soul Admin接收到这条消息,会把插件列表、选择器、规则、元数据等相关数据发送给Soul网关
send(DataEventTypeEnum.MYSELF.name());
alreadySync = true;
}
}
/**
* 接收来到Soul Admin发送过来的数据,如插件列表、选择器、规则、元数据等等
* @param result
*/
@Override
public void onMessage(final String result) {
log.info("websocket on message data: {}", result);
// 对数据进行处理,并调用WebsocketDataHandler类的execute方法
handleResult(result);
}
WebsocketDataHandler
WebsocketDataHandler类是一个工厂类,采用工厂方法设计模式,根据不同的数据类型调用不同的DataHandler。
DataHandler
DataHandler是一个接口,只有一个方法,该方法主要功能是把数据写入内存、更新数据或删除数据。
DataHandler的实现类:
- AbstractDataHandler 是抽象类,采用模板方法设计模式,主要方法如下:
- doRefresh()方法:先将数据清空,并把最新的数据写入到内存中。
- doUpdate()方法:只更新部分数据或替换旧数据
- doDelete()方法:删除数据
- PluginDataHandler 插件数据处理器
- SelectorDataHandler 选择器数据处理器
- RuleDataHandler 规则数据处理器
- MetaDataHandler 元数据处理器
- AuthDataHandler 认证数据处理器
MetaDataHandler
MetaDataHandler类会根据rpcType来处理不同的RPC服务,这次我们以Dubbo RPC服务为例进行分析。如果rpcType是dubbo。如果它就调用ApacheDubboMetaDataSubscriber类的onSubscribe()方法
ApacheDubboMetaDataSubscriber
ApacheDubboMetaDataSubscriber主要功能是做为Dubbo Consumer端,将元数据信息注册到元数据中心中,如zookeeper。
public void onSubscribe(final MetaData metaData) {
// 首先判断rpcType是否为dubbo
if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
MetaData exist = META_DATA.get(metaData.getPath());
// 如果 path 不存在缓存中
if (Objects.isNull(META_DATA.get(metaData.getPath())) || Objects.isNull(ApplicationConfigCache.getInstance().get(metaData.getPath()))) {
// The first initialization
// 进行初始化DubboReference,验证DubboService是否有Provider。如果没有则抛异常
ApplicationConfigCache.getInstance().initRef(metaData);
} else {
// There are updates, which only support the update of four properties of serviceName rpcExt parameterTypes methodName,
// because these four properties will affect the call of Dubbo;
// 如果存在则只更新ServiceName、MethodName、ParameterTypes或一些扩展信息
if (!metaData.getServiceName().equals(exist.getServiceName())
|| !metaData.getRpcExt().equals(exist.getRpcExt())
|| !metaData.getParameterTypes().equals(exist.getParameterTypes())
|| !metaData.getMethodName().equals(exist.getMethodName())) {
ApplicationConfigCache.getInstance().build(metaData);
}
}
META_DATA.put(metaData.getPath(), metaData);
}
}
ApplicationConfigCache
ApplicationConfigCache类主要功能是缓存Dubbo ReferenceConfig数据、初始Dubbo Config信息等等
public void init(final DubboRegisterConfig dubboRegisterConfig) {
// 设置Dubbo的 ApplicationConfig
if (applicationConfig == null) {
applicationConfig = new ApplicationConfig("soul_proxy");
}
// 设置Dubbo的 RegistryConfig 注册中心
if (registryConfig == null) {
registryConfig = new RegistryConfig();
// 通过application.yml获取
registryConfig.setProtocol(dubboRegisterConfig.getProtocol());
registryConfig.setId("soul_proxy");
registryConfig.setRegister(false);
// 通过application.yml获取
registryConfig.setAddress(dubboRegisterConfig.getRegister());
Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfig::setGroup);
}
}
public ReferenceConfig<GenericService> initRef(final MetaData metaData) {
try {
// 如果缓存存在则返回ReferenceConfig
ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath());
if (StringUtils.isNoneBlank(referenceConfig.getInterface())) {
return referenceConfig;
}
} catch (ExecutionException e) {
log.error("init dubbo ref ex:{}", e.getMessage());
}
// 缓存不存在创建ReferenceConfig
return build(metaData);
}
至此,Dubbo相关MateData数据已同步到Soul网关的内存中。
总结
- 通过分析源码学到如何运用工厂方法和模板方法的设计模式
- Dubbo ReferenceConfig是如何初始化的。