注册debug追踪到InstanceRegistry
ApplicationResource #addInstance
registry.register(info, "true".equals(isReplication));
InstanceRegistry #
// Spring Cloud's InstanceRegistry override: publishes a Spring application
// event for the registration, then delegates to the Netflix registry.
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
// Publish the Spring Cloud EurekaInstanceRegisteredEvent for this registration.
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
// Delegate to the parent class register (PeerAwareInstanceRegistryImpl).
super.register(info, isReplication);
}
PeerAwareInstanceRegistryImpl #
/**
 * Registers the information about the {@link InstanceInfo} and replicates
 * this information to all peer eureka nodes. If this is replication event
 * from other replica nodes then it is not replicated.
 *
 * @param info
 * the {@link InstanceInfo} to be registered and replicated.
 * @param isReplication
 * true if this is a replication event from other replica nodes,
 * false otherwise.
 */
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
// Default lease expiry is 90 seconds unless the instance supplies its own duration.
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
// Local registration (AbstractInstanceRegistry#register).
super.register(info, leaseDuration, isReplication);
// Replicate this registration to the other peer eureka nodes.
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
// NOTE: closing brace omitted in the original excerpt.
AbstractInstanceRegistry#register 服务注册
我们知道spring容器底层就是一个ConcurrentHashMap,那么eureka的底层注册是什么样的数据结构呢?没错一定也是一个map.
ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();
registry的数据结构如上,等价于<applicationName, Map<instanceId, Lease>>。同一应用的多个实例归入同一组,便于实现相同服务的集群化,方便上层模块调用时进行负载均衡。
/**
 * Registers a new instance with a given duration.
 *
 * Backing store is ConcurrentHashMap&lt;appName, Map&lt;instanceId, Lease&lt;InstanceInfo&gt;&gt;&gt;,
 * so instances sharing an application name are grouped together, which is what
 * upstream callers rely on for load balancing across a service's instances.
 *
 * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
 */
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
// Fetch the instance group for this application name. Several instances may
// share one application name (for clustering / load balancing) and are
// distinguished by their unique instance ids.
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// If an application with the same name already exists, the individual
// instance is looked up by its instance id.
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
// Last-modified timestamp of the instance already held by this server.
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
// Last-modified timestamp of the instance that is being registered now.
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
// If the stored copy is newer than the incoming one (larger timestamp means
// a more recent change), keep the stored copy: on conflict the latest wins.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
// expectedNumberOfClientsSendingRenews: number of clients expected to send heartbeats.
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
// A new client is registering; with the default 30s heartbeat interval,
// each client contributes 2 renews per minute to the expected total.
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// gMap is guaranteed non-null at this point (created above if missing),
// so the lease can simply be put under the instance id.
gMap.put(registrant.getId(), lease);
synchronized (recentRegisteredQueue) {
// Record this instance in the recently-registered queue.
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
// Mark the instance's action type as ADDED.
registrant.setActionType(ActionType.ADDED);
// Add to the recently-changed queue (entries kept ~3 minutes, used for delta fetches).
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// Stamp the last-updated time.
registrant.setLastUpdatedTimestamp();
// Invalidate the response cache entries for this app/VIP so readers observe the change.
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
updateRenewsPerMinThreshold()
/**
 * Recomputes numberOfRenewsPerMinThreshold: the minimum number of heartbeats
 * the server must receive per minute before self-preservation triggers.
 * threshold = clients * (60 / renewalIntervalSeconds) * renewalPercentThreshold
 */
protected void updateRenewsPerMinThreshold() {
double renewalsPerClientPerMinute = 60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds();
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* renewalsPerClientPerMinute
* serverConfig.getRenewalPercentThreshold());
}
expectedNumberOfClientsSendingRenews:期待发送心跳的客户端数量
ExpectedClientRenewalIntervalSeconds:期待客户端发送心跳的间隔秒数
RenewalPercentThreshold:续期的百分比阈值85%
numberOfRenewsPerMinThreshold:客户端每分钟发送心跳数的阈值,如果server在一分钟内没有收到这么多的心跳数就会触发自我保护机制
举个例子就明白了:
假设有100个客户端,发送心跳间隔为30s,那么一分钟如果全部正常的话server收到的心跳应该是200次,
如果server一分钟收到的心跳数低于200*85%=170次,就会触发自我保护机制
/**
 * Replicates all eureka actions to peer eureka nodes except for replication
 * traffic to this node.
 *
 */
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
// If this request itself came from a peer (a replication), record it in
// the last-minute replication counter.
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
// No peers configured, or the request is already a replication from another
// Eureka Server: return directly to avoid an endless replication loop.
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
// Loop over the neighbouring Eureka Server nodes and replicate to each one.
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
// Fire the replication request for this peer node.
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
Eureka Server集群数据同步
数据同步
Eureka Server之间会互相进行注册,构建Eureka Server集群,不同Eureka Server之间会进行服务同步,用来保证服务信息的一致性。当服务提供者发送注册请求到一个服务注册中心时, 它会将该请求转发给集群中相连的其他注册中心, 从而实现注册中心之间的服务同步。通过服务同步,两个服务提供者的服务信息就可以通过这两台服务注册中心中的任意一台获取到。
源码解析
Eureka Server 集群不区分主从节点,所有节点相同角色(也就是没有角色),完全对等。
提供集群功能的包路径:com.netflix.eureka.cluster
以下提到的同步,准确来说,就是复制(Replication)。
1、集群节点初始化
Eureka Server
封装了一个集群节点管理的类PeerEurekaNodes
。
/**
 * Helper class to manage lifecycle of a collection of {@link PeerEurekaNode}s.
 *
 * @author Tomasz Bak
 */
