前言
昨天调试的时候,发现 listeners 中包含 WebsocketDataChangedListener。

listeners
那这个是怎么来的呢?今天就初探数据同步原理。
同步策略概要
官网上有张关于同步数据的高清无码的大图。

同步策略.png
从这张大图上我们大体知道,soul-admin 配置后怎么同步到 soul-bootstrap 层生效的。
之前我们也了解过
ApplicationEventPublisher 和 DataChangedEventDispatcher ,写了一个关于怎么使用 ApplicationEventPublisher 简单的 demo ,然后又通过 Debug 我们验证配置流程
继续调试
当进入 DataChangedEventDispatcher#onApplicationEvent 的方法时,具体方法如下:
@Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }
    @Override
// afterPropertiesSet方法,初始化bean的时候执行,可以针对某个具体的bean进行配置。
    public void afterPropertiesSet() {
        Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
        this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
    }
首先映入眼帘的是循环变量 listeners, 查看得知 listeners 的类型是
List<DataChangedListener>, 其中 DataChangedListener 是其元素的类型, 然后继续追踪 DataChangedListener;

DataChangedListener
发现它是一个接口,并非是实现,他的实现有
- 
HttpLongPollingDataChangedListenerhttp的长轮询
- 
NacosDataChangedListenernacos 配置中心
- 
WebsocketDataChangedListenerwebsocket 的方式
- 
ZookeeperDataChangedListenerzk 配置中心(这个配置中心能dubbo 的共用么?)
那这些实现又是怎么注册到应用中的呢?
看这里的时候有点懵,然后我重新启动应用: SoulBootstrapApplication,  在命令行中发现 you use websocket sync soul data,  全文检索,得到以下方法:
    @Bean
    public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
                                           final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
        log.info("you use websocket sync soul data.......");
        return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
                metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
    }
该方法所在类上有这么一个注解
@ConditionalOnProperty(prefix = "soul.sync.websocket", name = "urls")
在 配置文件中:application-local.yml 中 配置
soul :
    sync:
        websocket :
             urls: ws://localhost:9095/websocket
这样的配置在启动时就尝试去连接了,soul-admin 中 websocket 服务器。
然后我们继续看 soul-admin,  同理推想是不是已经在启动时注册了,查看配置文件
application.yml,
soul:
  sync:
    websocket:
      enabled: true
查看 DataSyncConfiguration
@Configuration
    @ConditionalOnProperty(name = "soul.sync.websocket.enabled", havingValue = "true", matchIfMissing = true)
    @EnableConfigurationProperties(WebsocketSyncProperties.class)
    static class WebsocketListener {
        /**
         * Config event listener data changed listener.
         *
         * @return the data changed listener
         */
        @Bean
        @ConditionalOnMissingBean(WebsocketDataChangedListener.class)
        public DataChangedListener websocketDataChangedListener() {
            return new WebsocketDataChangedListener();
        }
        /**
         * Websocket collector websocket collector.
         *
         * @return the websocket collector
         */
        @Bean
        @ConditionalOnMissingBean(WebsocketCollector.class)
        public WebsocketCollector websocketCollector() {
            return new WebsocketCollector();
        }
        /**
         * Server endpoint exporter server endpoint exporter.
         *
         * @return the server endpoint exporter
         */
        @Bean
        @ConditionalOnMissingBean(ServerEndpointExporter.class)
        public ServerEndpointExporter serverEndpointExporter() {
            return new ServerEndpointExporter();
        }
    }
注册了 WebsocketDataChangedListener。到此同步策略怎么来的已经走完了。大体的流程图如下:

同步策略
总结
- 目前还是只看了大体的流程。一些细节的地方还需要在斟酌斟酌。
- 针对以上内容,还需要看非 Soul 上的 afterPropertiesSet, spring-bean模块中InitializingBean接口
- 接下来要看的地方就是数据同步的具体实现