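What is Eureka, and what problem does it solve?
Eureka is a REST-based service used mainly to locate services. Its purpose is load balancing and failover for middle-tier services.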
Eureka overall structure
Roles
Eureka Server: the Eureka server side
Eureka Client: the Eureka client side
Application Service: the service provider
Application Client: the service consumer
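Features
Register: the Client actively registers with the Server and supplies its own information, such as IP address, port, and home page URL.
Renew (heartbeat): by default the Client sends a heartbeat to the Server every 30 seconds to renew its lease, telling the Server that it is still alive.
Cancel: on a graceful shutdown the Client sends a cancel request, asking the Server to remove it from the registry.
Get Registry: the Client fetches the registry from the Server and caches it locally. By default the cache is refreshed every 30 seconds.
Replicate: multiple Servers synchronize their registries through peer-to-peer replication.
Eviction: by default, when a Client has not renewed its lease for 90 seconds, the Server removes it from the registry.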
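Eureka core concepts
Instance: class InstanceInfo. An InstanceInfo holds the information of one instance and is close to a POJO.
Application: class Application, the set of InstanceInfo objects that share one application name.
Registry (Applications): class Applications, the set of Application objects.
Eureka components
Scheduled task component: TimedSupervisorTask
TimedSupervisorTask is a periodic task with a fixed interval (it extends TimerTask and is therefore a Runnable). When a run times out, the interval before the next run doubles. Further timeouts keep doubling it until it reaches the configured upper bound, which is then used as the fixed interval. Once the task completes normally, the interval is reset to its initial value.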
public class TimedSupervisorTask extends TimerTask {
private final String name;
private final ScheduledExecutorService scheduler; // scheduler that re-runs this task after a delay
private final ThreadPoolExecutor executor; // thread pool that executes the actual task
private final long timeoutMillis; // default timeout, also the default delay
private final Runnable task; // the actual task
private final AtomicLong delay; // current delay
private final long maxDelay; // maximum delay
@Override
public void run() {
Future<?> future = null;
try {
future = executor.submit(task);
future.get(timeoutMillis, TimeUnit.MILLISECONDS);
delay.set(timeoutMillis);
} catch (TimeoutException e) {
long currentDelay = delay.get();
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) { // omitted
} catch (Throwable e) { // omitted
} finally {
if (future != null) { future.cancel(true); }
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
}
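For example, with an initial interval of 2 seconds and an upper bound of 15 seconds, consecutive timeouts stretch the interval to 4, 8, and then 15 seconds; one successful run resets it to 2 seconds.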
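The doubling rule can be replayed in isolation. The following is a minimal sketch, not the Eureka class itself: `BackoffSketch` and its methods are hypothetical names, and only the delay bookkeeping from `run()` (double on timeout, cap at the maximum, reset on success) is modeled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that mimics only the delay bookkeeping of TimedSupervisorTask. */
final class BackoffSketch {
    private final long initialDelayMs;
    private final long maxDelayMs;
    private long delayMs;

    BackoffSketch(long initialDelayMs, long maxDelayMs) {
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.delayMs = initialDelayMs;
    }

    /** Records the outcome of one run and returns the delay before the next run. */
    long next(boolean timedOut) {
        // Timeout: double the delay, capped at the maximum. Success: reset to the initial value.
        delayMs = timedOut ? Math.min(maxDelayMs, delayMs * 2) : initialDelayMs;
        return delayMs;
    }

    /** Replays a sequence of outcomes (true = the run timed out) and collects the delays. */
    static List<Long> simulate(long initialMs, long maxMs, boolean... timeouts) {
        BackoffSketch sketch = new BackoffSketch(initialMs, maxMs);
        List<Long> delays = new ArrayList<>();
        for (boolean timedOut : timeouts) {
            delays.add(sketch.next(timedOut));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Four timeouts followed by one success; prints [4000, 8000, 15000, 15000, 2000]
        System.out.println(simulate(2_000, 15_000, true, true, true, true, false));
    }
}
```

Unlike the real task, this sketch is single-threaded, so a plain `long` replaces the `AtomicLong` and the `compareAndSet`.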
Cluster resolver: ClusterResolver
// Interface definition: returns the list of EurekaEndpoint objects, i.e. the server addresses
public interface ClusterResolver<T extends EurekaEndpoint> {
String getRegion();
List<T> getClusterEndpoints();
}
EurekaEndpoint can be understood as the description of a Server address.
Configuration-based cluster resolver: ConfigClusterResolver
ConfigClusterResolver builds the EurekaEndpoint list from the configured service-url.
@Override
public List<AwsEndpoint> getClusterEndpoints() {
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
return getClusterEndpointsFromDns();
} else {
return getClusterEndpointsFromConfig();
}
}
private List<AwsEndpoint> getClusterEndpointsFromConfig() {
String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
String myZone = InstanceInfo.getZone(availZones, myInstanceInfo);
Map<String, List<String>> serviceUrls = EndpointUtils
.getServiceUrlsMapFromConfig(clientConfig, myZone, clientConfig.shouldPreferSameZoneEureka());
List<AwsEndpoint> endpoints = new ArrayList<>();
for (String zone : serviceUrls.keySet()) {
for (String url : serviceUrls.get(zone)) {
try {
endpoints.add(new AwsEndpoint(url, getRegion(), zone));
} catch (Exception ignore) {
logger.warn("Invalid eureka server URI: {}; removing from the server pool", url);
}
}
}
return endpoints;
}
Zone-affinity cluster resolver: ZoneAffinityClusterResolver
ZoneAffinityClusterResolver holds a ConfigClusterResolver as its delegate (a static proxy) and shuffles the EurekaEndpoint list that the delegate returns.
public List<AwsEndpoint> getClusterEndpoints() {
// Split the endpoints into those of the current zone and those of other zones
List<AwsEndpoint>[] parts = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone);
List<AwsEndpoint> myZoneEndpoints = parts[0];
List<AwsEndpoint> remainingEndpoints = parts[1];
List<AwsEndpoint> randomizedList = randomizeAndMerge(myZoneEndpoints, remainingEndpoints);
// Without zone affinity, reverse the list
if (!zoneAffinity) {
Collections.reverse(randomizedList);
}
return randomizedList;
}
// Shuffle each group and merge them, placing the endpoints of the current zone before those of other zones
private List<AwsEndpoint> randomizeAndMerge(List<AwsEndpoint> myZoneEndpoints, List<AwsEndpoint> remainingEndpoints) {
if (myZoneEndpoints.isEmpty()) {
return randomizer.randomize(remainingEndpoints);
}
if (remainingEndpoints.isEmpty()) {
return randomizer.randomize(myZoneEndpoints);
}
List<AwsEndpoint> mergedList = randomizer.randomize(myZoneEndpoints);
mergedList.addAll(randomizer.randomize(remainingEndpoints));
return mergedList;
}
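Asynchronous resolver: AsyncResolver
The asynchronous refresh is implemented with the scheduled task component TimedSupervisorTask.
The actual resolution is done by the delegate, ZoneAffinityClusterResolver.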
public class AsyncResolver<T extends EurekaEndpoint> implements ClosableResolver<T> {
private static final Logger logger = LoggerFactory.getLogger(AsyncResolver.class);
private final AtomicBoolean warmedUp = new AtomicBoolean(false);
private final AtomicBoolean scheduled = new AtomicBoolean(false);
private final String name; // a name for metric purposes
private final ClusterResolver<T> delegate; // the ZoneAffinityClusterResolver described above
private final ScheduledExecutorService executorService;
private final ThreadPoolExecutor threadPoolExecutor;
private final TimedSupervisorTask backgroundTask; // the TimedSupervisorTask described above
private final AtomicReference<List<T>> resultsRef; // cached result
private final int refreshIntervalMs;
private final int warmUpTimeoutMs;
private volatile long lastLoadTimestamp = -1;
// Constructor: initializes the thread pools
AsyncResolver(String name,
ClusterResolver<T> delegate,
List<T> initialValue,
int executorThreadPoolSize,
int refreshIntervalMs,
int warmUpTimeoutMs) {
this.name = name;
this.delegate = delegate;
this.refreshIntervalMs = refreshIntervalMs;
this.warmUpTimeoutMs = warmUpTimeoutMs;
this.executorService = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("AsyncResolver-" + name + "-%d")
.setDaemon(true)
.build());
this.threadPoolExecutor = new ThreadPoolExecutor(
1, executorThreadPoolSize, 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), // use direct handoff
new ThreadFactoryBuilder()
.setNameFormat("AsyncResolver-" + name + "-executor-%d")
.setDaemon(true)
.build()
);
this.backgroundTask = new TimedSupervisorTask(
this.getClass().getSimpleName(),
executorService,
threadPoolExecutor,
refreshIntervalMs,
TimeUnit.MILLISECONDS,
5,
updateTask
);
this.resultsRef = new AtomicReference<>(initialValue);
Monitors.registerObject(name, this);
}
@Override
public List<T> getClusterEndpoints() {
long delay = refreshIntervalMs;
// If not warmed up yet, warm up now and mark as warmed up
if (warmedUp.compareAndSet(false, true)) {
if (!doWarmUp()) {
delay = 0;
}
}
// If the periodic task has not been scheduled yet, schedule it and mark it as scheduled
if (scheduled.compareAndSet(false, true)) {
scheduleTask(delay);
}
return resultsRef.get();
}
void scheduleTask(long delay) {
executorService.schedule(backgroundTask, delay, TimeUnit.MILLISECONDS);
}
// The actual task: fetch the EurekaEndpoint list
private final Runnable updateTask = new Runnable() {
@Override
public void run() {
try {
// Fetch the EurekaEndpoint list through ZoneAffinityClusterResolver
List<T> newList = delegate.getClusterEndpoints();
if (newList != null) {
resultsRef.getAndSet(newList);
lastLoadTimestamp = System.currentTimeMillis();
} else {
logger.warn("Delegate returned null list of cluster endpoints");
}
logger.debug("Resolved to {}", newList);
} catch (Exception e) {
logger.warn("Failed to retrieve cluster endpoints from the delegate", e);
}
}
};
}
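A neat use of delegation.
Communication
Low-level communication interface: EurekaHttpClient
In the implementation diagram of the EurekaHttpClient hierarchy, the left side is Eureka's own REST implementation built on Jersey, and the right side is the Spring Cloud REST implementation built on RestTemplate.
The HTTP access itself is carried out by Apache HttpClient, which is not covered here.
As the names suggest, the right-hand side uses the decorator pattern.
SessionedEurekaHttpClient: reconnects periodically. After a certain interval it closes the connection and establishes a new one, which gives client-side load balancing.
RetryableEurekaHttpClient: retries a limited number of times and keeps a blacklist of servers.
RedirectingEurekaHttpClient: follows redirects.
RestTemplateTransportClient: REST access based on RestTemplate.
SessionedEurekaHttpClient core code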
@Override
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
long now = System.currentTimeMillis();
long delay = now - lastReconnectTimeStamp;
// When (now - lastReconnectTimeStamp) reaches currentSessionDurationMs,
// close the connection and set eurekaHttpClientRef to null.
// currentSessionDurationMs lies within sessionDurationMs +/- [0, sessionDurationMs/2]
if (delay >= currentSessionDurationMs) {
logger.debug("Ending a session and starting anew");
lastReconnectTimeStamp = now;
currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
}
EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
if (eurekaHttpClient == null) {
eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
}
return requestExecutor.execute(eurekaHttpClient);
}
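The randomized session length mentioned in the comment keeps many clients from reconnecting at the same moment. The sketch below shows one way to draw such a value. It assumes the jitter is uniform within half of the base duration on either side, as the comment describes; `SessionJitterSketch` and `randomize` are hypothetical names, not the Eureka method itself.

```java
import java.util.Random;

/** Hypothetical sketch of a jittered session duration. */
final class SessionJitterSketch {

    /** Returns baseMs plus a uniform offset within [-baseMs/2, +baseMs/2). */
    static long randomize(long baseMs, Random random) {
        // nextDouble() is in [0, 1), so the factor below is in [-0.5, 0.5)
        long delta = (long) (baseMs * (random.nextDouble() - 0.5));
        return baseMs + delta;
    }

    public static void main(String[] args) {
        Random random = new Random();
        long baseMs = 20 * 60 * 1000L; // an assumed base of 20 minutes, for illustration only
        for (int i = 0; i < 3; i++) {
            System.out.println(randomize(baseMs, random));
        }
    }
}
```

Because every client draws its own duration, reconnects spread out over time, and each reconnect can land on a different server.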
RetryableEurekaHttpClient core code
private List<EurekaEndpoint> getHostCandidates() {
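// Blacklist: quarantineSet; candidate list: candidateHosts
// The candidate order is random because clusterResolver is a ZoneAffinityClusterResolver
// Intersect the blacklist with the candidates so that every blacklisted entry is still a candidate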
List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
quarantineSet.retainAll(candidateHosts);
// Compute the threshold; the default ratio is 0.66
// If the result exceeds candidateHosts.size(), cap it at candidateHosts.size()
int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
if (threshold > candidateHosts.size()) {
threshold = candidateHosts.size();
}
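// If the blacklist has reached the threshold, clear it: too many servers are marked bad, so all of them get another chance. This is the blacklist recovery mechanism.
// Below the threshold, filter the candidates normally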
if (quarantineSet.isEmpty()) {
// no-op
} else if (quarantineSet.size() >= threshold) {
logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
quarantineSet.clear();
} else {
List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
for (EurekaEndpoint endpoint : candidateHosts) {
if (!quarantineSet.contains(endpoint)) {
remainingHosts.add(endpoint);
}
}
candidateHosts = remainingHosts;
}
return candidateHosts;
}
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
// Try the servers in order
int endpointIdx = 0;
// Retry limit; numberOfRetries defaults to 3
for (int retry = 0; retry < numberOfRetries; retry++) {
// If there is no cached currentHttpClient, create a new client from the candidate list
EurekaHttpClient currentHttpClient = delegate.get();
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
if (candidateHosts == null) {
candidateHosts = getHostCandidates();
if (candidateHosts.isEmpty()) {
throw new TransportException("cluster server list is empty");
}
}
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}
// On the first attempt or after a failed request, advance endpointIdx and connect to the next candidate
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}
try {
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
// On success, return. Otherwise continue with the next iteration and blacklist the current server
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
delegate.set(currentHttpClient);
return response;
}
} catch (Exception e) {
logger.warn("Request execution failed with message: {}", e.getMessage());
}
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
// Retries exhausted without success: throw
throw new TransportException("Retry limit reached; giving up on completing the request");
}
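The quarantine logic of `getHostCandidates()` is easier to follow with plain strings. The sketch below is a simplified, standalone rendering under stated assumptions: endpoints are modeled as `String`, the ratio is passed in directly instead of being read from the transport configuration, and `QuarantineSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified version of the blacklist filtering described above. */
final class QuarantineSketch {

    static List<String> filter(List<String> candidates, Set<String> quarantine, double refreshPercentage) {
        // Drop blacklisted entries that are no longer candidates
        quarantine.retainAll(candidates);

        // Threshold = candidates * ratio, capped at the number of candidates
        int threshold = Math.min(candidates.size(), (int) (candidates.size() * refreshPercentage));

        if (quarantine.isEmpty()) {
            return candidates;
        }
        if (quarantine.size() >= threshold) {
            // Too many servers are blacklisted: forget the blacklist and try everyone again
            quarantine.clear();
            return candidates;
        }
        List<String> remaining = new ArrayList<>(candidates.size());
        for (String endpoint : candidates) {
            if (!quarantine.contains(endpoint)) {
                remaining.add(endpoint);
            }
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("a", "b", "c", "d");
        Set<String> quarantine = new HashSet<>(Arrays.asList("a"));
        System.out.println(filter(servers, quarantine, 0.66)); // one of four blacklisted: filtered
        quarantine.add("b");
        System.out.println(filter(servers, quarantine, 0.66)); // two of four: blacklist cleared
    }
}
```

With four candidates and a ratio of 0.66 the threshold is 2, so a second failing server already resets the blacklist. The threshold is truncated to an integer, which makes small clusters recover quickly.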
EurekaTransport, a static nested class of DiscoveryClient, gathers everything needed to reach the Server.
It is initialized in DiscoveryClient#scheduleServerEndpointTask. The result of the initialization is:
private static final class EurekaTransport {
private ClosableResolver bootstrapResolver; // the asynchronous resolver, AsyncResolver
// factory class: RestTemplateTransportClientFactory
private TransportClientFactory transportClientFactory;
// EurekaHttpClient used for registration and its factory; the actual type is SessionedEurekaHttpClient
private EurekaHttpClient registrationClient;
private EurekaHttpClientFactory registrationClientFactory;
// same as above, used for queries
private EurekaHttpClient queryClient;
private EurekaHttpClientFactory queryClientFactory;
}
The clients are ultimately created by EurekaHttpClients#canonicalClientFactory, which returns a SessionedEurekaHttpClient. SessionedEurekaHttpClient holds a factory for RetryableEurekaHttpClient,
which holds a factory for RedirectingEurekaHttpClient,
which in turn holds the TransportClientFactory (RestTemplateTransportClientFactory).
static EurekaHttpClientFactory canonicalClientFactory(final String name,
final EurekaTransportConfig transportConfig,
final ClusterResolver<EurekaEndpoint> clusterResolver,
final TransportClientFactory transportClientFactory) {
return new EurekaHttpClientFactory() {
@Override
public EurekaHttpClient newClient() {
return new SessionedEurekaHttpClient(
name,
RetryableEurekaHttpClient.createFactory(
name,
transportConfig,
clusterResolver, RedirectingEurekaHttpClient.createFactory(transportClientFactory),
ServerStatusEvaluators.legacyEvaluator()),
transportConfig.getSessionedClientReconnectIntervalSeconds() * 1000
);
}
@Override
public void shutdown() {
wrapClosable(clusterResolver).shutdown();
}
};
}
Eureka Instance
InstanceInfo describes the instance itself: instanceId, application name, host, lease information, and so on.
public class InstanceInfo {
private volatile String instanceId; // unique
private volatile String appName;
private volatile String appGroupName;
private volatile String ipAddr;
private volatile String sid = SID_DEFAULT;
private volatile int port = DEFAULT_PORT;
private volatile int securePort = DEFAULT_SECURE_PORT;
private volatile String homePageUrl;
private volatile String statusPageUrl;
private volatile String healthCheckUrl;
private volatile String secureHealthCheckUrl;
private volatile String vipAddress;
private volatile String secureVipAddress;
private String statusPageRelativeUrl;
private String statusPageExplicitUrl;
private String healthCheckRelativeUrl;
private String healthCheckSecureExplicitUrl;
private String vipAddressUnresolved;
private String secureVipAddressUnresolved;
private String healthCheckExplicitUrl;
private volatile int countryId = DEFAULT_COUNTRY_ID; // Defaults to US
private volatile boolean isSecurePortEnabled = false;
private volatile boolean isUnsecurePortEnabled = true;
private volatile DataCenterInfo dataCenterInfo;
private volatile String hostName;
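// STARTING: initializing; DOWN: health check failed; UP: serving normally; OUT_OF_SERVICE: running but not accepting requests; UNKNOWN: unknown state
private volatile InstanceStatus status = InstanceStatus.UP;
// overridden status
private volatile InstanceStatus overriddenStatus = InstanceStatus.UNKNOWN;
// Whether the instance is dirty (client out of sync with server); true means it must be registered with the server again
private volatile boolean isInstanceInfoDirty = false;
// lease information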
private volatile LeaseInfo leaseInfo;
private volatile Boolean isCoordinatingDiscoveryServer = Boolean.FALSE;
// metadata
private volatile Map<String, String> metadata;
private volatile Long lastUpdatedTimestamp;
private volatile Long lastDirtyTimestamp;
private volatile ActionType actionType;
private volatile String asgName;
private String version = VERSION_UNKNOWN;
}
public class InstanceInfoFactory {
public InstanceInfo create(EurekaInstanceConfig config) {
// ...
}
}
Eureka Client implementation
Core logic: DiscoveryClient
// Initialize all scheduled tasks
private void initScheduledTasks() {
// Fetch the registry
if (clientConfig.shouldFetchRegistry()) {
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// TimedSupervisorTask periodic task; the wrapped task is CacheRefreshThread
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
// the scheduler triggers cacheRefreshTask after a delay
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// Registration with the Server is enabled
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
// TimedSupervisorTask periodic task; the wrapped task is HeartbeatThread
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);
// the scheduler triggers heartbeatTask after a delay
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// instance info replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// status change listener
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// start the instance info replicator
instanceInfoReplicator.start(
clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
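Cache refresh: CacheRefreshThread
Periodically fetches delta data and updates the local cache.
Heartbeat task: HeartbeatThread
Periodically sends lease renewals.
Instance info replicator: InstanceInfoReplicator
Registers again when the instance info is dirty.
Status change listener: ApplicationInfoManager.StatusChangeListener
Triggers one run of InstanceInfoReplicator#run.
Eureka Server implementation
Eureka dashboard: EurekaController
Spring Cloud provides a dashboard page, which can be configured through EurekaDashboardProperties.
Resource layer
RESTful web services implemented with Jersey, similar to @RestController: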
ApplicationsResource
ApplicationResource
InstanceResource
InstancesResource
PeerReplicationResource
Response cache with two levels: ResponseCacheImpl
Logic diagram
Level-1 cache, read-only, a ConcurrentMap: ConcurrentMap<Key, Value> readOnlyCacheMap
Level-2 cache, read-write, Guava: LoadingCache<Key, Value> readWriteCacheMap
First, the constructor:
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
this.readWriteCacheMap =CacheBuilder.newBuilder()
.initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
// Removal listener: removes the Key from regionSpecificKeys
.removalListener(new RemovalListener<Key, Value>() {
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
// CacheLoader: loads a missing entry
// puts the Key into regionSpecificKeys
// builds the Value from the registry and stores it in the cache
.build(new CacheLoader<Key, Value>() {
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
// Build the Value to cache from the registry
Value value = generatePayload(key);
return value;
}
});
// If the read-only cache is enabled, schedule the task that periodically refreshes it
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
}
}
Next, the method that reads from the cache:
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
// With the read-only cache enabled, read readOnlyCacheMap first; on a miss, read the read-write cache and update the read-only cache
// Otherwise read directly from the read-write cache
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
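The interplay of the two cache levels can be tried out with a small standalone model. This is a sketch under assumptions, not ResponseCacheImpl: the Guava LoadingCache is replaced by a ConcurrentHashMap with `computeIfAbsent`, the periodic update task is replaced by an explicit `refreshReadOnly()` call, and all names (`TwoLevelCacheSketch`, `invalidate`, `refreshReadOnly`) are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical model of a read-only cache in front of a loading read-write cache. */
final class TwoLevelCacheSketch<K, V> {
    private final ConcurrentMap<K, V> readOnly = new ConcurrentHashMap<>();
    private final ConcurrentMap<K, V> readWrite = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // stands in for generatePayload(key)

    TwoLevelCacheSketch(Function<K, V> loader) {
        this.loader = loader;
    }

    V get(K key, boolean useReadOnlyCache) {
        if (!useReadOnlyCache) {
            return readWrite.computeIfAbsent(key, loader);
        }
        V cached = readOnly.get(key);
        if (cached != null) {
            return cached;
        }
        // Miss in level 1: load through level 2 and remember the value in level 1
        V loaded = readWrite.computeIfAbsent(key, loader);
        if (loaded != null) {
            readOnly.put(key, loaded);
        }
        return loaded;
    }

    /** Called when the registry changes: only the read-write level is invalidated. */
    void invalidate(K key) {
        readWrite.remove(key);
    }

    /** Stand-in for the periodic task that copies fresh values into the read-only level. */
    void refreshReadOnly() {
        for (K key : readOnly.keySet()) {
            V latest = readWrite.computeIfAbsent(key, loader);
            if (latest != null) {
                readOnly.put(key, latest);
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger version = new AtomicInteger(1);
        TwoLevelCacheSketch<String, String> cache =
                new TwoLevelCacheSketch<>(key -> key + "-v" + version.get());
        System.out.println(cache.get("apps", true));
        version.set(2);
        cache.invalidate("apps");
        System.out.println(cache.get("apps", true));  // still the old value until the refresh runs
        cache.refreshReadOnly();
        System.out.println(cache.get("apps", true));
    }
}
```

The trade-off shown here is that readers of the read-only level may see stale data until the next refresh, in exchange for never triggering a payload rebuild on the hot path.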
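Registry implementation
LookupService: service discovery
LeaseManager: lease operations
AbstractInstanceRegistry: core implementation of the interfaces
PeerAwareInstanceRegistryImpl: peer replication
InstanceRegistry: publishes Spring events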
public interface LookupService<T> {
Application getApplication(String appName);
Applications getApplications();
List<InstanceInfo> getInstancesById(String id);
InstanceInfo getNextServerFromEureka(String virtualHostname, boolean secure);
}
public interface LeaseManager<T> {
void register(T r, int leaseDuration, boolean isReplication);
boolean cancel(String appName, String id, boolean isReplication);
boolean renew(String appName, String id, boolean isReplication);
void evict();
}
The registry storage is essentially a two-level ConcurrentHashMap held in memory: AbstractInstanceRegistry#registry
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
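- The key of the first level is the appName; the value is the second-level ConcurrentHashMap.
- The key of the second-level ConcurrentHashMap is the InstanceId of the service; the value is a Lease object, which contains the instance details and the attributes used for service governance.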
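A minimal sketch of this two-level layout follows. It is illustrative only: a `String` stands in for `Lease<InstanceInfo>`, and `RegistrySketch` with its methods is a hypothetical name rather than part of Eureka.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical two-level registry: appName -> (instanceId -> lease). */
final class RegistrySketch {
    // String stands in for Lease<InstanceInfo> to keep the sketch self-contained
    private final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    void register(String appName, String instanceId, String lease) {
        // Create the inner map atomically on first use, then store the lease
        registry.computeIfAbsent(appName, name -> new ConcurrentHashMap<>()).put(instanceId, lease);
    }

    String lookup(String appName, String instanceId) {
        Map<String, String> leases = registry.get(appName);
        return leases == null ? null : leases.get(instanceId);
    }

    boolean cancel(String appName, String instanceId) {
        Map<String, String> leases = registry.get(appName);
        return leases != null && leases.remove(instanceId) != null;
    }

    int instanceCount() {
        int count = 0;
        for (Map<String, String> leases : registry.values()) {
            count += leases.size();
        }
        return count;
    }

    public static void main(String[] args) {
        RegistrySketch sketch = new RegistrySketch();
        sketch.register("ORDER-SERVICE", "host-1:8080", "lease-1");
        sketch.register("ORDER-SERVICE", "host-2:8080", "lease-2");
        System.out.println(sketch.lookup("ORDER-SERVICE", "host-2:8080"));
        System.out.println(sketch.instanceCount());
    }
}
```

`computeIfAbsent` plays the role that the `putIfAbsent` dance plays in the registration code: two threads registering the first instances of the same application end up sharing one inner map.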
Service registration: register
InstanceRegistry
public void register(InstanceInfo info, int leaseDuration, boolean isReplication) {
// Publish the EurekaInstanceRegisteredEvent
handleRegistration(info, leaseDuration, isReplication);
super.register(info, leaseDuration, isReplication);
}
PeerAwareInstanceRegistryImpl
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication);
// Replicate to the other nodes
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
AbstractInstanceRegistry
// registrant: the instance being registered; leaseDuration: the lease duration; isReplication: whether the call comes from eureka-server replication
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
// Acquire the read lock: threads taking the read lock are not blocked, threads taking the write lock are
read.lock();
try {
// gMap is a ConcurrentHashMap holding all leases of this application in the registry
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
// If gMap is null, create gNewMap, put it into the registry, and assign it to gMap
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
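// Look up the lease of this instance.
// If it exists and its lastDirtyTimestamp is greater than that of the registrant, keep the existing instance.
// If it does not exist, treat this as a new registration:
// increment expectedNumberOfClientsSendingRenews and update numberOfRenewsPerMinThreshold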
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
registrant = existingLease.getHolder();
}
} else {
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
}
// Build a Lease from the registrant and put it into gMap
Lease<InstanceInfo> lease = new Lease<>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
// This is where the initial state transfer of overridden status happens
// If the overridden status is not UNKNOWN, it has to be handled
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
// Add the registrant to overriddenInstanceStatusMap if it is not there yet
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
// If the map holds an overridden status for the registrant, it replaces the registrant's own overridden status
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Resolve the status by the overridden-status rules and set it without marking the instance dirty
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set the lease's serviceUpTimestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
// Wrap the lease in a RecentlyChangedItem and add it to the recently-changed queue
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// Invalidate the read-write cache of the two-level cache
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
} finally {
read.unlock();
}
}
Lease renewal: renew
InstanceRegistry
public boolean renew(final String appName, final String serverId, boolean isReplication) {
// Get the applications sorted by name
// Walk the registry to find the instance and publish the EurekaInstanceRenewedEvent
List<Application> applications = getSortedApplications();
for (Application input : applications) {
if (input.getName().equals(appName)) {
InstanceInfo instance = null;
for (InstanceInfo info : input.getInstances()) {
if (info.getId().equals(serverId)) {
instance = info;
break;
}
}
publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instance, isReplication));
break;
}
}
return super.renew(appName, serverId, isReplication);
}
PeerAwareInstanceRegistryImpl
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
// Renewal succeeded: replicate to the other eureka-server nodes
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}
AbstractInstanceRegistry
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
// If the registry holds no Lease for the instance to renew, return false
// Otherwise continue with the renewal
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
return false;
} else {
// Get the instance that belongs to the lease
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// Resolve the overridden status; if it is UNKNOWN, return false
// Otherwise, if it differs from the current status, set the instance status to it
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
// Renew: lastUpdateTimestamp = System.currentTimeMillis() + duration
leaseToRenew.renew();
return true;
}
}
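The renewal formula in the comment above, combined with the expiry check used during eviction (evictionTimestamp > 0, or now > lastUpdateTimestamp + duration + additionalLeaseMs), means that a renewed lease effectively survives for twice its duration. The sketch below replays that arithmetic with an explicit clock. `LeaseSketch` is a hypothetical name, time is passed in as a parameter instead of being read from `System.currentTimeMillis()`, and initializing `lastUpdateTimestamp` to the creation time is an assumption.

```java
/** Hypothetical lease model with an injectable clock, showing the doubled expiry window. */
final class LeaseSketch {
    private final long durationMs;
    private long lastUpdateTimestamp;
    private long evictionTimestamp;

    LeaseSketch(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs; // assumption: a new lease starts at its creation time
    }

    /** Renew as described in the comment above: the duration is already added here. */
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    void cancel(long nowMs) {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = nowMs;
        }
    }

    /** Expiry check: the duration is added a second time. */
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return evictionTimestamp > 0
                || nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(90_000, 0);
        lease.renew(0);
        System.out.println(lease.isExpired(100_000, 0)); // past one duration since the renewal
        System.out.println(lease.isExpired(180_001, 0)); // past two durations since the renewal
    }
}
```

With a 90-second duration, a lease renewed at time 0 is still considered alive at 100 seconds and only expires after 180 seconds.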
Service cancellation: cancel
InstanceRegistry
public boolean cancel(String appName, String serverId, boolean isReplication) {
// Publish the EurekaInstanceCanceledEvent
handleCancelation(appName, serverId, isReplication);
return super.cancel(appName, serverId, isReplication);
}
PeerAwareInstanceRegistryImpl
public boolean cancel(final String appName, final String id,
final boolean isReplication) {
if (super.cancel(appName, id, isReplication)) {
// Cancel succeeded: replicate to the other eureka-server nodes
replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
return true;
}
return false;
}
AbstractInstanceRegistry
protected boolean internalCancel(String appName, String id, boolean isReplication) {
// Acquire the read lock: threads taking the read lock are not blocked, threads taking the write lock are
read.lock();
try {
CANCEL.increment(isReplication);
// Find the Lease to cancel and remove it from the registry
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
// Remove the cancelled id from overriddenInstanceStatusMap
InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
if (instanceStatus != null) {
}
if (leaseToCancel == null) {
CANCEL_NOT_FOUND.increment(isReplication);
return false;
} else {
// Cancel: evictionTimestamp = System.currentTimeMillis()
// Wrap the cancelled lease in a RecentlyChangedItem and add it to the recently-changed queue
leaseToCancel.cancel();
InstanceInfo instanceInfo = leaseToCancel.getHolder();
String vip = null;
String svip = null;
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
instanceInfo.setLastUpdatedTimestamp();
vip = instanceInfo.getVIPAddress();
svip = instanceInfo.getSecureVipAddress();
}
// Invalidate the read-write cache of the two-level cache
invalidateCache(appName, vip, svip);
}
} finally {
read.unlock();
}
// After the cancel,
// decrement expectedNumberOfClientsSendingRenews and update numberOfRenewsPerMinThreshold
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
updateRenewsPerMinThreshold();
}
}
return true;
}
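Delta fetch
The recently-changed queue is a ConcurrentLinkedQueue maintained by a timer task, which walks the queue and removes the elements whose retention time has expired.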
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<>();
// In the constructor
this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
serverConfig.getDeltaRetentionTimerIntervalInMs(),
serverConfig.getDeltaRetentionTimerIntervalInMs());
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
// Remove the item if its last update time < now - configured delta retention time
if (it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}
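The retention task relies on the queue being ordered by insertion time: it removes items from the head and stops at the first one that is still fresh. Below is a small standalone sketch of that pruning rule. `DeltaQueueSketch` and `Item` are hypothetical names, and the current time is passed in explicitly so the behavior is deterministic.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical model of a time-bounded "recently changed" queue. */
final class DeltaQueueSketch {
    static final class Item {
        final String id;
        final long lastUpdateTime;

        Item(String id, long lastUpdateTime) {
            this.id = id;
            this.lastUpdateTime = lastUpdateTime;
        }
    }

    private final ConcurrentLinkedQueue<Item> queue = new ConcurrentLinkedQueue<>();
    private final long retentionMs;

    DeltaQueueSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    void add(String id, long nowMs) {
        queue.add(new Item(id, nowMs));
    }

    /** Removes expired items from the head of the queue and returns how many were removed. */
    int prune(long nowMs) {
        int removed = 0;
        Iterator<Item> it = queue.iterator();
        while (it.hasNext()) {
            if (it.next().lastUpdateTime < nowMs - retentionMs) {
                it.remove();
                removed++;
            } else {
                break; // items are in insertion order, so everything after this one is newer
            }
        }
        return removed;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        DeltaQueueSketch deltas = new DeltaQueueSketch(180_000);
        deltas.add("a", 0);
        deltas.add("b", 100_000);
        deltas.add("c", 200_000);
        System.out.println(deltas.prune(190_000)); // only "a" is older than the retention window
        System.out.println(deltas.size());
    }
}
```

Stopping at the first fresh item keeps each run cheap, at the cost of assuming that items never enter the queue out of order.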
Fetching the delta: walk the delta queue and add its instances to the result.
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
if (null == remoteRegions) {
remoteRegions = allKnownRemoteRegions; // null means all remote regions.
}
boolean includeRemoteRegion = remoteRegions.length != 0;
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
} else {
GET_ALL_CACHE_MISS_DELTA.increment();
}
Applications apps = new Applications();
apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
// Acquire the write lock so that no other thread can read or write in the meantime
write.lock();
try {
// Walk the delta queue and add the instance info to the result
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
while (iter.hasNext()) {
Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
InstanceInfo instanceInfo = lease.getHolder();
Application app = applicationInstancesMap.get(instanceInfo.getAppName());
if (app == null) {
app = new Application(instanceInfo.getAppName());
applicationInstancesMap.put(instanceInfo.getAppName(), app);
apps.addApplication(app);
}
app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
}
// Fetch the deltas of the remote regions
if (includeRemoteRegion) {
for (String remoteRegion : remoteRegions) {
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
if (null != remoteAppsDelta) {
for (Application application : remoteAppsDelta.getRegisteredApplications()) {
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
Application appInstanceTillNow =
apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
}
}
}
}
}
}
}
Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}
Service eviction: evict
class EvictionTask extends TimerTask {
// time of the previous execution
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
evict(compensationTimeMs);
} catch (Throwable e) {
}
}
// Compute the compensation time (extra lease time granted): elapsed time since the previous run minus evictionIntervalTimerInMs
// A negative result is clamped to 0
// Example: previous run at 10s, current run at 80s, eviction interval 60s: compensation = 80 - 10 - 60 = 10s
long getCompensationTimeMs() {
long currNanos = getCurrentTimeNano();
long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
if (lastNanos == 0l) {
return 0l;
}
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
return compensationTime <= 0l ? 0l : compensationTime;
}
long getCurrentTimeNano() {
return System.nanoTime();
}
}
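The compensation arithmetic can be tried in isolation. The following is a minimal sketch rather than Eureka's code: `EvictionCompensation`, `compensationMs` and `isExpired` are hypothetical names, timestamps are passed in explicitly instead of being read from the clock, and the `evictionTimestamp > 0` half of the real expiry condition is left out.

```java
// Hypothetical, self-contained sketch of the compensation-time arithmetic.
final class EvictionCompensation {

    // Extra lease time granted when the eviction task fires later than scheduled.
    static long compensationMs(long lastRunMs, long nowMs, long evictionIntervalMs) {
        if (lastRunMs == 0L) {
            return 0L; // first run: nothing to compensate
        }
        long elapsedMs = nowMs - lastRunMs;
        return Math.max(0L, elapsedMs - evictionIntervalMs);
    }

    // Time-based part of the expiry check used by evict():
    // a lease counts as expired once now > lastUpdate + duration + compensation.
    static boolean isExpired(long nowMs, long lastUpdateMs, long durationMs, long additionalLeaseMs) {
        return nowMs > lastUpdateMs + durationMs + additionalLeaseMs;
    }

    public static void main(String[] args) {
        // Numbers from the comment above: last run at 10 s, now 80 s, interval 60 s.
        long compensation = compensationMs(10_000L, 80_000L, 60_000L);
        System.out.println("compensation = " + compensation + " ms");
        System.out.println("expired = " + isExpired(200_001L, 100_000L, 90_000L, compensation));
    }
}
```

The point of the compensation is that a late eviction run (for example after a long GC pause) does not punish instances whose heartbeats were merely delayed by the same pause.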
public void evict(long additionalLeaseMs) {
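// isLeaseExpirationEnabled() is true when self-preservation is disabled (shouldEnableSelfPreservation == false)
// or when the renewals in the last minute exceed the self-preservation threshold; otherwise eviction is skipped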
if (!isLeaseExpirationEnabled()) {
return;
}
// Walk every Lease in the registry and collect those that qualify for eviction into expiredLeases
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
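// isExpired: evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)
// The effective expiry is actually twice the duration. This is a known upstream bug,
// left unfixed because it only affects instances that shut down ungracefully.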
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// Maximum evictions per run: registry size - (int) (registry size * renewal percent threshold)
int registrySize = (int) getLocalRegistrySize();
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
int evictionLimit = registrySize - registrySizeThreshold;
// Evictions in this run: the smaller of that cap and the number of expired leases
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
// Randomly pick toEvict leases from the expired list
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
internalCancel(appName, id, false);
}
}
}
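The cap and the random selection in evict() can be reproduced with plain collections. This is a sketch under assumptions: `EvictionLimitSketch`, `evictionLimit` and `pickToEvict` are hypothetical names, instances are represented by strings, and nothing is actually cancelled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the eviction cap and the random selection step.
final class EvictionLimitSketch {

    // Upper bound on evictions per run: registrySize - (int) (registrySize * threshold).
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Picks min(expired.size(), limit) entries uniformly at random with a
    // partial Fisher-Yates shuffle, the same swap pattern as in evict().
    static <T> List<T> pickToEvict(List<T> expired, int limit, Random random) {
        List<T> pool = new ArrayList<>(expired); // leave the caller's list untouched
        int toEvict = Math.max(0, Math.min(pool.size(), limit));
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
        }
        return new ArrayList<>(pool.subList(0, toEvict));
    }

    public static void main(String[] args) {
        List<String> expired = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            expired.add("instance-" + i);
        }
        int limit = evictionLimit(100, 0.75);
        System.out.println("limit = " + limit);
        System.out.println("picked = " + pickToEvict(expired, limit, new Random()).size());
    }
}
```

Choosing the victims at random spreads a capped eviction across applications instead of wiping out whichever application happens to be iterated first.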
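Peer replication
As mentioned above, PeerAwareInstanceRegistryImpl calls replicateToPeers() on registration, renewal, and cancellation to propagate the change to the other eureka-server nodes. The code is shown below.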
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
// This request is itself a replication from another server
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// Nothing to replicate if the request came from another server or the peer list is empty
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
// Replicate to every peer node except the current one
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
// Call the PeerEurekaNode method that matches the Action
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
} finally {
CurrentRequestVersion.remove();
}
}
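As the code above shows, a request that is itself a replication (isReplication == true) is applied locally and not forwarded again, and every other request is sent to each peer through the PeerEurekaNode method that matches its Action.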
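Why the isReplication flag matters is easiest to see in a toy model. The sketch below is an assumption-laden illustration, not Eureka's API: `ReplicationGuardSketch`, `Node` and `cluster` are hypothetical, peers are called in-process instead of over HTTP, and the instance ID is made up.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of the isReplication guard.
final class ReplicationGuardSketch {

    static final class Node {
        final String name;
        final Set<String> registry = new HashSet<>();
        final List<Node> peers = new ArrayList<>();
        int forwardedCalls = 0; // how many times this node pushed a change to a peer

        Node(String name) {
            this.name = name;
        }

        void register(String instanceId, boolean isReplication) {
            registry.add(instanceId);
            if (isReplication) {
                return; // came from a peer: apply locally, never forward again
            }
            for (Node peer : peers) {
                if (peer == this) {
                    continue; // do not replicate to yourself
                }
                forwardedCalls++;
                peer.register(instanceId, true);
            }
        }
    }

    // Builds a cluster in which every node knows the full node list, itself included.
    static List<Node> cluster(String... names) {
        List<Node> nodes = new ArrayList<>();
        for (String n : names) {
            nodes.add(new Node(n));
        }
        for (Node node : nodes) {
            node.peers.addAll(nodes);
        }
        return nodes;
    }

    public static void main(String[] args) {
        List<Node> nodes = cluster("a", "b", "c");
        nodes.get(0).register("order-service-1", false);
        for (Node node : nodes) {
            System.out.println(node.name + " has entry: " + node.registry.contains("order-service-1")
                    + ", forwarded: " + node.forwardedCalls);
        }
    }
}
```

Without the guard, node b would forward the change back to a and c, and the cluster would echo the same registration indefinitely.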
Peer node manager: PeerEurekaNodes
Native Eureka PeerEurekaNodes
public class PeerEurekaNodes {
protected final PeerAwareInstanceRegistry registry; // registry implementation
protected final EurekaServerConfig serverConfig; // server configuration
protected final EurekaClientConfig clientConfig; // client configuration
protected final ServerCodecs serverCodecs;
private final ApplicationInfoManager applicationInfoManager; // application info manager
// Peer nodes currently being managed
private volatile List<PeerEurekaNode> peerEurekaNodes = Collections.emptyList();
private volatile Set<String> peerEurekaNodeUrls = Collections.emptySet();
// Executor that periodically refreshes the peer node list
private ScheduledExecutorService taskExecutor;
public void start() {
// Create the executor
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
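// Update the managed list of peer nodes:
// resolvePeerUrls reads the configured server URLs, excluding the current node
// updatePeerEurekaNodes removes and creates nodes, then updates the cached lists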
updatePeerEurekaNodes(resolvePeerUrls());
// Build the Runnable that refreshes the peer list and schedule it periodically
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}
}
};
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
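Spring Cloud provides a customized RefreshablePeerEurekaNodes that allows the peer nodes to be refreshed: it additionally implements ApplicationListener, listens for EnvironmentChangeEvent, and updates the peer node list.
// Additionally implements ApplicationListener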
static class RefreshablePeerEurekaNodes extends PeerEurekaNodes
implements ApplicationListener<EnvironmentChangeEvent> {
@Override
public void onApplicationEvent(final EnvironmentChangeEvent event) {
if (shouldUpdate(event.getKeys())) {
updatePeerEurekaNodes(resolvePeerUrls());
}
}
/*
* Decides whether the peer list needs to be updated
*/
protected boolean shouldUpdate(final Set<String> changedKeys) {
assert changedKeys != null;
// No update is needed when the service URLs are resolved through DNS
if (this.clientConfig.shouldUseDnsForFetchingServiceUrls()) {
return false;
}
// An update is needed if the changed keys include "eureka.client.region"
if (changedKeys.contains("eureka.client.region")) {
return true;
}
// An update is also needed if any changed key starts with "eureka.client.service-url."
// or "eureka.client.availability-zones."
for (final String key : changedKeys) {
if (key.startsWith("eureka.client.service-url.")
|| key.startsWith("eureka.client.availability-zones.")) {
return true;
}
}
return false;
}
}
In summary, the peer node manager PeerEurekaNodes maintains the list of peer nodes, which is updated in two ways:
- Scheduled task (PeerEurekaNodes)
- EnvironmentChangeEvent listener (RefreshablePeerEurekaNodes)
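Both update paths end in updatePeerEurekaNodes, which removes nodes that disappeared from the configuration and creates nodes that are new. A minimal sketch of that bookkeeping, assuming it boils down to two set differences: `PeerUrlDiffSketch`, `toShutdown` and `toAdd` are hypothetical names, and the peer URLs are invented for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch: which peers to drop and which to create after a config change.
final class PeerUrlDiffSketch {

    // URLs that are currently managed but no longer configured.
    static Set<String> toShutdown(Set<String> currentUrls, Set<String> resolvedUrls) {
        Set<String> result = new LinkedHashSet<>(currentUrls);
        result.removeAll(resolvedUrls);
        return result;
    }

    // URLs that are configured but not yet managed.
    static Set<String> toAdd(Set<String> currentUrls, Set<String> resolvedUrls) {
        Set<String> result = new LinkedHashSet<>(resolvedUrls);
        result.removeAll(currentUrls);
        return result;
    }

    public static void main(String[] args) {
        // Made-up peer URLs, for illustration only.
        Set<String> current = new LinkedHashSet<>(Arrays.asList(
                "http://peer1:8761/eureka/", "http://peer2:8761/eureka/"));
        Set<String> resolved = new LinkedHashSet<>(Arrays.asList(
                "http://peer2:8761/eureka/", "http://peer3:8761/eureka/"));
        System.out.println("shut down: " + toShutdown(current, resolved));
        System.out.println("create:    " + toAdd(current, resolved));
    }
}
```

Peers present in both sets are left alone, so an unchanged configuration causes no churn on either update path.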
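Peer node: PeerEurekaNode
The fields of PeerEurekaNode mainly hold information about the peer node, plus a batching dispatcher (batchingDispatcher) and a non-batching dispatcher (nonBatchingDispatcher). Only the batching dispatcher is covered below.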
public class PeerEurekaNode {
public static final String BATCH_URL_PATH = "peerreplication/batch/";
private final String serviceUrl; // service URL of this peer node
private final EurekaServerConfig config; // server configuration
private final long maxProcessingDelayMs; // maximum processing delay
private final PeerAwareInstanceRegistry registry; // registry implementation, see above
private final String targetHost; // target host
private final HttpReplicationClient replicationClient; // HTTP client used for replication
/**
* batchingDispatcher and nonBatchingDispatcher are the batching and non-batching dispatchers
**/
private final TaskDispatcher<String, ReplicationTask> batchingDispatcher;
private final TaskDispatcher<String, ReplicationTask> nonBatchingDispatcher;
// Constructor
PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,
HttpReplicationClient replicationClient, EurekaServerConfig config,
int batchSize, long maxBatchingDelayMs,
long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {
this.registry = registry;
this.targetHost = targetHost;
this.replicationClient = replicationClient;
this.serviceUrl = serviceUrl;
this.config = config;
this.maxProcessingDelayMs = config.getMaxTimeForReplication();
String batcherName = getBatcherName();
ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);
this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(
batcherName,
config.getMaxElementsInPeerReplicationPool(),
batchSize,
config.getMaxThreadsForPeerReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(
targetHost,
config.getMaxElementsInStatusReplicationPool(),
config.getMaxThreadsForStatusReplication(),
maxBatchingDelayMs,
serverUnavailableSleepTimeMs,
retrySleepTimeMs,
taskProcessor
);
}
}
The code that creates a TaskDispatcher is very simple: it creates two objects, an AcceptorExecutor and a TaskExecutors. The dispatcher's process method simply delegates to the process method of AcceptorExecutor.
public static <ID, T> TaskDispatcher<ID, T> createBatchingTaskDispatcher(
String id, int maxBufferSize, int workloadSize, int workerCount,
long maxBatchingDelay, long congestionRetryDelayMs, long networkFailureRetryMs,
TaskProcessor<T> taskProcessor) {
final AcceptorExecutor<ID, T> acceptorExecutor = new AcceptorExecutor(
id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs);
final TaskExecutors<ID, T> taskExecutor = TaskExecutors.batchExecutors(
id, workerCount, taskProcessor, acceptorExecutor);
return new TaskDispatcher<ID, T>() {
public void process(ID id, T task, long expiryTime) {
acceptorExecutor.process(id, task, expiryTime);
}
public void shutdown() {
acceptorExecutor.shutdown();
taskExecutor.shutdown();
}
};
}
Acceptor: AcceptorExecutor
Data structures
// Queue of newly accepted tasks; FIFO (enqueue at one end, dequeue at the other)
BlockingQueue<TaskHolder<ID, T>> acceptorQueue = new LinkedBlockingQueue<>();
// Queue of tasks to be reprocessed; double-ended
BlockingDeque<TaskHolder<ID, T>> reprocessQueue = new LinkedBlockingDeque<>();
// Tasks waiting to be processed by the acceptor loop, keyed by ID, plus the order in which to process them
Map<ID, TaskHolder<ID, T>> pendingTasks = new HashMap<>();
Deque<ID> processingOrder = new LinkedList<>();
// Work about to be handed to the workers, one task at a time
Semaphore singleItemWorkRequests = new Semaphore(0);
BlockingQueue<TaskHolder<ID, T>> singleItemWorkQueue = new LinkedBlockingQueue<>();
// Work about to be handed to the workers, in batches
Semaphore batchWorkRequests = new Semaphore(0);
BlockingQueue<List<TaskHolder<ID, T>>> batchWorkQueue = new LinkedBlockingQueue<>();
Methods provided
// Called by the registry implementation: accepts a task and puts it on the queue
void process(ID id, T task, long expiryTime) {
acceptorQueue.add(new TaskHolder<ID, T>(id, task, expiryTime));
acceptedTasks++;
}
// Called by the worker threads (TaskExecutors): re-adds failed tasks, batch variant
void reprocess(List<TaskHolder<ID, T>> holders, ProcessingResult processingResult) {
reprocessQueue.addAll(holders);
replayedTasks += holders.size();
trafficShaper.registerFailure(processingResult);
}
// Called by the worker threads (TaskExecutors): re-adds a failed task, single-item variant
void reprocess(TaskHolder<ID, T> taskHolder, ProcessingResult processingResult) {
reprocessQueue.add(taskHolder);
replayedTasks++;
trafficShaper.registerFailure(processingResult);
}
// Called by the worker threads (TaskExecutors): requests work, single-item variant
BlockingQueue<TaskHolder<ID, T>> requestWorkItem() {
singleItemWorkRequests.release(); // release a permit to signal that a worker is waiting for work
return singleItemWorkQueue;
}
// Called by the worker threads (TaskExecutors): requests work, batch variant
BlockingQueue<List<TaskHolder<ID, T>>> requestWorkItems() {
batchWorkRequests.release(); // release a permit to signal that a worker is waiting for work
return batchWorkQueue;
}
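The semaphore and the queue work as a pair: a worker announces that it is ready by releasing a permit, and only then is a batch put on the work queue. The sketch below illustrates that handshake under the assumption that the acceptor side acquires a permit before handing out a batch. `WorkRequestHandshakeSketch`, `accept` and the string tasks are hypothetical, and everything runs on one thread for clarity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

// Hypothetical single-threaded model of the request/assign handshake.
final class WorkRequestHandshakeSketch {
    private final Semaphore batchWorkRequests = new Semaphore(0);
    private final BlockingQueue<List<String>> batchWorkQueue = new LinkedBlockingQueue<>();
    private final Deque<String> pending = new ArrayDeque<>();
    private final int maxBatchSize;

    WorkRequestHandshakeSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // Buffers a task until a worker asks for work.
    void accept(String task) {
        pending.add(task);
    }

    // Worker side: announce readiness, then wait on the returned queue.
    BlockingQueue<List<String>> requestWorkItems() {
        batchWorkRequests.release();
        return batchWorkQueue;
    }

    // Acceptor side: hand over a batch only if there is work AND a worker asked for it.
    boolean assignBatchWork() {
        if (pending.isEmpty() || !batchWorkRequests.tryAcquire()) {
            return false;
        }
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxBatchSize && !pending.isEmpty()) {
            batch.add(pending.poll());
        }
        batchWorkQueue.add(batch);
        return true;
    }

    public static void main(String[] args) {
        WorkRequestHandshakeSketch sketch = new WorkRequestHandshakeSketch(2);
        sketch.accept("a");
        sketch.accept("b");
        sketch.accept("c");
        System.out.println("assigned before request: " + sketch.assignBatchWork());
        BlockingQueue<List<String>> queue = sketch.requestWorkItems();
        System.out.println("assigned after request:  " + sketch.assignBatchWork());
        System.out.println("batch: " + queue.poll());
    }
}
```

Because batches are only built on demand, tasks stay in the pending buffer as long as possible, which gives newer tasks a chance to replace older ones with the same ID before anything is sent.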
Scheduled task
class AcceptorRunner implements Runnable {
@Override
public void run() {
long scheduleTime = 0;
while (!isShutdown.get()) {
try {
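// drainInputQueues():
// 1. Moves entries from reprocessQueue into pendingTasks and processingOrder,
//    counting those that have already expired.
// 2. Moves entries from acceptorQueue into pendingTasks and processingOrder.
// 3. While filling pendingTasks, a newer task overwrites an older one with the same ID.
// 4. Stops moving entries once pendingTasks is full.
// 5. If acceptorQueue, reprocessQueue and pendingTasks are all still empty afterwards,
//    blocks on acceptorQueue for up to 10 ms.
// The loop repeats until pendingTasks is full or both input queues are drained.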
drainInputQueues();
int totalItems = processingOrder.size();
long now = System.currentTimeMillis();
if (scheduleTime < now) {
scheduleTime = now + trafficShaper.transmissionDelay();
}
if (scheduleTime <= now) {
// Take a batch of tasks out of pendingTasks and put it on batchWorkQueue
assignBatchWork();
// Take a single task out of pendingTasks and put it on singleItemWorkQueue
assignSingleItemWork();
}
// Nothing left pendingTasks, so the workers are busy or sending is being delayed; sleep 10 ms
if (totalItems == processingOrder.size()) {
Thread.sleep(10);
}
} catch (InterruptedException ex) {
// Ignore
} catch (Throwable e) {
logger.warn("Discovery AcceptorThread error", e);
}
}
}
}
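The overwrite behaviour of pendingTasks together with processingOrder is the heart of the acceptor loop. A minimal sketch, assuming a map keyed by task ID plus a deque of IDs: `PendingTaskBufferSketch`, `add` and `drainBatch` are hypothetical names, and tasks are plain strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "latest task per ID wins, original order is kept".
final class PendingTaskBufferSketch {
    private final Map<String, String> pendingTasks = new HashMap<>();
    private final Deque<String> processingOrder = new ArrayDeque<>();

    // A newer task with the same ID replaces the payload but keeps the queue position.
    void add(String id, String task) {
        String previous = pendingTasks.put(id, task);
        if (previous == null) {
            processingOrder.add(id);
        }
    }

    // Removes up to maxBatchSize tasks in processing order.
    List<String> drainBatch(int maxBatchSize) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxBatchSize && !processingOrder.isEmpty()) {
            String id = processingOrder.poll();
            batch.add(pendingTasks.remove(id));
        }
        return batch;
    }

    int size() {
        return processingOrder.size();
    }

    public static void main(String[] args) {
        PendingTaskBufferSketch buffer = new PendingTaskBufferSketch();
        buffer.add("i-1", "heartbeat#1");
        buffer.add("i-2", "register");
        buffer.add("i-1", "heartbeat#2"); // supersedes heartbeat#1
        System.out.println(buffer.drainBatch(10));
    }
}
```

For replication this means that a slow peer receives only the most recent heartbeat of an instance rather than a backlog of stale ones.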
Task executor: TaskExecutors
The worker threads are started in the constructor.
TaskExecutors(WorkerRunnableFactory<ID, T> workerRunnableFactory, int workerCount, AtomicBoolean isShutdown) {
this.isShutdown = isShutdown;
this.workerThreads = new ArrayList<>();
ThreadGroup threadGroup = new ThreadGroup("eurekaTaskExecutors");
for (int i = 0; i < workerCount; i++) {
WorkerRunnable<ID, T> runnable = workerRunnableFactory.create(i);
Thread workerThread = new Thread(threadGroup, runnable, runnable.getWorkerName());
workerThreads.add(workerThread);
workerThread.setDaemon(true);
workerThread.start();
}
}
Worker thread implementation, taking the batch variant as an example:
static class BatchWorkerRunnable<ID, T> extends WorkerRunnable<ID, T> {
@Override
public void run() {
try {
// Keep looping until shutdown
while (!isShutdown.get()) {
// Request work via AcceptorExecutor#requestWorkItems; if the queue is empty, poll with a 1-second timeout
List<TaskHolder<ID, T>> holders = getWork();
List<T> tasks = getTasksOf(holders);
// ReplicationTaskProcessor executes the batch.
// On a TransientError, such as a timeout or a connection failure, the batch goes back to the retry queue.
ProcessingResult result = processor.process(tasks);
switch (result) {
case Success:
break;
case Congestion:
case TransientError:
taskDispatcher.reprocess(holders, result);
break;
case PermanentError:
logger.warn(); // arguments elided in this excerpt; the batch is dropped, not retried
}
}
} catch (InterruptedException e) {
// Ignore
} catch (Throwable e) {
logger.warn("Discovery WorkerThread error", e);
}
}
}
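The way the worker reacts to each ProcessingResult can be simulated without threads. This is only a sketch: `RetryLoopSketch` and `run` are hypothetical, the enum is redeclared locally so the example compiles on its own, and a `maxAttempts` guard is added so the demo cannot loop forever.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical single-threaded model of the worker's result handling.
final class RetryLoopSketch {

    enum ProcessingResult { Success, Congestion, TransientError, PermanentError }

    // Processes batches until the queue is empty or maxAttempts is reached.
    // Returns the number of processing attempts made.
    static int run(Deque<List<String>> workQueue,
                   Function<List<String>, ProcessingResult> processor,
                   List<String> delivered, List<String> discarded, int maxAttempts) {
        int attempts = 0;
        while (!workQueue.isEmpty() && attempts < maxAttempts) {
            List<String> batch = workQueue.poll();
            attempts++;
            switch (processor.apply(batch)) {
                case Success:
                    delivered.addAll(batch);
                    break;
                case Congestion:
                case TransientError:
                    workQueue.addLast(batch); // retry later
                    break;
                case PermanentError:
                    discarded.addAll(batch); // not retryable
                    break;
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        Deque<List<String>> queue = new ArrayDeque<>();
        queue.add(Arrays.asList("heartbeat:i-1", "register:i-2"));
        List<String> delivered = new ArrayList<>();
        List<String> discarded = new ArrayList<>();
        int[] calls = {0};
        // The first call simulates a timeout, the second one succeeds.
        int attempts = run(queue,
                batch -> calls[0]++ == 0 ? ProcessingResult.TransientError : ProcessingResult.Success,
                delivered, discarded, 10);
        System.out.println("attempts=" + attempts + ", delivered=" + delivered);
    }
}
```

Congestion and TransientError share a branch because both mean "try again later", while a PermanentError would fail again no matter how often it is retried.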
Eureka configuration
Eureka Instance configuration
public interface EurekaInstanceConfig {
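// ID of the instance registered with Eureka; must be unique
String getInstanceId();
// Application name of the instance registered with Eureka
String getAppname();
// Application group of the registered instance; lets several applications be grouped. Rarely used, usually null
String getAppGroupName();
// Whether the instance should start receiving traffic as soon as it registers. Default: false.
// Set through eureka.instance.instance-enabled-onit; it decides whether the status
// sent on the initial registration is UP (true) or STARTING (false).
// With the default, the instance starts as STARTING and is switched to UP later,
// during automatic registration, at which point it can start receiving requests.
boolean isInstanceEnabledOnit();
// Non-secure port
int getNonSecurePort();
// Secure port
int getSecurePort();
// Whether the non-secure port is enabled
boolean isNonSecurePortEnabled();
// Whether the secure port is enabled
boolean getSecurePortEnabled();
// Heartbeat (lease renewal) interval. Default: 30 s
int getLeaseRenewalIntervalInSeconds();
// Lease expiration time. Default: 90 s
int getLeaseExpirationDurationInSeconds();
// Virtual host name; other instances can locate this instance through it, much like a fully qualified domain name
String getVirtualHostName();
// Secure virtual host name
String getSecureVirtualHostName();
// Host name
String getHostName(boolean refresh);
// Metadata as key-value pairs; sent to the Server
Map<String, String> getMetadataMap();
// IP address
String getIpAddress();
// Relative path of the status page, e.g. /actuator/info
String getStatusPageUrlPath();
// Full status page URL; used together with getStatusPageUrlPath
String getStatusPageUrl();
// Relative path of the home page, e.g. /
String getHomePageUrlPath();
// Full home page URL; used together with getHomePageUrlPath
String getHomePageUrl();
// Relative path of the health check, e.g. /actuator/health
// Typically used to make decisions based on the health of the instance
String getHealthCheckUrlPath();
// Full health check URL; used together with getHealthCheckUrlPath
String getHealthCheckUrl();
// Full secure (HTTPS) health check URL; used together with getHealthCheckUrlPath
String getSecureHealthCheckUrl();
// Configuration namespace, usually "eureka"
String getNamespace();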
}
In a Spring Boot environment, EurekaInstanceConfig is implemented by EurekaInstanceConfigBean.
@ConfigurationProperties("eureka.instance")
public class EurekaInstanceConfigBean implements CloudEurekaInstanceConfig, EnvironmentAware {
}
InstanceInfoFactory uses EurekaInstanceConfig to create the InstanceInfo object.
public class InstanceInfoFactory {
public InstanceInfo create(EurekaInstanceConfig config) {
LeaseInfo.Builder leaseInfoBuilder = LeaseInfo.Builder.newBuilder()
.setRenewalIntervalInSecs(config.getLeaseRenewalIntervalInSeconds())
.setDurationInSecs(config.getLeaseExpirationDurationInSeconds());
InstanceInfo.Builder builder = InstanceInfo.Builder.newBuilder();
String namespace = config.getNamespace();
if (!namespace.endsWith(".")) {
namespace = namespace + ".";
}
builder.setNamespace(namespace).setAppName(config.getAppname())
.setInstanceId(config.getInstanceId())
.setAppGroupName(config.getAppGroupName())
.setDataCenterInfo(config.getDataCenterInfo())
.setIPAddr(config.getIpAddress())
.setHostName(config.getHostName(false))
.setPort(config.getNonSecurePort())
.enablePort(InstanceInfo.PortType.UNSECURE,
config.isNonSecurePortEnabled())
.setSecurePort(config.getSecurePort())
.enablePort(InstanceInfo.PortType.SECURE, config.getSecurePortEnabled())
.setVIPAddress(config.getVirtualHostName())
.setSecureVIPAddress(config.getSecureVirtualHostName())
.setHomePageUrl(config.getHomePageUrlPath(), config.getHomePageUrl())
.setStatusPageUrl(config.getStatusPageUrlPath(),
config.getStatusPageUrl())
.setHealthCheckUrls(config.getHealthCheckUrlPath(),
config.getHealthCheckUrl(), config.getSecureHealthCheckUrl())
.setASGName(config.getASGName());
// Start off with the STARTING state to avoid traffic
// If isInstanceEnabledOnit is false, the initial status is STARTING;
// otherwise the initial status is UP
if (!config.isInstanceEnabledOnit()) {
InstanceInfo.InstanceStatus initialStatus = InstanceInfo.InstanceStatus.STARTING;
if (log.isInfoEnabled()) {
log.info("Setting initial instance status as: " + initialStatus);
}
builder.setStatus(initialStatus);
}
else {
if (log.isInfoEnabled()) {
log.info("Setting initial instance status as: "
+ InstanceInfo.InstanceStatus.UP
+ ". This may be too early for the instance to advertise itself as available. "
+ "You would instead want to control this via a healthcheck handler.");
}
}
// Copy the user-defined metadata
for (Map.Entry<String, String> mapEntry : config.getMetadataMap().entrySet()) {
String key = mapEntry.getKey();
String value = mapEntry.getValue();
// only add the metadata if the value is present
if (value != null && !value.isEmpty()) {
builder.add(key, value);
}
}
InstanceInfo instanceInfo = builder.build();
instanceInfo.setLeaseInfo(leaseInfoBuilder.build());
return instanceInfo;
}
}
Eureka Client configuration
public interface EurekaClientConfig {
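// Interval for fetching the registry from the Server. Default: 30 s
int getRegistryFetchIntervalSeconds();
// Interval for replicating changes of the instance info to the Server. Default: 30 s
int getInstanceInfoReplicationIntervalSeconds();
// Initial delay before the replication task above first runs. Default: 40 s
int getInitialInstanceInfoReplicationIntervalSeconds();
// How often to poll for changes to the Eureka server list (the serviceUrl list is resolved dynamically).
// Servers may be added or removed; this setting controls how quickly the Client notices.
// Default: 5 minutes
int getEurekaServiceUrlPollIntervalSeconds();
// Proxy used to reach the Server
String getProxyHost();
String getProxyPort();
String getProxyUserName();
String getProxyPassword();
// Whether content fetched from the Server should be GZip-compressed. Default: true
// true: adds the GZIPContentEncodingFilter, which sends the request header `Accept-Encoding:gzip`
boolean shouldGZipContent();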
// Read timeout for requests from the Client to the Server. Default: 8 s
int getEurekaServerReadTimeoutSeconds();
// Connection timeout for requests from the Client to the Server. Default: 5 s
int getEurekaServerConnectTimeoutSeconds();
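// Implementation class of the backup registry (a fallback for when the Server the EurekaClient connects to is down)
// Useful as a safety net (for example, with Nacos as the backup)
// Note: no implementation is provided out of the box; supply your own class and configure it here to enable it
String getBackupRegistryImpl();
// A Client can issue many concurrent requests to the Server;
// this limit keeps a single Client instance from overwhelming the Server.
// Controls maxTotalConnections, the maximum size of the request connection pool. Default: 200
// With Apache HttpClient, this is the size of its connection pool
int getEurekaServerTotalConnections();
// Maximum number of connections allowed to a single host. Default: 50
int getEurekaServerTotalConnectionsPerHost();
// ======== The settings below only apply when the Eureka server addresses come from DNS; default: null ======== //
// Context path of the Eureka registry; if set to eureka, the URL is http://x.x.x.x:x/eureka/
// Adding this to a Eureka server's own configuration makes it register with the registry as a client, which is how a Eureka cluster is formed
String getEurekaServerURLContext();
// Port of the Eureka server
String getEurekaServerPort();
// DNS name to query in order to find the Eureka servers
String getEurekaServerDNSName();
// Whether to obtain the Eureka-Server URLs through DNS. Default: false
// Once the Server cluster nodes are known, their URLs have to be resolved; this decides how (via DNS or not)
boolean shouldUseDnsForFetchingServiceUrls();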
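// Whether to register this instance with the Server. Default: true
// Sometimes you do not want to register yourself, for example as a pure service consumer
// Without registration many scheduled tasks are unnecessary, so it is recommended to turn this off on the Server side
// You can of course leave it on if you want to see whether the instance is still "alive"
boolean shouldRegisterWithEureka();
// Whether to deregister from the Server when the Client shuts down. Default: true
// Consulted inside EurekaClient#shutdown
default boolean shouldUnregisterOnShutdown() {
return true;
}
// Whether the instance prefers Eureka servers in its own zone. Default: true
// Ideally the Eureka client and server are in the same zone
boolean shouldPreferSameZoneEureka();
// In short: whether the Server may answer with a 302 redirect to another machine (for instance, when its own load is too high)
// Implemented by adding the request header `X-Discovery-AllowRedirect:true`
// Default: false
boolean allowRedirects();
// Whether to log differences between the Client's local instances and those returned by the Server (status changes, metadata changes, and so on)
// Default: false. Such logs add little value and are not worth the overhead
boolean shouldLogDeltaDiff();
// Whether to disable delta fetches. true: always fetch the full registry; false: fetch deltas
// Default: false
boolean shouldDisableDelta();
// URLs of the Eureka-Server
// Defaults to http://XXXX:X/eureka/; not needed when the addresses are obtained through DNS
List<String> getEurekaServerServiceUrls(String myZone);
// Whether fetched instances are filtered down to those in UP status. Default: true
boolean shouldFilterOnlyUpInstances();
// Maximum idle time of connections in the connection pool. Default: 30 s
int getEurekaConnectionIdleTimeoutSeconds();
// Whether to fetch the registry from the Eureka server. Default: true
boolean shouldFetchRegistry();
// Restricts the fetch to the instances behind a single vipAddress. Default: null
// When set, registry information is fetched for that vipAddress only
@Nullable
String getRegistryRefreshSingleVipAddress();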
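// Size of the heartbeat (lease renewal) executor thread pool. Default: 5
int getHeartbeatExecutorThreadPoolSize();
// Maximum multiplier for the heartbeat retry delay after timeouts. Default: 10
// The delay doubles on each timeout and is capped at interval * bound, e.g. 30 s * 10 = 300 s
int getHeartbeatExecutorExponentialBackOffBound();
// Initial number of threads in the cache refresh executor. Default: 5
int getCacheRefreshExecutorThreadPoolSize();
// Maximum multiplier for the cache refresh retry delay after timeouts. Default: 10
// Same rule as for heartbeats: the delay never exceeds interval * bound
int getCacheRefreshExecutorExponentialBackOffBound();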
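// Replacement string for the "$" character when Eureka server data is serialized or deserialized. Default: "_-"
String getDollarReplacement();
// Replacement string for the "_" character when Eureka server data is serialized or deserialized. Default: "__"
String getEscapeCharReplacement();
// true: a local status change made through ApplicationInfoManager is pushed to the remote Server immediately
// false: no immediate push; the change relies on the heartbeat. Default: true
boolean shouldOnDemandUpdateStatusChange();
// Whether the client should force registration during initialization. Default: false
// Initialization may not be fully complete at that point, so false is the safer choice
default boolean shouldEnforceRegistrationAtInit() {
return false;
}
String getEncoderName();
String getDecoderName();
// Adds the request header `X-Eureka-Accept = xxx`
// Allowed values: full / compact
String getClientDataAccept();
// Experimental settings
// While new features are being trialled or migrated, experimental options are passed through this method to avoid polluting the configuration API
// See JerseyEurekaHttpClientFactory#buildExperimental, which builds the experimental configuration
// Handy for rolling out experimental settings gradually
String getExperimental(String name);
// Returns the transport-layer configuration, kept for compatibility
// EurekaTransportConfig is covered together with the transport layer
// It is also used by DiscoveryClient
EurekaTransportConfig getTransportConfig();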
}
In a Spring Boot environment, EurekaClientConfig is implemented by EurekaClientConfigBean.
@ConfigurationProperties(EurekaClientConfigBean.PREFIX)
public class EurekaClientConfigBean implements EurekaClientConfig, Ordered {
public static final String PREFIX = "eureka.client";
}
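The two ExponentialBackOffBound settings feed the doubling behaviour of TimedSupervisorTask described earlier. The sketch below works through the numbers, assuming the cap equals the base interval multiplied by the bound. `BackoffSketch`, `maxDelayMs` and `nextDelayMs` are hypothetical names, and the values 30 s and 10 are simply the defaults quoted in the comments.

```java
// Hypothetical sketch of the capped doubling delay used for heartbeat and cache refresh retries.
final class BackoffSketch {

    // Assumed cap: base interval multiplied by the configured bound.
    static long maxDelayMs(long baseIntervalMs, int expBackOffBound) {
        return baseIntervalMs * expBackOffBound;
    }

    // After a timeout the delay doubles up to the cap; after a success it resets to the base.
    static long nextDelayMs(long currentDelayMs, long baseIntervalMs, long maxDelayMs, boolean timedOut) {
        if (!timedOut) {
            return baseIntervalMs;
        }
        return Math.min(maxDelayMs, currentDelayMs * 2);
    }

    public static void main(String[] args) {
        long base = 30_000L;
        long cap = maxDelayMs(base, 10);
        long delay = base;
        for (int i = 1; i <= 5; i++) {
            delay = nextDelayMs(delay, base, cap, true);
            System.out.println("timeout #" + i + " -> next delay " + delay + " ms");
        }
        System.out.println("after success -> " + nextDelayMs(delay, base, cap, false) + " ms");
    }
}
```

With these inputs the delay grows 60 s, 120 s, 240 s and then stays at the 300 s cap until a run succeeds.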
Eureka Server configuration
public interface EurekaServerConfig {
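// Whether self-preservation is enabled. Default: true
boolean shouldEnableSelfPreservation();
// Self-preservation threshold ratio: when the renewals received fall below this fraction of the expected renewals, self-preservation is triggered. Default: 0.85
double getRenewalPercentThreshold();
// How often the task that recalculates the self-preservation threshold runs
int getRenewalThresholdUpdateIntervalMs();
// Interval at which clients are expected to send heartbeats.
// Default: 30 s.
// If clients send heartbeats at a different rate, say every 15 s, adjust this value accordingly;
// otherwise self-preservation will not work as intended.
int getExpectedClientRenewalIntervalSeconds();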
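// Interval at which server nodes refresh their list of peers
int getPeerEurekaNodesUpdateIntervalMs();
// Whether replicated requests are compressed (via Accept-Encoding)
boolean shouldEnableReplicatedRequestCompression();
// Number of retries for replication
int getNumberOfReplicationRetries();
// Refresh interval of the peer node status
int getPeerEurekaStatusRefreshTimeIntervalMs();
// How long a Eureka Server refuses to serve registry requests after startup
// if it could not fetch the registry from its neighbouring Eureka Server nodes
int getWaitTimeInMsWhenSyncEmpty();
// Connection timeout between server nodes
int getPeerNodeConnectTimeoutMs();
// Read timeout between server nodes
int getPeerNodeReadTimeoutMs();
// Maximum total connections to peer nodes
int getPeerNodeTotalConnections();
// Maximum connections to any single peer node
int getPeerNodeTotalConnectionsPerHost();
// Idle timeout of peer connections
int getPeerNodeConnectionIdleTimeoutSeconds();
// How long entries are retained in the delta queue
long getRetentionTimeInMSInDeltaQueue();
// Interval of the task that checks the delta queue for expired entries
long getDeltaRetentionTimerIntervalInMs();
// Interval of the eviction task
long getEvictionIntervalTimerInMs();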
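// Expiration time of entries in the read-write response cache
long getResponseCacheAutoExpirationInSeconds();
// Interval at which the read-only response cache is refreshed from the read-write cache
long getResponseCacheUpdateIntervalMs();
// Whether the read-only response cache is used
boolean shouldUseReadOnlyResponseCache();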
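// Whether delta information is disabled
boolean shouldDisableDelta();
// Status replication thread pool: thread idle time
long getMaxIdleThreadInMinutesAgeForStatusReplication();
// Status replication thread pool: minimum number of threads
int getMinThreadsForStatusReplication();
// Status replication thread pool: maximum number of threads
int getMaxThreadsForStatusReplication();
// Status replication thread pool: maximum queue length
int getMaxElementsInStatusReplicationPool();
// Whether to sync when the timestamps differ
boolean shouldSyncWhenTimestampDiffers();
// Number of retries when pulling the registry from other nodes at server startup
int getRegistrySyncRetries();
// Wait time between those retries
long getRegistrySyncRetryWaitMs();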
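// Maximum number of replication events buffered for peer replication (passed as maxBufferSize in PeerEurekaNode)
int getMaxElementsInPeerReplicationPool();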
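// Minimum number of peer server nodes that must be UP
int getHealthStatusMinNumberOfAvailablePeers();
// Maximum delay for batch replication; tasks older than this are not processed
int getMaxTimeForReplication();
// Whether batch replication is used
boolean shouldBatchReplication();
// URL of the current server
String getMyUrl();
// Whether rate limiting is enabled
boolean isRateLimiterEnabled();
// Overall rate limit (burst size)
int getRateLimiterBurstSize();
// Rate limit for registry fetch requests
int getRateLimiterRegistryFetchAverageRate();
// Rate limit for full registry fetches
int getRateLimiterFullFetchAverageRate();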
}
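The three self-preservation settings combine into one per-minute renewal threshold. The following sketch shows the arithmetic under the assumption that the threshold is the expected client count times the renewals per minute times the ratio. `SelfPreservationSketch` and both method names are hypothetical, and the client count is passed in rather than tracked by a registry.

```java
// Hypothetical sketch of how the self-preservation threshold gates eviction.
final class SelfPreservationSketch {

    // Expected renewals per minute, scaled by the configured ratio.
    static int renewsPerMinThreshold(int expectedClients,
                                     int expectedRenewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        return (int) (expectedClients * (60.0 / expectedRenewalIntervalSeconds) * renewalPercentThreshold);
    }

    // Eviction runs only if self-preservation is off or enough renewals arrived in the last minute.
    static boolean isLeaseExpirationEnabled(boolean selfPreservationEnabled,
                                            long renewsInLastMin,
                                            int threshold) {
        if (!selfPreservationEnabled) {
            return true;
        }
        return threshold > 0 && renewsInLastMin > threshold;
    }

    public static void main(String[] args) {
        // 10 clients renewing every 30 s send 20 renewals per minute; a ratio of 0.75 is used for the demo.
        int threshold = renewsPerMinThreshold(10, 30, 0.75);
        System.out.println("threshold = " + threshold);
        System.out.println("evict with 16 renewals: " + isLeaseExpirationEnabled(true, 16, threshold));
        System.out.println("evict with 12 renewals: " + isLeaseExpirationEnabled(true, 12, threshold));
    }
}
```

This also shows why getExpectedClientRenewalIntervalSeconds has to match the real heartbeat rate: if clients renew every 15 s while the server assumes 30 s, the threshold is half of what it should be and self-preservation triggers too late.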