Eurake源码分析(十一) 增量获取

下面我们来说一下eureka的增量获取。
Applications.appsHashCode ,应用集合一致性哈希码。

增量获取注册的应用集合( Applications ) 时,Eureka-Client 会获取到:

Eureka-Server 近期变化( 注册、下线 )的应用集合
Eureka-Server 应用集合一致性哈希码
Eureka-Client 将变化的应用集合和本地缓存的应用集合进行合并后进行计算本地的应用集合一致性哈希码。若两个哈希码相等,意味着增量获取成功;若不相等,意味着增量获取失败,Eureka-Client 重新和 Eureka-Server 全量获取应用集合。
计算公式
appsHashCode = status+count

使用每个应用实例状态( status ) + 数量( count )拼接出一致性哈希码。若数量为 0 ,该应用实例状态不进行拼接。状态以字符串大小排序。

举个例子,8 个 UP ,0 个 DOWN ,则 appsHashCode = UP_8_ 。8 个 UP ,2 个 DOWN ,则 appsHashCode = DOWN_2_UP_8_ 。
看下Applications的getReconcileHashCode方法

public String getReconcileHashCode() {
    // 计数集合 key:应用实例状态
    TreeMap<String, AtomicInteger> instanceCountMap = new TreeMap<String, AtomicInteger>();
    populateInstanceCountMap(instanceCountMap);
    // 计算 hashcode
    return getReconcileHashCode(instanceCountMap);
}

调用 populateInstanceCountMap方法,计算每个应用实例状态的数量,看下具体的实现

public void populateInstanceCountMap(Map<String, AtomicInteger> instanceCountMap) {
    for (Application app : this.getRegisteredApplications()) {
        for (InstanceInfo info : app.getInstancesAsIsFromEureka()) {
            AtomicInteger instanceCount = instanceCountMap.computeIfAbsent(info.getStatus().name(),
                    k -> new AtomicInteger(0));
            instanceCount.incrementAndGet();
        }
    }
}

调用 getReconcileHashCode方法,计算 hashcode,看下具体的实现

public static String getReconcileHashCode(Map<String, AtomicInteger> instanceCountMap) {
    StringBuilder reconcileHashCode = new StringBuilder(75);
    for (Map.Entry<String, AtomicInteger> mapEntry : instanceCountMap.entrySet()) {
        reconcileHashCode.append(mapEntry.getKey()).append(STATUS_DELIMITER) // status
                .append(mapEntry.getValue().get()).append(STATUS_DELIMITER); // count
    }
    return reconcileHashCode.toString();
}

调用 DiscoveryClient的getAndUpdateDelta方法,增量获取注册信息,并刷新本地缓存,看下具体的实现

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    // 增量获取注册信息
    Applications delta = null;
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }

    if (delta == null) {
        // 增量获取为空,全量获取
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // 将变化的应用集合和本地缓存的应用集合进行合并
                updateDelta(delta);
                // 计算本地的应用集合一致性哈希码
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) // 一致性哈希值不相等
                || clientConfig.shouldLogDeltaDiff()) { //
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

调用 updateDelta方法,将变化的应用集合和本地缓存的应用集合进行合并,看下具体的实现

private void updateDelta(Applications delta) {
    int deltaCount = 0;
    for (Application app : delta.getRegisteredApplications()) { // 循环增量(变化)应用集合
        for (InstanceInfo instance : app.getInstances()) {
            Applications applications = getApplications();
            // TODO[0009]:RemoteRegionRegistry
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                if (null == remoteApps) {
                    remoteApps = new Applications();
                    remoteRegionVsApps.put(instanceRegion, remoteApps);
                }
                applications = remoteApps;
            }

            ++deltaCount;
            if (ActionType.ADDED.equals(instance.getActionType())) { // 添加
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } else if (ActionType.MODIFIED.equals(instance.getActionType())) { // 修改
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Modified instance {} to the existing apps ", instance.getId());

                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } else if (ActionType.DELETED.equals(instance.getActionType())) { // 删除
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                applications.getRegisteredApplications(instance.getAppName()).removeInstance(instance);
            }
        }
    }
    logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);

    getApplications().setVersion(delta.getVersion());
    // 过滤、打乱应用集合
    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

    // TODO[0009]:RemoteRegionRegistry
    for (Applications applications : remoteRegionVsApps.values()) {
        applications.setVersion(delta.getVersion());
        applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    }
}