@Singleton
public class PeerEurekaNodes {
// Application instance registry.
protected final PeerAwareInstanceRegistry registry;
// Eureka Server configuration.
protected final EurekaServerConfig serverConfig;
// Eureka Client configuration.
protected final EurekaClientConfig clientConfig;
// Eureka Server codecs (encoding/decoding).
protected final ServerCodecs serverCodecs;
// Application instance info manager.
private final ApplicationInfoManager applicationInfoManager;
// Peer node collection; populated in start(), not in the constructor.
private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
// Peer node URL collection; populated in start(), not in the constructor.
private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
// Scheduled task thread pool; initialized in start().
private ScheduledExecutorService taskExecutor;
@Inject
public PeerEurekaNodes(
PeerAwareInstanceRegistry registry,
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
ApplicationInfoManager applicationInfoManager) {
this.registry = registry;
this.serverConfig = serverConfig;
this.clientConfig = clientConfig;
this.serverCodecs = serverCodecs;
this.applicationInfoManager = applicationInfoManager;
}
<!---略--->
}
peerEurekaNodes、peerEurekaNodeUrls、taskExecutor 属性,在构造方法中未设置和初始化,而是在 PeerEurekaNodes#start() 方法中,设置和初始化,接下来我们看 start() 方法的实现。
PeerEurekaNodes#start() 集群节点启动方法,主要完成以下几件事:
- 初始化定时任务线程池
- 初始化集群节点信息 updatePeerEurekaNodes 方法
- 初始化固定周期(默认10分钟,可配置)更新集群节点信息的任务的线程
- 通过定时任务,线程池定时执行更新集群节点线程
// Starts the peer-node management lifecycle:
// 1) create the scheduler, 2) resolve and build the initial peer list,
// 3) schedule a periodic refresh of the peer list (default every 10 minutes,
// configurable via peerEurekaNodesUpdateIntervalMs).
public void start() {
// 1. Single-threaded scheduler for the periodic peer-update task.
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
// Daemon thread: runs in the background, does not block JVM shutdown.
thread.setDaemon(true);
return thread;
}
}
);
try {
// 2. Initialize the peer node list from the currently resolved peer URLs.
updatePeerEurekaNodes(resolvePeerUrls());
// 3. Task that periodically refreshes the peer node list.
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
// 4. Schedule with a fixed delay between the end of one execution and the
// start of the next. If any execution throws, subsequent runs are
// cancelled; otherwise the task runs until the executor is shut down.
// Parameters: command - the task to execute;
// initialDelay - delay before the first run (default 10 minutes);
// delay - delay between the end of one run and the start of the next
// (default 10 minutes);
// unit - the time unit of initialDelay and delay (milliseconds here).
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}
2、更新集群节点信息
通过 start() 方法可以看出,Eureka Server 是通过一个定时线程定时去更新集群的节点信息达到对集群节点的动态发现和感知,在上面我们可以看到更新操作主要由 updatePeerEurekaNodes(resolvePeerUrls()) 方法完成,下面查看此方法的实现。
集群同步步骤
- 判断集群节点是否为空,为空则返回
- isReplication 代表是否是一个复制请求, isReplication = true 表示是其他Eureka Server发过来的同步请求,这个时候是不需要继续往下同步的。否则会陷入同步死循环
- 循环集群节点,过滤掉自身的节点
- 发起同步请求 ,调用replicateInstanceActionsToPeers
PS: 这里提到了PeerEurekaNode。PeerEurekaNodes负责集群节点信息的更新与读取:服务启动时会开启一个定时线程来更新集群节点信息,默认每10分钟执行一次。
/**
 * Given new set of replica URLs, destroy {@link PeerEurekaNode}s no longer available, and
 * create new ones.
 *
 * @param newPeerUrls peer node URLs; this collection should have local node's URL filtered out
 */
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
if (newPeerUrls.isEmpty()) {
logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
return;
}
// Peer URLs to shut down = previous URLs minus the new ones (no longer available).
Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
toShutdown.removeAll(newPeerUrls);
// Peer URLs to add = new URLs minus the previous ones (newly discovered peers).
Set<String> toAdd = new HashSet<>(newPeerUrls);
toAdd.removeAll(peerEurekaNodeUrls);
// Both sets empty means the peer set is unchanged; nothing to update.
if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
return;
}
// Start from the current (previous) list of peer nodes.
// Remove peers no long available
List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
// Remove, and shut down, nodes that are no longer available.
if (!toShutdown.isEmpty()) {
logger.info("Removing no longer available peer nodes {}", toShutdown);
int i = 0;
while (i < newNodeList.size()) {
PeerEurekaNode eurekaNode = newNodeList.get(i);
if (toShutdown.contains(eurekaNode.getServiceUrl())) {
// Drop the unavailable node from the list and shut it down.
newNodeList.remove(i);
eurekaNode.shutDown();
} else {
i++;
}
}
}
// Add new peers
if (!toAdd.isEmpty()) {
logger.info("Adding new peer nodes {}", toAdd);
for (String peerUrl : toAdd) {
newNodeList.add(createPeerEurekaNode(peerUrl));
}
}
// Re-assign both the peer node list and the peer URL set atomically enough
// for readers of the volatile fields.
this.peerEurekaNodes = newNodeList;
this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
updatePeerEurekaNodes(resolvePeerUrls()) 根据传入的新集群的 URL 集合完成节点的更新
- 校验传入的 URL 集合是否需要更新
- 移除新 URL 集合中没有的旧节点并关闭节点
- 创建旧节点集合中没有的新 URL 节点,通过 createPeerEurekaNode(peerUrl) 方法
- 重新赋值节点集合以及节点URL集合完成节点的更新
3、创建节点信息
updatePeerEurekaNodes(resolvePeerUrls()) 方法传入的新 URL 集合,是通过resolvePeerUrls() 方法获取,这个方法实际上就是解析配置文件中的 eureka.serviceUrl 前缀的配置,并动态监听配置的更新。下面我们看创建节点的方法
/**
 * Builds a {@link PeerEurekaNode} for the given peer URL, wiring up the HTTP
 * replication client used to push changes to that remote node.
 *
 * @param peerEurekaNodeUrl the peer node's service URL
 * @return a new peer node bound to that URL
 */
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) {
// Client used to send replication traffic to the remote node.
HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl);
// Host portion of the peer URL; fall back to a placeholder when it cannot be parsed.
String peerHost = hostFromUrl(peerEurekaNodeUrl);
return new PeerEurekaNode(registry, peerHost == null ? "host" : peerHost, peerEurekaNodeUrl, replicationClient, serverConfig);
}
4、集群节点数据同步
我们来看看创建新节点方法 PeerEurekaNode,集群节点间都有哪些数据需要同步
// Wires up one peer node: a replication task processor plus two dispatchers —
// a batching dispatcher and a non-batching (single-task) dispatcher.
PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
HttpReplicationClient replicationClient, EurekaServerConfig config,
int batchSize, long maxBatchingDelayMs,
long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
this.registry = registry;
this.targetHost = targetHost;
this.replicationClient = replicationClient;
this.serviceUrl = serviceUrl;
this.config = config;
this.maxProcessingDelayMs = config.getMaxTimeForReplication();
String batcherName = getBatcherName();
// Processor that actually executes replication tasks against this peer.
ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
// Batching task dispatcher: groups replication tasks for bulk submission.
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
config.getMaxElementsInPeerReplicationPool(),
batchSize,
config.getMaxThreadsForPeerReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
// Non-batching (single task) dispatcher; judging by the Status pool config
// it handles status replication — confirm against callers.
this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
targetHost,
config.getMaxElementsInStatusReplicationPool(),
config.getMaxThreadsForStatusReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
}
PeerEurekaNode 完成以下几件事
- 创建数据同步的任务处理器 ReplicationTaskProcessor
- 创建批处理任务分发器
- 创建单任务分发器
说明:eureka 将节点间的数据同步工作包装成一个个细微的任务 ReplicationTask,每一个数据操作代表一个任务,将任务发送给任务调度器 TaskDispatcher 去异步处理。
接下来我们看看 PeerEurekaNode 都可以创建哪些同步任务:
- register:当 Eureka Server 注册新服务时,同时创建一个定时任务将新服务同步到集群其他节点
/**
 * Replicates a registration of the given instance to this peer node.
 *
 * Queues a batched "register" replication task whose expiry is the lease
 * renewal interval of the registrant, so a stale task is dropped instead of
 * being replayed long after it became irrelevant.
 *
 * @param info the instance information of the newly registered service
 * @throws Exception if the task cannot be queued
 */
