前言
昨天调试的时候,发现 listeners 中包含 WebsocketDataChangedListener
。
那这个是怎么来的呢?今天就初探数据同步原理。
同步策略概要
官网上有张关于同步数据的高清无码的大图。
从这张大图上我们大体知道,soul-admin 配置后怎么同步到 soul-bootstrap 层生效的。
之前我们也了解过
ApplicationEventPublisher
和 DataChangedEventDispatcher
,写了一个关于怎么使用 ApplicationEventPublisher
简单的 demo ,然后又通过 Debug 我们验证配置流程
继续调试
当进入 DataChangedEventDispatcher#onApplicationEvent 的方法时,具体方法如下:
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
@Override
// afterPropertiesSet方法,初始化bean的时候执行,可以针对某个具体的bean进行配置。
public void afterPropertiesSet() {
Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
}
首先映入眼帘的是循环变量 listeners, 查看得知 listeners 的类型是
List<DataChangedListener>
, 其中 DataChangedListener 是其元素的类型, 然后继续追踪 DataChangedListener;
发现它是一个接口,并非是实现,他的实现有
-
HttpLongPollingDataChangedListener
http的长轮询 -
NacosDataChangedListener
nacos 配置中心 -
WebsocketDataChangedListener
websocket 的方式 -
ZookeeperDataChangedListener
zk 配置中心(这个配置中心能dubbo 的共用么?)
那这些实现又是怎么注册到应用中的呢?
看这里的时候有点懵,然后我重新启动应用: SoulBootstrapApplication, 在命令行中发现 you use websocket sync soul data
, 全文检索,得到以下方法:
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
log.info("you use websocket sync soul data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
该方法所在类上有这么一个注解
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
在 配置文件中:application-local.yml
中 配置
soul :
sync:
websocket :
urls: ws://localhost:9095/websocket
这样的配置在启动时就尝试去连接了,soul-admin 中 websocket 服务器。
然后我们继续看 soul-admin, 同理推想是不是已经在启动时注册了,查看配置文件
application.yml
,
soul:
sync:
websocket:
enabled: true
查看 DataSyncConfiguration
@Configuration
@ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
@EnableConfigurationProperties(WebsocketSyncProperties.class)
static class WebsocketListener {
/**
* Config event listener data changed listener.
*
* @return the data changed listener
*/
@Bean
@ConditionalOnMissingBean(WebsocketDataChangedListener.class)
public DataChangedListener websocketDataChangedListener() {
return new WebsocketDataChangedListener();
}
/**
* Websocket collector websocket collector.
*
* @return the websocket collector
*/
@Bean
@ConditionalOnMissingBean(WebsocketCollector.class)
public WebsocketCollector websocketCollector() {
return new WebsocketCollector();
}
/**
* Server endpoint exporter server endpoint exporter.
*
* @return the server endpoint exporter
*/
@Bean
@ConditionalOnMissingBean(ServerEndpointExporter.class)
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
注册了 WebsocketDataChangedListener
。到此同步策略怎么来的已经走完了。大体的流程图如下:
总结
- 目前还是只看了大体的流程。一些细节的地方还需要在斟酌斟酌。
- 针对以上内容,还需要看非 Soul 上的 afterPropertiesSet, spring-bean模块中InitializingBean接口
- 接下来要看的地方就是数据同步的具体实现