ApplicationsResource,处理所有应用的请求操作的 Resource ( Controller )。

接收增量获取请求,映射 ApplicationsResource#getContainers() 方法。
AbstractInstanceRegistry.recentlyChangedQueue,最近租约变更记录队列。看下具体的实现

/**
 * 最近租约变更记录队列
 */
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();
/**
 * 最近租约变更记录
 */
private static final class RecentlyChangedItem {
    /**
     * 最后更新时间戳
     */
    private long lastUpdateTime;
    /**
     * 租约
     */
    private Lease<InstanceInfo> leaseInfo;

    public RecentlyChangedItem(Lease<InstanceInfo> lease) {
        this.leaseInfo = lease;
        lastUpdateTime = System.currentTimeMillis();
    }

    public long getLastUpdateTime() {
        return this.lastUpdateTime;
    }

    public Lease<InstanceInfo> getLeaseInfo() {
        return this.leaseInfo;
    }
}

当应用实例注册、下线、状态变更时,创建最近租约变更记录( RecentlyChangedItem ) 到队列。

后台任务定时顺序扫描队列,当 lastUpdateTime 超过一定时长后进行移除。

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {

        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                RecentlyChangedItem item = it.next();
                if (item.getLastUpdateTime() < System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }

    };
}

配置 eureka.deltaRetentionTimerIntervalInMs, 移除队列里过期的租约变更记录的定时任务执行频率,单位:毫秒。默认值 :30 * 1000 毫秒。
配置 eureka.retentionTimeInMSInDeltaQueue,租约变更记录过期时长,单位:毫秒。默认值 : 3 * 60 * 1000 毫秒。
在 generatePayload方法里,调用 AbstractInstanceRegistry的getApplicationDeltas方法,获取近期变化的应用集合,看下具体的实现

public Applications getApplicationDeltas() {
    // 添加 增量获取次数 到 监控
    GET_ALL_CACHE_MISS_DELTA.increment();
    // 初始化 变化的应用集合
    Applications apps = new Applications();
    apps.setVersion(responseCache.getVersionDelta().get());
    Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
    try {
        // 获取写锁
        write.lock();
        // 获取 最近租约变更记录队列
        Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
        logger.debug("The number of elements in the delta queue is :" + this.recentlyChangedQueue.size());
        // 拼装 变化的应用集合
        while (iter.hasNext()) {
            Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
            InstanceInfo instanceInfo = lease.getHolder();
            Object[] args = {instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name()};
            logger.debug("The instance id %s is found with status %s and actiontype %s", args);
            Application app = applicationInstancesMap.get(instanceInfo.getAppName());
            if (app == null) {
                app = new Application(instanceInfo.getAppName());
                applicationInstancesMap.put(instanceInfo.getAppName(), app);
                apps.addApplication(app);
            }
            app.addInstance(decorateInstanceInfo(lease));
        }

        // TODO[0009]:RemoteRegionRegistry
        boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
        if (!disableTransparentFallback) {
            Applications allAppsInLocalRegion = getApplications(false);

            for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                Applications applications = remoteRegistry.getApplicationDeltas();
                for (Application application : applications.getRegisteredApplications()) {
                    Application appInLocalRegistry =
                            allAppsInLocalRegion.getRegisteredApplications(application.getName());
                    if (appInLocalRegistry == null) {
                        apps.addApplication(application);
                    }
                }
            }
        }

        // 获取全量应用集合,通过它计算一致性哈希值
        Applications allApps = getApplications(!disableTransparentFallback);
        apps.setAppsHashCode(allApps.getReconcileHashCode());
        return apps;
    } finally {
        write.unlock();
    }
}

eureka的增量获取过程就完成了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,185评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,652评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,524评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,339评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,387评论 6 391
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,287评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,130评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,985评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,420评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,617评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,779评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,477评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,088评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,716评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,857评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,876评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,700评论 2 354

推荐阅读更多精彩内容