public void register(final InstanceInfo info) throws Exception {
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// Queue a "register" replication task on the batching dispatcher.
batchingDispatcher.process(
taskId("register", info),
new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
// @Override added for consistency with the cancel/heartbeat anonymous tasks.
@Override
public EurekaHttpResponse<Void> execute() {
return replicationClient.register(info);
}
},
expiryTime
);
}
- cancel:取消服务注册任务,当前节点有服务取消注册,将信息同步到集群远程节点
/**
 * Replicates a cancel (de-registration) of the given instance to this peer.
 * A 404 response means the peer never had the entry; it is only logged.
 */
public void cancel(final String appName, final String id) throws Exception {
long expiryTime = System.currentTimeMillis() + maxProcessingDelayMs;
// Queue a "cancel" replication task on the batching dispatcher.
batchingDispatcher.process(
taskId("cancel", appName, id),
new InstanceReplicationTask(targetHost, Action.Cancel, appName, id) {
@Override
public EurekaHttpResponse<Void> execute() {
return replicationClient.cancel(appName, id);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
logger.warn("{}: missing entry.", getTaskName());
}
}
},
expiryTime
);
}
- heartbeat:心跳同步任务,当前节点有服务发送心跳续租,将信息同步到集群远程节点
/**
 * Replicates a heartbeat (lease renewal) for the given instance to this peer.
 * On 404 (the peer does not know the instance) the instance is re-registered
 * via register(info); on a non-404 failure the instances may be synced when
 * the timestamps differ, if so configured.
 */
