Eureka源码浅读---注册表增量拉取

Eureka源码采用1.7.2版本

本人小白,此文为本人阅读源码笔记,如果您读到本文,您需要自己甄别是否正确,文中的说明只代表本人理解,不一定是正确的!!!

注册表的增量拉取主要依赖Eureka Client的定时缓存更新任务,在进行new DiscoveryClient()的时候进行了缓存定时任务更新的创建。

            //初始化支持缓存刷新的线程池
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

任务的初始化在同方法的initScheduledTasks()中

com.netflix.discovery.DiscoveryClient#initScheduledTasks

         //初始化拉取定时表定时调度任务 30S
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            //定时抓取增量注册表
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            //缓存刷新线程
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

此定时任务默认30S执行一次增量拉取

com.netflix.discovery.DiscoveryClient#refreshRegistry

最后还是调用的:

com.netflix.discovery.DiscoveryClient#fetchRegistry

            } else {
                //增量拉取注册表
                getAndUpdateDelta(applications);
            }
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }

        //如果增量的注册表为空,则获取全量注册表
        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    //合并本地注册表和拉取的增量注册表
                    updateDelta(delta);
                    //计算合并后的本地注册表的Hash值
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            //如果合并后本地注册的Hash值与Server端的全量注册表的Hash值不相等,则进行全量拉取
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }

先不讨论服务端对增量拉取的处理逻辑,在客户端,主要处理逻辑如下:

  • http请求获取增量注册表信息
  • 判断增量拉取是否为空,如果为空走一次全量拉取
  • 如果增量拉取不为空,合并本地注册表和拉取的增量注册表,计算本地合并后的注册表,将注册中心的全量注册表的HashCode与本地合并后的HashCode进行比较,如果不相等,走一次全量拉取

接下来我们看看服务端对增量拉取的请求处理逻辑

com.netflix.eureka.resources.ApplicationsResource#getContainerDifferential

        //增量拉取的Key ALL_APPS_DELTA
        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS_DELTA,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        if (acceptEncoding != null
                && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            return Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            return Response.ok(responseCache.get(cacheKey))
                    .build();
        }

可以看出也是走的多级缓存那一套机制

com.netflix.eureka.registry.ResponseCacheImpl#getValue

    @VisibleForTesting
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            if (useReadOnlyCache) {
                //readOnlyCacheMap 只读缓存
                final Value currentPayload = readOnlyCacheMap.get(key);
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    //readWriteCacheMap 读写缓存
                    payload = readWriteCacheMap.get(key);
                    //再放入只读缓存中
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key :" + key, t);
        }
        return payload;
    }

但是有区别的是读写缓存中,缓存重建的方法是不同的

com.netflix.eureka.registry.ResponseCacheImpl#generatePayload

                    } else if (ALL_APPS_DELTA.equals(key.getName())) {
                        if (isRemoteRegionRequested) {
                            tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
                            versionDeltaWithRegions.incrementAndGet();
                            versionDeltaWithRegionsLegacy.incrementAndGet();
                            payload = getPayLoad(key,
                                    registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                        } else {
                            tracer = serializeDeltaAppsTimer.start();
                            versionDelta.incrementAndGet();
                            versionDeltaLegacy.incrementAndGet();
                            payload = getPayLoad(key, registry.getApplicationDeltas());
                        }
                    } 

最终指向的是这个方法

com.netflix.eureka.registry.AbstractInstanceRegistry#getApplicationDeltasFromMultipleRegions

    public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
        if (null == remoteRegions) {
            remoteRegions = allKnownRemoteRegions; // null means all remote regions.
        }

        boolean includeRemoteRegion = remoteRegions.length != 0;

        if (includeRemoteRegion) {
            GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
        } else {
            GET_ALL_CACHE_MISS_DELTA.increment();
        }

        Applications apps = new Applications();
        apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
        Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
        try {
            write.lock();
            //获取到最近修改队列中的服务实例的 3分钟内变化的实例
            Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
            logger.debug("The number of elements in the delta queue is :" + this.recentlyChangedQueue.size());
            while (iter.hasNext()) {
                Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                InstanceInfo instanceInfo = lease.getHolder();
                Object[] args = {instanceInfo.getId(),
                        instanceInfo.getStatus().name(),
                        instanceInfo.getActionType().name()};
                logger.debug("The instance id %s is found with status %s and actiontype %s", args);
                Application app = applicationInstancesMap.get(instanceInfo.getAppName());
                if (app == null) {
                    app = new Application(instanceInfo.getAppName());
                    applicationInstancesMap.put(instanceInfo.getAppName(), app);
                    apps.addApplication(app);
                }
                app.addInstance(decorateInstanceInfo(lease));
            }

            if (includeRemoteRegion) {
                for (String remoteRegion : remoteRegions) {
                    RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
                    if (null != remoteRegistry) {
                        Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
                        if (null != remoteAppsDelta) {
                            for (Application application : remoteAppsDelta.getRegisteredApplications()) {
                                if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
                                    Application appInstanceTillNow =
                                            apps.getRegisteredApplications(application.getName());
                                    if (appInstanceTillNow == null) {
                                        appInstanceTillNow = new Application(application.getName());
                                        apps.addApplication(appInstanceTillNow);
                                    }
                                    for (InstanceInfo instanceInfo : application.getInstances()) {
                                        appInstanceTillNow.addInstance(instanceInfo);
                                    }
                                }
                            }
                        }
                    }
                }
            }

            Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
            //获取全量注册表的Hash值
            apps.setAppsHashCode(allApps.getReconcileHashCode());
            return apps;
        } finally {
            write.unlock();
        }
    }

方法的核心就是两步

  • 从recentlyChangedQueue获取到变化实例,封装成Applications
  • 计算当前注册表的HashCode,并赋值给Applications

由此可以看出,客户端获取到的增量注册表就是从recentlyChangedQueue中获取的。

recentlyChangedQueue 最近修改队列,全文搜索下可以发现,如果出现实例状态变化都会加入该队列中

  1. 注册
                if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
                    }
                    registrant.setActionType(ActionType.ADDED);
                    //加入到最近修改队列中
                    recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                    registrant.setLastUpdatedTimestamp();
  1. 下线\故障移除
                if (instanceInfo != null) {
                    //设置服务实例的行为 删除
                    instanceInfo.setActionType(ActionType.DELETED);
                    //加入最近修改的队列中
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
  1. 状态变更
                if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
                        info.setLastDirtyTimestamp(replicaDirtyTimestamp);
                    }
                    info.setActionType(ActionType.MODIFIED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                    info.setLastUpdatedTimestamp();

那么recentlyChangedQueue就保存着所有修改的服务实例,但是该队列的元素也是有有效期的,主要靠定时过期任务完成的。

com.netflix.eureka.registry.AbstractInstanceRegistry#AbstractInstanceRegistry

        //初始化心跳计数器 和自我保护机制相关
        this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);

        //30s执行一次recentlyChangedQueue的清理
        this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
                serverConfig.getDeltaRetentionTimerIntervalInMs(),
                serverConfig.getDeltaRetentionTimerIntervalInMs());
    //定时清理recentlyChangedQueue表中的过期变动实例
    private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {

            @Override
            public void run() {
                Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
                while (it.hasNext()) {
                    //时间比对 保留最近3分钟的数据 默认
                    if (it.next().getLastUpdateTime() <
                            System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                        it.remove();
                    } else {
                        break;
                    }
                }
            }

        };
    }

默认只保留最近3分钟的变动实例

简单流程图

Eureka增量注册表拉取简易流程.png

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容