我们知道协议转换也是API
网关常见的一个功能,这次我们看下soul
网关是如何实现协议转换的。
请求流图
大致流程:
一、dubbo服务提供者注册服务到soul-admin
- 使用
@SoulDubboClient
注解,暴露http服务地址到soul-admin
- 接下来在启动过程中,通过spring的BeanPostProcessor机制,对dubbo服务接口做拦截处理。主要逻辑就是先获取到所有dubbo服务的
ServiceBean
对象,再遍历这些ServiceBean
,其接口方法上看是否存在@SoulDubboClient
注解;若是,则会调用soul-admin
的dubbo服务client的注册接口/soul-client/dubbo-register
,完成路由信息的注册。该逻辑由org.dromara.soul.client.alibaba.dubbo.AlibabaDubboServiceBeanPostProcessor
完成
private void handler(final ServiceBean<?> serviceBean) {
Class<?> clazz = serviceBean.getRef().getClass();
if (ClassUtils.isCglibProxyClass(clazz)) {
String superClassName = clazz.getGenericSuperclass().getTypeName();
try {
clazz = Class.forName(superClassName);
} catch (ClassNotFoundException e) {
log.error(String.format("class not found: %s", superClassName));
return;
}
}
// 获取class的去重后的方法,包括父类
final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
for (Method method : methods) {
SoulDubboClient soulDubboClient = method.getAnnotation(SoulDubboClient.class);
// 如果SoulDubboClient不为空,则注册服务信息到admin
if (Objects.nonNull(soulDubboClient)) {
RegisterUtils.doRegister(buildJsonParams(serviceBean, soulDubboClient, method), url, RpcTypeEnum.DUBBO);
}
}
}
- 【soul-admin】接收到注册请求并处理
org.dromara.soul.admin.controller.SoulClientController
@PostMapping("/dubbo-register")
public String registerRpc(@RequestBody final MetaDataDTO metaDataDTO) {
return soulClientRegisterService.registerDubbo(metaDataDTO);
}
- dubbo的注册请求逻辑:将元数据、选择器、规则进行落库(更新或插入);
org.dromara.soul.admin.service.impl.SoulClientRegisterServiceImpl
- 落库完成后,会发布配置变更事件,给到
soul-bootstrap
private void publishEvent(final RuleDO ruleDO, final List<RuleConditionDTO> ruleConditions) {
SelectorDO selectorDO = selectorMapper.selectById(ruleDO.getSelectorId());
PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
List<ConditionData> conditionDataList =
ruleConditions.stream().map(ConditionTransfer.INSTANCE::mapToRuleDTO).collect(Collectors.toList());
// publish change event.
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE,
Collections.singletonList(RuleDO.transFrom(ruleDO, pluginDO.getName(), conditionDataList))));
}
二、配置变更事件发送给soul-bootstrap
- 从上我们得知每次的配置变更会通过spring的应用事件机制发送一个
DataChangedEvent
事件,经过在源码中搜索关键字ApplicationListener<DataChangedEvent>
能定位到事件的回调处理类为org.dromara.soul.admin.listener.DataChangedEventDispatcher
;
public void onApplicationEvent(final DataChangedEvent event) {
// 遍历事件变更的监听者,根据不同的事件类型,调用相应事件的处理逻辑
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
-
再看下这里的
DataChangedListener
又是如何来的?为一个接口,查看其实现,得知存在如下几种事件处理的方式
结合官网得知
soul-admin
的配置信息同步到soul-bootstap
存在如下几种方式:http长轮询、Nacos配置同步、Zookeeper配置同步、websocket连接同步-
我们查看到数据同步支持的方式,
org.dromara.soul.admin.config.DataSyncConfiguration
从上图得知,具体使用哪种配置同步方式,又由属性参数决定
soul.sync.http.enabled=true
# 默认
soul.sync.websocket.enabled=true
soul.sync.zookeeper.url=xxx
soul.sync.nacos.url=xxx
- 默认同步方式为websocket,我们继续跟踪源码,看下websocket的同步方式是怎么实现的
- 当前存在的配置数据类型如下:
- APP_AUTH --- app权限配置
- PLUGIN --- 插件配置
- RULE --- 规则配置
- SELECTOR --- 选择器配置
- META_DATA --- 元数据配置
-
org.dromara.soul.admin.listener.websocket.WebsocketDataChangedListener
为数据变更的监听的webscoket实现,存在上述几种配置数据的处理
- 我们跟踪下一个具体配置的处理
onPluginChanged
,会发现调用org.dromara.soul.admin.listener.websocket.WebsocketCollector
的send方法,将消息发送给webscoket的客户端,也就是soul-bootstrap
public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
WebsocketData<PluginData> websocketData =
new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
// 通过websocket收集器发送变更消息
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
-
WebsocketCollector.send
逻辑
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isNotBlank(message)) {
// 自身的消息
if (DataEventTypeEnum.MYSELF == type) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
return;
}
// 其他消息则遍历所有的websocket请求,依次发送消息;这里有多个session,因soul-admin可能是集群
for (Session session : SESSION_SET) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("websocket send result is exception: ", e);
}
}
}
}
- 这里还有全量同步的情况,当
soul-bootstrap
启动的时候,会给soul-admin
发一个self
的消息,soul-admin
接收到消息后,会获取当前所有的配置类型数据,每次生成一个类型的消息,发给soul-bootstrap
。这样就完成了配置的全量同步。
public void onMessage(final String message, final Session session) {
// 收到myself的消息,用于bootstrap初始连接上来的时候,相当于是给admin自己的消息
// 该消息会同步所有类型的配置数据,给到连上来的这个bootstrap
// 如果有多台bootstrap同时连接上来,又会怎样?onMessage保证了有序性?这里应该存在并发问题
// TODO question
if (message.equals(DataEventTypeEnum.MYSELF.name())) {
WebsocketCollector.session = session;
SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
}
}
- 使用
websocket
方式的同步时,需要soul-admin
与soul-bootstrap
是互通的