public void heartbeat(final String appName, final String id,
final InstanceInfo info, final InstanceStatus overriddenStatus,
boolean primeConnection) throws Throwable {
if (primeConnection) {
// We do not care about the result for priming request.
replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
return;
}
ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
@Override
public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
logger.warn("{}: missing entry.", getTaskName());
if (info != null) {
logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
getTaskName(), info.getId(), info.getStatus());
register(info);
}
} else if (config.shouldSyncWhenTimestampDiffers()) {
InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
if (peerInstanceInfo != null) {
syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
}
}
}
};
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
// Queue a "heartbeat" replication task on the batching dispatcher.
batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}
5、集群节点数据同步任务处理
在 PeerEurekaNode 的构造函数中可以看到同步任务处理由 ReplicationTaskProcessor 完成,下面看此类源码
/**
 * Processes a single {@link ReplicationTask}: executes it against the peer and
 * maps the HTTP outcome to a {@link ProcessingResult} — success, Congestion
 * (503 or read timeout, retried later), TransientError (network connect
 * failure), or PermanentError (other failures, not retried).
 */
@Override
public ProcessingResult process(ReplicationTask task) {
try {
// Execute the task: this performs the actual replication call.
EurekaHttpResponse<?> httpResponse = task.execute();
// Inspect the peer's response.
int statusCode = httpResponse.getStatusCode();
Object entity = httpResponse.getEntity();
if (logger.isDebugEnabled()) {
logger.debug("Replication task {} completed with status {}, (includes entity {})", task.getTaskName(), statusCode, entity != null);
}
if (isSuccess(statusCode)) {
task.handleSuccess();
} else if (statusCode == 503) {
logger.debug("Server busy (503) reply for task {}", task.getTaskName());
return ProcessingResult.Congestion;
} else {
task.handleFailure(statusCode, entity);
return ProcessingResult.PermanentError;
}
} catch (Throwable e) {
if (maybeReadTimeOut(e)) {
logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
//read timeout exception is more Congestion then TransientError, return Congestion for longer delay
return ProcessingResult.Congestion;
} else if (isNetworkConnectException(e)) {
logNetworkErrorSample(task, e);
return ProcessingResult.TransientError;
} else {
logger.error("{}: {} Not re-trying this exception because it does not seem to be a network exception",
peerId, task.getTaskName(), e);
return ProcessingResult.PermanentError;
}
}
return ProcessingResult.Success;
}
单任务处理
- 调用任务 task 的 execute 完成远程数据同步
- 分析远程返回结果
/**
 * Processes a batch of {@link ReplicationTask}s: bundles them into one
 * batch-update REST call to the peer and dispatches per-task results from the
 * response. Outcome mapping mirrors the single-task variant (503/read timeout
 * -> Congestion, connect failure -> TransientError, else PermanentError).
 */
