Eureka源码之二:服务注册

Eureka源码剖析之一:初始化-启动

EurekaClient在启动时会进行一系列初始化操作,本篇文章主要解析EurekaClient端向EurekaServer端发起注册请求的具体过程,具体分为EurekaClient端发送请求和EurekaServer端接收请求。在较新的版本看到代码进行了优化,所以还是以某一版本为准进行剖析。这里是1.X版本最新版本1.9.15。

〓Eureka Client发出注册请求

    // DiscoveryClient构造器中:     if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
    //配置了开启注册和强制注册初始化时,则会立马调用注册方法         try {
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    -----------------------------------
    // 实例信息复制器初始化,并且启动。     instanceInfoReplicator = new InstanceInfoReplicator(
            this,
            instanceInfo,
            clientConfig.getInstanceInfoReplicationIntervalSeconds(),
            2); // burstSize     instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    // InstanceInfoReplicator.java实现了RUNNABLE,接着看start方法     public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register             // 30秒后执行当前对象任务             Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

    public void run() {
        try {
            // 刷新实例信息             discoveryClient.refreshInstanceInfo();
            // 判断实例信息是否发生改变             // isInstanceInfoDirty默认为false,实例信息发生改变时会被设为true             // 被设为ture,则dirtyTimestamp不为null,则发起注册请求             Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                // 注册                 discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            // 40秒后执行             Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

    ------------------------------------
     /**
     * 通过REST调用注册eureka服务
     */     boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        // 根据响应状态码204判断是否注册成功         return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }   

Eureka Client启动时会进行注册:①根据配置是否立即进行注册请求;②启动实例信息复制器时会延迟30秒进行注册操作,之后每隔40秒延迟之后判断实例信息是否改变而继续执行。

〓Eureka Server接收注册请求

// 从eureka-core包看EurekaBootStrap可以看出这是服务端启动的入口, // 不出意外看到EurekaBootStrap继承了ServletContextListener, // 从而得知随着Web容器的启动,Eureka服务端也同时启动,并且进行了一系列的初始化动作, // 下面看下PeerAwareInstanceRegistryImpl这个类,这是后端服务的核心类。 // 查看其register方法,跟踪进去可以发现是被ApplicationResource调用, // 也说明ApplicationResource提供了给客户端进行注册的http接口,跟上面client端请求http接口对应上了。 public class ApplicationResource {
    ...

    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields         // 参数校验         if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data         DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }
        // 服务端处理注册的入口         registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible     }
    ...
 }

 // 服务端注册 peerAwareInstanceRegistryImpl#register  public void register(final InstanceInfo info, final boolean isReplication) {
        // 默认租约时间90秒         // 自己设置的租约有效时间         int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // 注册         super.register(info, leaseDuration, isReplication);
        // 注册信息复制到其它eureka节点         replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    } // 父类注册:AbstractInstanceRegistry#register,指定续约时间进行注册 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            // 获得读锁             read.lock();
            // 从注册表中获取实例信息映射             // 根据实例名称获取租约信息             // registry是ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>存储结构             // 由于同一名称的实例有可能有多个,所以registry的Key设计为实例的名称,具体的实例信息用Value里的Id区分             // Key为实例集合的名称,Value是一个Map,Value的Key是实例的Id,Value的Value是实例的租约信息             Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            // 注册计数             REGISTER.increment(isReplication);
            // 首次注册,初始化对应注册表信息             if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            // 获得存在的续约实例信息             Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease             if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted                 // InstanceInfo instead of the server local copy.                 // 如果EurekaServer端已存在实例信息且时间戳大于新传入实例信息中的时间戳,则用EurekaServer端的数据替换新传入的实例信息                 // 防止过期的实例信息注册到EurekaServer端                 if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration                 // 续约信息不存在                 synchronized (lock) {
                    // 自我保护机制                     if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews                         // 客户端发送续约数增1,后更新其阈值                         this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            // 包装租赁实例             Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // 将租约实例放在内存中             gMap.put(registrant.getId(), lease);
            // 将注册信息添加到近期注册队列             recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            // This is where the initial state transfer of overridden status happens             // 这是发生覆盖状态的初始状态转移的地方,将覆盖状态进行存储到内存中,即最终状态             if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "                                 + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            // 更新注册者的覆盖状态             InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules             // 基于覆盖状态规则设置状态             InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp             if (InstanceStatus.UP.equals(registrant.getStatus())) {
                // 设置服务启动时间                 lease.serviceUp();
            }
            // 设置行为类型为新增             registrant.setActionType(ActionType.ADDED);
            // 最近变化续约队列增加当前续约实例信息             recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            // 设置最后更新时间戳             registrant.setLastUpdatedTimestamp();
            // 使响应缓存失效             invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            // 释放读锁             read.unlock();
        }
    }

     /**
     * Replicates all eureka actions to peer eureka nodes except for replication
     * traffic to this node.
     * 复制全部eureka操作复制到其它对等节点,除了当前节点本身
     */     private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                // 复制计数                 numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication             // 已经复制过则无需再次复制             if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }
            // 遍历eureka节点             for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.                 // 排除复制自身                 if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                // 复制实例行为到对等节点                 replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

    /**
     * Replicates all instance changes to peer eureka nodes except for
     * replication traffic to this node.
     *
     */     private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry;
            CurrentRequestVersion.set(Version.V2);
             // 复制行为             switch (action) {
                case Cancel: // 取消                     node.cancel(appName, id);
                    break;
                case Heartbeat:  // 心跳                     InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:  // 注册                     node.register(info);
                    break;
                case StatusUpdate: // 状态更新                     infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride: // 删除                     infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        } finally {
            CurrentRequestVersion.remove();
        }
    }

    // 节点注册:PeerEurekaNode#register     public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        // 任务批次分发器         batchingDispatcher.process(
                // 生成任务id                 taskId("register", info),
                // 复制任务                 new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        // 使用HTTP调用注册接口                         return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }

    // 使用Jersey调用注册接口:AbstractJerseyEurekaHttpClient#register     public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            addExtraHeaders(resourceBuilder);
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip")
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .accept(MediaType.APPLICATION_JSON)
                    .post(ClientResponse.class, info);
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    } 

总结

■Eureka Client
1)注册1:DisoverClient构造器里会根据是否开启立即调用注册Eureka Server接口进行注册;
2)注册2:实例信息复制器启动时会使用定时调度30秒延迟调用注册方法,如果实例信息发生改变,则之后会每隔40秒进行重新注册;
3)注册方式:使用Jersey HTTP向Eureka Server发起注册请求。
■Eureka Server
1)启动:EurekaBootStrap启动入口,继承ServletContextListener。核心类:PeerAwareInstanceRegistryImpl;
2)Eureka Server注册逻辑:服务端本地缓存(队列)进行处理注册,比如:状态的更新、添加到近期变化队列等;
3)复制:集群环境下,节点之间需要进行注册信息的同步。除了本身节点,将会复制注册信息到其它对等节点,最后同样是使用Jersey进行请求同步注册。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容