soul从入门到放弃17--长轮询同步(二)

一、前戏

上篇以服务端(soul-admin)视角,分析了增量同步(推),数据监听被拉的过程。 本篇以客户端(soul-bootstrap)视角,分析下他是怎么拉数据的。

二、网关启动拉去数据

  • 启动配置与初始化

soul-spring-boot-starter-sync-data-http项目下HttpSyncDataConfiguration$httpSyncDataService方法

  • 启动开启线程

HttpSyncDataService中会调用start方法,会全量拉去信息 + 开启线程池监听数据变化。

private void start() {
    // 防止启动多次的cas锁操作
    if (RUNNING.compareAndSet(false, true)) {
        // 启动时,全量拉取数据
        this.fetchGroupConfig(ConfigGroupEnum.values());
        int threadSize = serverList.size();
        // 根据soul-admin数量开启相同线程数的线程线程池,轮询监听
        this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(),
                SoulThreadFactory.create("http-long-polling", true));
        // 开启监听线程,每一个线程监听soul-admin
        this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
    } else {
        log.info("soul http long polling was started, executor=[{}]", executor);
    }
}
  • 循环向所有soul-admin去拉去数据,具体拉数据的请求逻辑在doFetchGroupConfig
private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
    for (int index = 0; index < this.serverList.size(); index++) {
        String server = serverList.get(index);
        try {
            this.doFetchGroupConfig(server, groups);
            break;
        } catch (SoulException e) {
            // no available server, throw exception.
            if (index >= serverList.size() - 1) {
                throw e;
            }
            log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
        }
    }
}
  • doFetchGroupConfig中对单一soul-admin的多组,进行循环调用后台的 /configs/fetch 接口, 拿到某个类型的数据, 并更新缓存。
  • 更新缓存前会检测是否变动, 如果变动则结束, 数据未发生变动、远端数据过期则睡眠30s ,由于是第一次启动, 数据为空的情况下肯定会更新缓存, 所以会直接结束)
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
    StringBuilder params = new StringBuilder();
    // 根据ConfigGroupEnum循环拉取每组的数据,组指的是plugin、rule、selector 等
    for (ConfigGroupEnum groupKey : groups) {
        params.append("groupKeys").append("=").append(groupKey.name()).append("&");
    }
    // 请求后台soul-admin的地址
    String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
    log.info("request configs: [{}]", url);
    String json = null;
    try {
        json = this.httpClient.getForObject(url, String.class);
    } catch (RestClientException e) {
        String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
        log.warn(message);
        throw new SoulException(message, e);
    }
    // 后台拉取数据成功,更新缓存数据,并返回是否更新成功
    boolean updated = this.updateCacheWithJson(json);
    if (updated) {
        log.info("get latest configs: [{}]", json);
        return;
    }
    // 没有发生更新,或是远端已经过期,则睡30s
    log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
    ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}
  • updateCacheWithJson反序列化json
/**
 * update local cache.
 * @param json the response from config server.
 * @return true: the local cache was updated. false: not updated.
 */
private boolean updateCacheWithJson(final String json) {
    JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
    JsonObject data = jsonObject.getAsJsonObject("data");
    // 调用更新方法
    return factory.executor(data);
}
  • DataRefreshFactory$executor,调用各类型数据刷新类
public boolean executor(final JsonObject data) {
    final boolean[] success = {false};
    // 所有数据类型循环的 DataRefresh 全调用
    ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
    return success[0];
  } 
  • AbstractDataRefresh$refresh, 调用各AbstractDataRefresh实现类的updateCacheIfNeed判断数据是否有更新,如果有更新调用各自的refresh方法更新数据
  • refresh中调用响应的数据更新事件
public Boolean refresh(final JsonObject data) {
    boolean updated = false;
    JsonObject jsonObject = convert(data);
    if (null != jsonObject) {
        ConfigData<T> result = fromJson(jsonObject);
        // 判断不同类型数据是否更新
        if (this.updateCacheIfNeed(result)) {
            updated = true;
            // 实际刷新逻辑
            refresh(result.getData());
        }
    }
    return updated;
}
image
image

三、网关轮询监听变化

  • 网关启动时调用start方法,start中开启线程池调用HttpLongPollingTask
  • 在HttpLongPollingTask$run中循环调用doLongPolling,实现重试3次的调用。每次低啊用失败,则睡五秒再调用。
public void run() {
    while (RUNNING.get()) {
    // 循环重试测试调用
        for (int time = 1; time <= retryTimes; time++) {
            try {
                doLongPolling(server);
            } catch (Exception e) {
                // 调用失败,就沉睡5秒,进行重试再次调用
                if (time < retryTimes) {
                    log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
                            time, retryTimes - time, e.getMessage());
                    ThreadUtils.sleep(TimeUnit.SECONDS, 5);
                    continue;
                }
                // 
                log.error("Long polling failed, try again after 5 minutes!", e);
                ThreadUtils.sleep(TimeUnit.MINUTES, 5);
            }
        }
    }
    log.warn("Stop http long polling.");
}
  • HttpLongPollingTask$doLongPolling,发送数据更新签名到soul-admin进行验证,判断数据是否更新,如果已更新,网关得到相应结果,则调用数据拉去方法进行更新
private void doLongPolling(final String server) {
  // 从缓存中获取数据
  MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
  for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
    ConfigData<?> cacheConfig = factory.cacheConfigData(group);
    // 生成本地数据更新签名,用于与远端验证,减少传输数据量
    String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
    params.put(group.name(), Lists.newArrayList(value));
  }
  // 构建 http 请求信息
  HttpHeaders headers = new HttpHeaders();
  headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
  HttpEntity httpEntity = new HttpEntity(params, headers);
  String listenerUrl = server + "/configs/listener";
  log.debug("request listener configs: [{}]", listenerUrl);
  JsonArray groupJson = null;
  try {
    String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
    groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
  } catch (RestClientException e) {
    String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
    throw new SoulException(message, e);
  }
  // 得到变化的类型
  if (groupJson != null) {
    ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
    if (ArrayUtils.isNotEmpty(changedGroups)) {
      log.info("Group config changed: {}", Arrays.toString(changedGroups));
      // 拉取后台对应类型的数据
      this.doFetchGroupConfig(server, changedGroups);
    }
  }
}

四、小结

  • 通过这两篇的分析,发现数据更新主要是三个触发点

1.网关启动时的全量拉

2.网关定时检验数据更新,增量拉

3.后台管理的某类型数据变化,远端推送

  • 日拱一卒,收拾下上班
  • soul的四大块: 数据同步、插件、客户端注解、指标监控,下一个写点啥?
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容

  • 零、概述 用于工作、学习的需要开始学习soul,希望能坚持下去,不像专栏的名字一样 ------ soul从入门到...
    滴流乱转的小胖子阅读 1,507评论 0 0
  • 2021.2.3 工作总结 1、午会,卫生打扫 2、微信查找小学数学老师,安排刘思琦家教数学课,家长工作很忙,白天...
    雨滴教育Aria阅读 288评论 0 0
  • 我们知道协议转换也是API网关常见的一个功能,这次我们看下soul网关是如何实现协议转换的。 请求流图 大致流程:...
    niuxin阅读 796评论 0 0
  • 久违的晴天,家长会。 家长大会开好到教室时,离放学已经没多少时间了。班主任说已经安排了三个家长分享经验。 放学铃声...
    飘雪儿5阅读 7,485评论 16 22
  • 今天感恩节哎,感谢一直在我身边的亲朋好友。感恩相遇!感恩不离不弃。 中午开了第一次的党会,身份的转变要...
    迷月闪星情阅读 10,551评论 0 11