1.4.1 数据同步方式
我们知道Soul数据同步可以配置为 Websocket,Http 长轮询,Zookeeper 和 Nacos 这四种方式,我们从更新Selector的接口作为切入点,根据调用链一步步分析各种数据同步方式。
1.4.1 通用处理链路
在决定使用那种方式发送前都会经过一个通用的分配流程,我们从新Selector的接口出发,SelectorController 的 updateSelector 方法会被端调用进行服务更新。这里调用了 SelectorServiceImpl服务进行更新。
这里先执行数据库操作,将对应的Selector更新,然后将旧的rule删除,然后再逐条插入新的规则,最终调用到了eventPubliser的publishEvent方法,这里使用了SpringBoot的消息发布机制,最终发布了一个DataChangedEvent类型的消息,我们接着看消息消费者 DataChangedEventDispatcher 。
DataChangedEventDispatcher 实现了 ApplicationListener<DataChangedEvent> 这个接口,它重写了OnApplicationEvent方法,该方法就是接收消息的方法。同时 DataChangedEventDispatcher 也实现了InitializingBean 接口,他会在所有的Bean都准备就绪后触发 afterPropertiesSet 方法。
我们看它主要就是查询里面所有实现了 DataChangedListener 方法的实现类,其主要实现类有如下几个。其实就是我们接下来要说的几种数据同步方式。
我们回过头来看一下 OnApplicationEvent 做了什么,我们可以看到,它主要是轮询所有注册了的Listener,然后根据不同的Group Key 调用对应的方法。listener的方法。
我们看这里的listener是如何注册进ApplicationContext的呢。我们看它被引用的地方 DataSyncConfiguration ,这里根据我们的配置进行对应的加载,我们看一下websocket的流程:但开启soul.sync.websocket.enabled 的时候会进行bean的加载。这里包括三部分,WebsocketDataChangedListener 即我们刚才被调用的listener;WebsocketCollector 这是个WebSocket 的客户端,主要负责收发数据;ServerEndpointExporter 这个是Spring-WebSocket的一个类,他也实现了SmartInitializingSingleton 接口,他会在每个单例bean注册完成后对其进行扫描是否使用了 ServerEndpoint 注解,然后将该类注入到serverContainer中。
1.4.2 websocket Server 端更新流程
由于我们是更新selector , 所以会调用listener 的 onSelectorChanged 方法,这里最终还是调用了WebsocketCollector 的send 方法。
我们可以看出来,在Spring中使用WebSocket非常简单,我们只需要使用ServerEndpoint 这个注解定义这个是websocket处理类,OnOpen 这个注解类定义在websocket链接建立后生产对应的session然后回调这个服务,这里将session作为静态属性持有,OnMessage 接收消息处理方法,onClose 链接关闭处理方法。
我们可以看到Souladmin这里会持有所有的WebSocket链接。
在发送的时候会给所有的客户端进行发送,这里还有对自定义类型的处理。
1.4.2 websocket Client 端
我们再来看看接收端,我们可以看到使用websocket同步数据,BootStrap需要使用以下依赖
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
<version>${project.version}</version>
</dependency>
我们找到该项目,这是一个标准的springboot-starter,这里定义个一个自动配置类。
我们来看一下配置类 WebsocketSyncDataConfiguration, 它主要是初始化了两个Bean 一个是 websocketSyncDataService 即为数据同步服务, 一个是 websocketConfig 即websocket的一些配置项。这里会将以soul.sync.websocket 开头的配置注入到这个类中,这里只有一个属性 urls。
websocketSyncDataService 这里使用到了 ObjectProvider 这个是SpringFactory到一个扩展点,我们知道在一个方法前Autowrite也可以实现注入,但是假如存在多个参数类是以这里到常见到话Autowrite就懵,
- 如果注入实例为空时,使用ObjectProvider则避免了强依赖导致的依赖对象不存在异常;主要是 getIfAvailable 这个方法。
- 如果有多个实例,ObjectProvider的方法会根据Bean实现的Ordered接口或@Order注解指定的先后顺序获取一个Bean。从而了提供了一个更加宽松的依赖注入方式。主要通过 orderedStream 方法返回一个根据order注解的一个bean 的 stream。
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
我们根据debug数据看一下主要注入的参数,PluginDataSubscriber 这个主要是各个插件的Handler 主要负责插件内的数据更新。metaDataSubscribers 元数据订阅者,authDataSubscribers 认证数据订阅。
这里会根据websocket的配置生成一个SoulWebsocketClient 客户端,然后尝试链接服务端。
这里会触发服务端的OnMessage方法,然后触发SysnAll方法,向客户端发送所有数据。我们看调用栈,这里会包含所有信息同步给客户端。
这里包括了插件数据,selector 数据和 rule 数据。
@Override
public boolean syncAll(final DataEventTypeEnum type) {
appAuthService.syncData();
List<PluginData> pluginDataList = pluginService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.PLUGIN, type, pluginDataList));
List<SelectorData> selectorDataList = selectorService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, type, selectorDataList));
List<RuleData> ruleDataList = ruleService.listAll();
eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, type, ruleDataList));
metaDataService.syncData();
return true;
}
我们再看看SoulWebsocketClient 做了什么,其主要是在接收到OnMessge的时候调用websocketDataHandler 进行消息处理。
websocketDataHandler 里面再注册了各种数据的处理器,如图 包括plugin selector rule app_auth meta 这几种数据的更新。
我们看handle 是一个default方法,主要是调用各个实现类的refresh方法
我们更新selector主要是做以下操作,更新内存数据,然后通知各个插件数据更新。
1.4.4 总结
在这期的学习过程中,我学到很一些Spring的新特性如 ObjectProvider,还知道了在SpringBoot中如何写一个WebSocket的客户端。