一 配置获取
- 获取目标配置集读锁,避免和并发写冲突
int lockResult = tryConfigReadLock(request, response, groupKey);
tryConfigReadLock调用 tryReadLock尝试获取读锁,最多尝试10次,每次获取失败sleep 1ms。
lockResult值 |
含义 |
0 |
表示不存在目标配置集 |
大于0 |
获取锁成功 |
小雨0 |
获取锁失败,已经有写锁。 |
static public int tryReadLock(String groupKey) {
CacheItem groupItem = CACHE.get(groupKey);
int result = (null == groupItem) ? 0 : (groupItem.rwLock.tryReadLock() ? 1 : -1);
if (result < 0) {
defaultLog.warn("[read-lock] failed, {}, {}", result, groupKey);
}
return result;
}
- 获取配置
根据key获取配置的相关信息CacheItem
非集群模式从数据库获取配置,集群模式从本地缓存文件获取配置
CacheItem cacheItem = ConfigService.getContentCache(groupKey);
md5 = cacheItem.getMd5();
lastModified = cacheItem.getLastModifiedTs();
if (STANDALONE_MODE && !PropertyUtil.isStandaloneUseMysql()) {
configInfoBase = persistService.findConfigInfo(dataId, group, tenant);
} else {
file = DiskUtil.targetFile(dataId, group, tenant);
}
二 配置变更
2.1异步通知
- 配置变更时,先更改数据库信息,同时更改配置表和添加一条配置历史记录。然后走异步通知所有服务端更新本地缓存文件配置。
addConfiTagsRelationAtomic(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),
configInfo.getTenant());
insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I");
- 一次变更触发一个AsyncTask线程任务
遍历配置服务端列表,依次发送变更通知。
发送失败或当前服务端连接不稳定,则延迟重发。
- 延迟时间获取函数
private static int getDelayTime(NotifySingleTask task) {
int failCount = task.getFailCount();
int delay = MINRETRYINTERVAL + failCount * failCount * INCREASESTEPS;
if (failCount <= MAXCOUNT) {
task.setFailCount(failCount + 1);
}
return delay;
}
private static int MINRETRYINTERVAL = 500;
private static int INCREASESTEPS = 1000;
private static int MAXCOUNT = 6;
2.2 文件缓存
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
boolean isBeta) {
String groupKey = GroupKey2.getKey(dataId, group, tenant);
dumpTaskMgr.addTask(groupKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
}
- DumpProcessor执行缓存任务
获取写锁
更新缓存配置文件
更新内存缓存cachItem的md5
ConfigService.dump(dataId, group, tenant, cf.getContent(), lastModified)
dump(String dataId, String group, String tenant, String content, long lastModifiedTs) {
final int lockResult = tryWriteLock(groupKey);
DiskUtil.saveToDisk(dataId, group, tenant, content);
updateMd5(groupKey, md5, lastModifiedTs);
}
2.3 变更监听响应
- 创建线程执行DataChangeTask
- final Queue<ClientLongPulling> allSubs,存储所有监听配置变更的客户端长轮询信息ClientLongPulling。
final Map<String, String> clientMd5Map;
客户端监听的配置集集合
- 遍历监听列表,监听当前变更的配置集,则发送响应
LongPollingService.DataChangeTask.run() {
for (Iterator<ClientLongPulling> iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPulling clientSub = iter.next();
if (clientSub.clientMd5Map.containsKey(groupKey)) {
iter.remove(); // 删除订阅关系
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
}
- 组装变更的配置集信息,
asyncContext.complete();
发送响应
三 配置变更监听
3.1 短轮询
- 解析监听的配置集集合
- 检查客户端md5和当前缓存的cachItem的md5,放回变更配置集集合
3.2 长轮询
- 获取长轮询超时时间,和立即返回参数
- 非固定周期轮询,则先判断是否有配置变更或有LONG_PULLING_NO_HANG_UP_HEADER,有则立即返回变更的配置集集合
- http请求异步化
final AsyncContext asyncContext = req.startAsync()
- 启动异步线程ClientLongPulling
延迟线程,对超时无变更的客户端返回响应,避免请求超时。
延迟线程中,对isFixedPolling()
固定轮询的情况会先检查是否有配置变更再返回结果。
在allSubs中保存监听变更的客户端信息。 在2.3节配置变更时进行响应。
public void addLongPullingClient(HttpServletRequest req, HttpServletResponse rsp, Map<String, String> clientMd5Map,
int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_PULLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_PULLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
/**
* 提前500ms返回响应,为避免客户端超时 @qiaoyi.dingqy 2013.10.22改动 add delay time for LoadBalance
*/
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
// do nothing but set fix polling timeout
} else {
long start = System.currentTimeMillis();
List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}",
System.currentTimeMillis() - start, "instant", RequestUtil.getRemoteIp(req), "polling",
clientMd5Map.size(), probeRequestSize, changedGroups.size());
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
LogUtil.clientLog.info("{}|{}|{}|{}|{}|{}|{}", System.currentTimeMillis() - start, "nohangup",
RequestUtil.getRemoteIp(req), "polling", clientMd5Map.size(), probeRequestSize,
changedGroups.size());
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// 一定要由HTTP线程调用,否则离开后容器会立即发送响应
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout()的超时时间不准,所以只能自己控制
asyncContext.setTimeout(0L);
scheduler.execute(
new ClientLongPulling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
四 监听查询
4.1 获取监听指定配置集的客户端信息
- 入口
GET /v1/cs/configs/listener
获取监听配置集变更的客户端信息
- 遍历配置服务端列表,
GET /v1/cs/communication/configWatchers
从服务端的final Queue<ClientLongPulling> allSubs
中获取配置集的监听客户端信息
- 合并所有服务端数据。
4.2 获取指定客户端监听的所有配置集信息
- 入口
GET /v1/cs/listener
- 遍历服务端列表,
GET /v1/cs/communication/watcherConfigs
,从allSubs中获取客户端监听的所有配置集
- 合并所有服务端数据。
五 容量限制
- 使用aop方式对controller接口层的配置发布和删除进行控制。
- tenant_capacity 存储租户的配置容量
- group_capacity 存储分组的配置容量,若group_id=""则表示集群的配置容量
code | 说明
---- | ----
OVER_CLUSTER_QUOTA|超过集群配置个数上限
OVER_GROUP_QUOTA|超过该Group配置个数上限
OVER_TENANT_QUOTA|超过该租户配置个数上限
OVER_MAX_SIZE|超过配置的内容大小上限