@Override
public ProcessingResult process(List<ReplicationTask> tasks) {
ReplicationList list = createReplicationListOf(tasks);
try {
EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);
int statusCode = response.getStatusCode();
if (!isSuccess(statusCode)) {
if (statusCode == 503) {
logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);
return ProcessingResult.Congestion;
} else {
// Unexpected error returned from the server. This should ideally never happen.
logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());
return ProcessingResult.PermanentError;
}
} else {
handleBatchResponse(tasks, response.getEntity().getResponseList());
}
} catch (Throwable e) {
if (maybeReadTimeOut(e)) {
logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);
//read timeout exception is more Congestion then TransientError, return Congestion for longer delay
return ProcessingResult.Congestion;
} else if (isNetworkConnectException(e)) {
logNetworkErrorSample(null, e);
return ProcessingResult.TransientError;
} else {
logger.error("Not re-trying this exception because it does not seem to be a network exception", e);
return ProcessingResult.PermanentError;
}
}
return ProcessingResult.Success;
}
批处理任务,将一组任务一次性发送到远程进行处理
- 根据task集合创建 ReplicationList
- 调用批量同步接口将同步集合发送到远端节点同步数据,即调用 rest API
- 分析远程返回结果
三、总结
数据同步功能主要由 PeerEurekaNodes
与 PeerEurekaNode
类实现。
1、集群节点初始化:PeerEurekaNodes#start() 方法
- 初始化定时任务线程池
- 初始化集群节点信息 updatePeerEurekaNodes 方法
- 初始化固定周期(默认10分钟,可配置)更新集群节点信息的任务的线程
- 通过定时任务,线程池定时执行更新集群节点线程
2、更新集群节点信息:updatePeerEurekaNodes(resolvePeerUrls()) 方法
- 校验传入的 URL 集合是否需要更新
- 移除新 URL 集合中没有的旧节点并关闭节点
- 创建旧节点集合中没有的新 URL 节点,通过 createPeerEurekaNode(peerUrl) 方法
- 重新赋值节点集合以及节点URL集合完成节点的更新
- resolvePeerUrls() 方法,实际上就是解析配置文件中的 eureka.serviceUrl 前缀的配置,并动态监听配置的更新。
3、创建节点信息:createPeerEurekaNode(String peerEurekaNodeUrl) 方法
4、集群节点数据同步:PeerEurekaNode 方法
- 创建数据同步的任务处理器 ReplicationTaskProcessor
- 创建批处理任务分发器
- 创建单任务分发器
- PeerEurekaNode 可以创建的同步任务:register、cancel、heartbeat、statusUpdate、deleteStatusOverride
5、集群节点数据同步任务处理:ReplicationTaskProcessor.process() 完成
- 单任务处理
- 批量任务处理
总结
1:@EnableEurekaServer注入一个Marker类,说明是一个注册中心
2:EurekaServerAutoConfiguration注入一个filter,来拦截jersey请求转发给resource
3:服务注册,就是把信息存到一个ConcurrentHashMap<name, Map<id,Lease>>
4:对于注册冲突拿最新的微服务实例
5:server每分钟内收到的心跳数低于理应收到的85%就会触发自我保护机制
6:Lease的renew bug, duration多加了一次,理应加一个expireTime表示过期时间
7:集群同步:先注册到一台server,然后遍历其他的集群的其他server节点调用register注册到其他server,
isReplication=true代表此次注册来源于集群同步的注册,代表此次注册不要再进行集群同步,避免无限注册