服务注册中心
pom.xml 文件
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.10.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Edgware.SR2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
通过@EnableEurekaServer
注解启动一个服务注册中心
@EnableEurekaServer
@SpringBootApplication
public class EurekaApplication {
public static void main(String[] args) {
SpringApplication.run(EurekaApplication.class, args);
}
}
常用配置文件:
spring.application.name=eureka-server
server.port=l111
#由于该应用为注册中心,所以设置为false,代表不向注册中心注册自己
eureka.client.register-with-eureka=false
#由于注册中心的职责就是维护服务实例,它并不需要去检索服务,所以也设置为false
eureka.client.fetch-registry=false
(多实例时,集群中 上述配置改为true)
服务提供者
新建Spring Boot
项目,主类上添加@EnableDiscoveryClient
注解,激活Eureka中的DiscoveryClient,同时配置服务名,同时指定服务注册中心地址
#设置服务名
spring.application.name=hello-service
#设置注册中心地址
eureka.client.service-url.defaultZone=http://localhost:1111/eureka/
高可用注册中心
上面所述,register-with-eureka
,fetch-registry
均改为true
, 同时添加几个注册中心地址,以,
分隔,如下所示:
eureka.client.service-url.defaultZone=http://localhost:1111/eureka/,http://localhost:1112/eureka/
Eureka内部结构
- 注册表基于纯内存结构,使用ConcurrentHashMap 存放注册信息
package com.netflix.eureka.registry;
/**
* Handles all registry requests from eureka clients.
*/
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
= new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
}
- 两个定时任务 一个更新服务注册信息,一个检测服务心跳,如未设置,默认都是30s
package com.netflix.discovery;
public class DiscoveryClient implements EurekaClient {
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
……
}
}
}
- 二级缓存 readOnlyCacheMap readWriteCacheMap
新服务注册,更新ConurrentHashMap结构的注册表,过期readWriteCacheMap中的数据,定时任务刷新同步readOnlyCacheMap的数据
package com.netflix.eureka.registry;
public class ResponseCacheImpl implements ResponseCache {
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
}
try {
//此处 更新缓存,将readWriteCacheMap中的cacheValue同步到readOnlyCacheMap中
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache", th);
}
}
}
};
}
//更新缓存
/**
* Invalidate the cache of a particular application.
*
* @param appName the application name of the application.
*/
@Override
public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
for (Key.KeyType type : Key.KeyType.values()) {
for (Version v : Version.values()) {
invalidate(
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
);
if (null != vipAddress) {
invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
}
if (null != secureVipAddress) {
invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
}
}
}
}
/**
* Invalidate the cache information given the list of keys.
*
* @param keys the list of keys for which the cache information needs to be invalidated.
*/
public void invalidate(Key... keys) {
for (Key key : keys) {
logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
//此处只过期了readWriteCacheMap的缓存
readWriteCacheMap.invalidate(key);
Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
for (Key keysWithRegion : keysWithRegions) {
logger.debug("Invalidating the response cache key : {} {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
readWriteCacheMap.invalidate(keysWithRegion);
}
}
}
}
}