ZookeeperSyncDataConfiguration
/**
 * Creates the ZooKeeper-based {@link SyncDataService} bean.
 *
 * <p>The ZooKeeper client and the plugin subscriber are resolved with
 * {@code getIfAvailable()}; the metadata and auth subscriber lists fall back
 * to an empty list when the provider has nothing to supply.
 *
 * @param zkClient         provider of the ZooKeeper client
 * @param pluginSubscriber provider of the plugin data subscriber
 * @param metaSubscribers  provider of the metadata subscribers
 * @param authSubscribers  provider of the auth data subscribers
 * @return a new {@link ZookeeperSyncDataService}
 */
@Bean
public SyncDataService syncDataService(final ObjectProvider<ZkClient> zkClient, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
        final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
    log.info("you use zookeeper sync soul data.......");
    // Resolve the collaborators in the same order the constructor receives them.
    final ZkClient resolvedClient = zkClient.getIfAvailable();
    final PluginDataSubscriber resolvedPluginSubscriber = pluginSubscriber.getIfAvailable();
    final List<MetaDataSubscriber> resolvedMetaSubscribers = metaSubscribers.getIfAvailable(Collections::emptyList);
    final List<AuthDataSubscriber> resolvedAuthSubscribers = authSubscribers.getIfAvailable(Collections::emptyList);
    return new ZookeeperSyncDataService(resolvedClient, resolvedPluginSubscriber, resolvedMetaSubscribers, resolvedAuthSubscribers);
}
ZookeeperSyncDataService,订阅变更数据
/**
 * Watches all plugin-related data (plugins, selectors, rules).
 *
 * <p>Every child currently listed under the plugin parent path is passed to
 * {@code watcherAll}, and a child-change subscription on that parent path
 * passes the current children to {@code watcherAll} again whenever the child
 * list changes and is not empty.
 */
private void watcherData() {
    // Top-level directory under which every child is a plugin.
    final String pluginRoot = ZkPathConstants.PLUGIN_PARENT;
    // Watch each plugin that already exists.
    final List<String> existingPlugins = zkClientGetChildren(pluginRoot);
    existingPlugins.forEach(this::watcherAll);
    // Watch the plugins reported by later child-list changes.
    zkClient.subscribeChildChanges(pluginRoot, (parentPath, currentChildren) -> {
        if (CollectionUtils.isNotEmpty(currentChildren)) {
            currentChildren.forEach(this::watcherAll);
        }
    });
}
中途还发现一个监听类型错误:watchMetaData 订阅元数据子节点变更时,分组类型应为 ConfigGroupEnum.META_DATA,而原代码传入的是 ConfigGroupEnum.APP_AUTH
/**
 * Watches the metadata nodes.
 *
 * <p>For every child currently under the metadata path, the node data is read
 * and cached and a data-change subscription is registered. A child-change
 * subscription is then registered on the metadata path itself.
 */
private void watchMetaData() {
    final String metaDataPath = ZkPathConstants.META_DATA;
    List<String> childrenList = zkClientGetChildren(metaDataPath);
    if (CollectionUtils.isNotEmpty(childrenList)) {
        childrenList.forEach(children -> {
            String realPath = buildRealPath(metaDataPath, children);
            cacheMetaData(zkClient.readData(realPath));
            subscribeMetaDataChanges(realPath);
        });
    }
    // Fix: children of the metadata path belong to the META_DATA group.
    // The subscription was previously registered under APP_AUTH (wrong listen type).
    subscribeChildChanges(ConfigGroupEnum.META_DATA, metaDataPath, childrenList);
}
admin模块中凡是需要发布数据变动的地方,都通过ApplicationEventPublisher发布事件,
DataChangedEventDispatcher实现了ApplicationListener来接收变动信息
/**
 * Application listener that forwards each {@link DataChangedEvent} to every
 * {@link DataChangedListener} bean found in the application context.
 *
 * <p>The listener list is collected once in {@link #afterPropertiesSet()} and
 * stored as an unmodifiable list.
 */
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {

    private ApplicationContext applicationContext;

    private List<DataChangedListener> listeners;

    /**
     * Creates the dispatcher.
     *
     * @param applicationContext context later queried for {@link DataChangedListener} beans
     */
    public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    /**
     * Delivers the event to every registered listener, in list order.
     *
     * @param event the data-changed event to dispatch
     * @throws IllegalStateException if a listener is reached and the event's group key is not handled
     */
    @Override
    public void onApplicationEvent(final DataChangedEvent event) {
        listeners.forEach(target -> notifyListener(target, event));
    }

    /**
     * Collects all {@link DataChangedListener} beans from the application context.
     */
    @Override
    public void afterPropertiesSet() {
        this.listeners = Collections.unmodifiableList(
                new ArrayList<>(applicationContext.getBeansOfType(DataChangedListener.class).values()));
    }

    /**
     * Invokes the callback of a single listener that matches the event's group key.
     *
     * @param target the listener to notify
     * @param event  the event whose source is cast to the list type of the matching group
     */
    @SuppressWarnings("unchecked")
    private void notifyListener(final DataChangedListener target, final DataChangedEvent event) {
        switch (event.getGroupKey()) {
            case APP_AUTH:
                target.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                return;
            case PLUGIN:
                target.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                return;
            case RULE:
                target.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                return;
            case SELECTOR:
                target.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                return;
            case META_DATA:
                target.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                return;
            default:
                throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
        }
    }
}
里面的监听器针对不同的groupKey进行对应的监听操作。所有的数据监听都实现了DataChangedListener接口,包括ZooKeeper使用的ZookeeperDataChangedListener
Zookeeper节点的变动都在这里进行操作,其他zk客户端监听zk的变动之后就会进行数据同步,以插件变动为例:
/**
 * Applies plugin changes to the ZooKeeper nodes.
 *
 * <p>For a {@code DELETE} event the plugin path, its selector parent path and
 * its rule parent path are deleted recursively, in that order. For any other
 * event type the plugin node is upserted with the plugin data.
 *
 * @param changed   the plugins that changed
 * @param eventType the kind of change
 */
@Override
public void onPluginChanged(final List<PluginData> changed, final DataEventTypeEnum eventType) {
    for (PluginData plugin : changed) {
        final String pluginNodePath = ZkPathConstants.buildPluginPath(plugin.getName());
        if (eventType != DataEventTypeEnum.DELETE) {
            // create or update
            upsertZkNode(pluginNodePath, plugin);
            continue;
        }
        // delete: the plugin node first, then its selector and rule subtrees
        deleteZkPathRecursive(pluginNodePath);
        deleteZkPathRecursive(ZkPathConstants.buildSelectorParentPath(plugin.getName()));
        deleteZkPathRecursive(ZkPathConstants.buildRuleParentPath(plugin.getName()));
    }
}