上一节讲了数据持久化后,发送事件后,Spring监听到事件后,做了什么事,并看到现有四种数据同步机制。这节具体加一下http长轮训
org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener http长轮训数据监听器
先看下构造器:在构造器中,构造了一个1024长度的阻塞队列,以及一个ScheduledThreadPoolExecutor,并初始化HttpSyncProperties,
/**
* Blocked client.
*/
private final BlockingQueue<LongPollingClient> clients;
private final ScheduledExecutorService scheduler;
private final HttpSyncProperties httpSyncProperties;
/**
* Instantiates a new Http long polling data changed listener.
* @param httpSyncProperties the HttpSyncProperties
*/
public HttpLongPollingDataChangedListener(final HttpSyncProperties httpSyncProperties) {
this.clients = new ArrayBlockingQueue<>(1024);
this.scheduler = new ScheduledThreadPoolExecutor(1,
SoulThreadFactory.create("long-polling", true));
this.httpSyncProperties = httpSyncProperties;
}
HttpSyncProperties主要是http同步的配置
@Getter
@Setter
@ConfigurationProperties(prefix = "soul.sync.http")
public class HttpSyncProperties {
/**
* Whether enabled http sync strategy, default: true.
*/
private boolean enabled = true;
/**
* Periodically refresh the config data interval from the database, default: 5 minutes.
*/
private Duration refreshInterval = Duration.ofMinutes(5);
}
主要定义了http同步开关以及刷新周期。 类初始化之后,更新各种数据缓存,然后执行了一个定时任务,每次调用refreshLocalCache刷新本地缓存
@Override
public final void afterPropertiesSet() {
updateAppAuthCache();
updatePluginCache();
updateRuleCache();
updateSelectorCache();
updateMetaDataCache();
afterInitialize();
}
@Override
protected void afterInitialize() {
long syncInterval = httpSyncProperties.getRefreshInterval().toMillis();
// Periodically check the data for changes and update the cache
scheduler.scheduleWithFixedDelay(() -> {
log.info("http sync strategy refresh config start.");
try {
this.refreshLocalCache();
log.info("http sync strategy refresh config success.");
} catch (Exception e) {
log.error("http sync strategy refresh config error!", e);
}
}, syncInterval, syncInterval, TimeUnit.MILLISECONDS);
log.info("http sync strategy refresh interval: {}ms", syncInterval);
}
private void refreshLocalCache() {
this.updateAppAuthCache();
this.updatePluginCache();
this.updateRuleCache();
this.updateSelectorCache();
this.updateMetaDataCache();
}
这些方法要做的事情都很类似,就是从数据库拿到对应ConfigGroup的所有配置,并更新本地缓存,比如看updateAppAuthCache,就是将当前数据库的配置更新到本地缓存,至于具体为什么要更新到本地缓存,我们后面分晓。
//org.dromara.soul.admin.listener.AbstractDataChangedListener
protected static final ConcurrentMap<String, ConfigDataCache> CACHE = new ConcurrentHashMap<>();
/**
* Update app auth cache.
*/
protected void updateAppAuthCache() {
this.updateCache(ConfigGroupEnum.APP_AUTH, appAuthService.listAll());
}
protected <T> void updateCache(final ConfigGroupEnum group, final List<T> data) {
String json = GsonUtils.getInstance().toJson(data);
ConfigDataCache newVal = new ConfigDataCache(group.name(), json, Md5Utils.md5(json), System.currentTimeMillis());
ConfigDataCache oldVal = CACHE.put(newVal.getGroup(), newVal);
log.info("update config cache[{}], old: {}, updated: {}", group, oldVal, newVal);
}
之前我们看到,当数据事件变化监听器分发者,监听到事件后,会调用各个监听器的对应方法:
//org.dromara.soul.admin.listener.DataChangedEventDispatcher
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
那么在长轮训机制下,主要做了如下事情,还拿AppAuth看下
@Override
public void onAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
if (CollectionUtils.isEmpty(changed)) {
return;
}
this.updateAppAuthCache();
this.afterAppAuthChanged(changed, eventType);
}
接收到变更数据后,会先更新下对应的内存缓存,然后再做数据变更。
//org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener#afterAppAuthChanged
@Override
protected void afterAppAuthChanged(final List<AppAuthData> changed, final DataEventTypeEnum eventType) {
scheduler.execute(new DataChangeTask(ConfigGroupEnum.APP_AUTH));
}
这里看到,通过线程池执行一个数据变化任务
/**
* When a group's data changes, the thread is created to notify the client asynchronously.
*/
class DataChangeTask implements Runnable {
/**
* The Group where the data has changed.
*/
private final ConfigGroupEnum groupKey;
/**
* The Change time.
*/
private final long changeTime = System.currentTimeMillis();
/**
* Instantiates a new Data change task.
*
* @param groupKey the group key
*/
DataChangeTask(final ConfigGroupEnum groupKey) {
this.groupKey = groupKey;
}
@Override
public void run() {
//循环所有的LongPollingClient,并调用了sendResponse
for (Iterator<LongPollingClient> iter = clients.iterator(); iter.hasNext();) {
LongPollingClient client = iter.next();
iter.remove();
client.sendResponse(Collections.singletonList(groupKey));
log.info("send response with the changed group,ip={}, group={}, changeTime={}", client.ip, groupKey, changeTime);
}
}
}
我们先来看下LongPollingClient是个什么东东,它主要有以下几个属性,一个异步的上下文,ip,超时时间和异步结果Future。LongPollingClient本身实现了一个Runnable接口
class LongPollingClient implements Runnable {
/**
* The Async context.
*/
private final AsyncContext asyncContext;
/**
* The Ip.
*/
private final String ip;
/**
* The Timeout time.
*/
private final long timeoutTime;
/**
* The Async timeout future.
*/
private Future<?> asyncTimeoutFuture;
我们再来看下run方法:这方法较难看懂。
@Override
public void run() {
//通过org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener的ScheduledExecutorService scheduler延迟执行一个一次性的动作,延迟时间是timeoutTime毫秒,当延迟动作开始执行时,将当前的LongPollingClient对象从clients中移除
this.asyncTimeoutFuture = scheduler.schedule(() -> {
clients.remove(LongPollingClient.this);
//1.1
List<ConfigGroupEnum> changedGroups = compareChangedGroup((HttpServletRequest) asyncContext.getRequest());
//1.2
sendResponse(changedGroups);
}, timeoutTime, TimeUnit.MILLISECONDS);
//将当前对象加入到clients中
clients.add(this);
}
这里LongPollingClient.this之前没有见到过,主要是当我们在一个类的内部类中,如果需要访问外部类的方法或者成员域的时候,如果直接使用 this.成员域(与 内部类.this.成员域 没有分别) 调用的显然是内部类的域 , 如果我们想要访问外部类的域的时候,就要必须使用 外部类.this.成员域
package com.test;
public class TestA
{
public void tn()
{
System.out.println("外部类tn");
}
Thread thread = new Thread(){
public void tn(){System.out.println("inner tn");}
public void run(){
System.out.println("内部类run");
TestA.this.tn();//调用外部类的tn方法。
this.tn();//调用内部类的tn方法
}
};
public static void main(String aaa[])
{new TestA().thread.start();}
}
1.1 compareChangedGroup具体做了什么,先不要关注HttpServletRequest是从哪来的,这里也看出了我们本地Cache的作用是什么
private List<ConfigGroupEnum> compareChangedGroup(final HttpServletRequest request) {
List<ConfigGroupEnum> changedGroup = new ArrayList<>(4);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
// 针对每一个group获取的对应的参数
String[] params = StringUtils.split(request.getParameter(group.name()), ',');
if (params == null || params.length != 2) {
throw new SoulException("group param invalid:" + request.getParameter(group.name()));
}
//参数第一位时client端的Md5值, 第二位时client端的修改时间戳
String clientMd5 = params[0];
long clientModifyTime = NumberUtils.toLong(params[1]);
//获取本地缓存的配置
ConfigDataCache serverCache = CACHE.get(group.name());
// 检查是否需要更新服务器的缓存配置
//1.1.1
if (this.checkCacheDelayAndUpdate(serverCache, clientMd5, clientModifyTime)) {
changedGroup.add(group);
}
}
return changedGroup;
}
1.1.1 checkCacheDelayAndUpdate
private boolean checkCacheDelayAndUpdate(final ConfigDataCache serverCache, final String clientMd5, final long clientModifyTime) {
// 如果md5相等,说明配置相同,不需要更新
if (StringUtils.equals(clientMd5, serverCache.getMd5())) {
return false;
}
// 如果md5值不等,说明服务器的配置和客户端的缓存不一致
long lastModifyTime = serverCache.getLastModifyTime();
//在比对下服务器配置是否比客户端的更新
if (lastModifyTime >= clientModifyTime) {
// 如果更新,说明客户端的配置是旧的,需要更新
return true;
}
// 如果服务端的缓存配置,比客户端的配置还要老,那么说明,服务端的缓存配置需要更新了
// 这里soul考虑到并发问题,如果多个client都来soul拉取最新配置,而当前的soul-admin配置因为都会走到这里,那么如果我们不加锁的话,会导致,同时走到后面的refreshLocalCache,而refreshLocalCache我们前面看到是需要查询数据库并更新到本地缓存的,那么会导致大量的sql查询给数据库带来压力,所以这里加了一个锁,并设置了超时时间
boolean locked = false;
try {
locked = LOCK.tryLock(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return true;
}
if (locked) {
try {
//这里在拿到锁以后,先去本地缓存再拿一遍最新的缓存配置,与刚才获取到的配置做下对比,如果发现不相等,说明之前获取到锁之前已经有数据更新到缓存,
ConfigDataCache latest = CACHE.get(serverCache.getGroup());
if (latest != serverCache) {
// 在判断当前的最新配置和客户端配置的Md5是否一致.
return !StringUtils.equals(clientMd5, latest.getMd5());
}
// 更新缓存数据
this.refreshLocalCache();
//拿到最新的配置
latest = CACHE.get(serverCache.getGroup());
//比对
return !StringUtils.equals(clientMd5, latest.getMd5());
} finally {
LOCK.unlock();
}
}
// 没有获取到锁,默认当成需要更新处理
return true;
}
上面的代码,看出了soul设计的代码的精妙之处
接着上面代码,1.1之后,会调用sendResponse(changedGroups);
void sendResponse(final List<ConfigGroupEnum> changedGroups) {
// 这里逻辑场景就是上面我们刚开始跟过来的DataChangeTask执行的run里面,对所有client的主动触发的场景,这里是想取消掉client的run执行时候的延迟动作,防止重复运行,具体原因还需要在往后看
if (null != asyncTimeoutFuture) {
asyncTimeoutFuture.cancel(false);
}
//生成response,aysncContext完成
generateResponse((HttpServletResponse) asyncContext.getResponse(), changedGroups);
asyncContext.complete();
}
通过上面的源码分析。我们现在主要有几个疑惑点:
- AsyncContext到底是干嘛的?
- 为什么是直接生成的Response返回?
- Client是什么时候添加到org.dromara.soul.admin.listener.http.HttpLongPollingDataChangedListener#clients里面的
带着这几个问题,我们在看下一篇文章