序
本文主要研究一下eureka instance的lastDirtyTimestamp
server端
- lastDirtyTimestamp
last timestamp when this instance information was updated.
即该instance在client端最后被修改的时间戳
instance的接口,除了更新meta以及cancelLease操作外,其他修改的操作都要带上lastDirtyTimestamp,比如
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/resources/InstanceResource.java
/**
 * PUT endpoint that renews the lease of this instance (body elided in this excerpt).
 *
 * @param isReplication      value of the {@code PeerEurekaNode.HEADER_REPLICATION} request header
 * @param overriddenStatus   value of the {@code overriddenstatus} query parameter
 * @param status             value of the {@code status} query parameter
 * @param lastDirtyTimestamp value of the {@code lastDirtyTimestamp} query parameter, passed as a string
 * @return the HTTP response for the renewal request
 */
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
//......
}
/**
 * PUT endpoint on the {@code status} sub-path (body elided in this excerpt).
 *
 * @param newStatus          value of the {@code value} query parameter
 * @param isReplication      value of the {@code PeerEurekaNode.HEADER_REPLICATION} request header
 * @param lastDirtyTimestamp value of the {@code lastDirtyTimestamp} query parameter, passed as a string
 * @return the HTTP response for the status update request
 */
@PUT
@Path("status")
public Response statusUpdate(
@QueryParam("value") String newStatus,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
//......
}
/**
 * DELETE endpoint on the {@code status} sub-path (body elided in this excerpt).
 *
 * @param isReplication      value of the {@code PeerEurekaNode.HEADER_REPLICATION} request header
 * @param newStatusValue     value of the {@code value} query parameter
 * @param lastDirtyTimestamp value of the {@code lastDirtyTimestamp} query parameter, passed as a string
 * @return the HTTP response for the delete request
 */
@DELETE
@Path("status")
public Response deleteStatusUpdate(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("value") String newStatusValue,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
//......
}
除renewLease外,另外两个接口(statusUpdate和deleteStatusUpdate)的lastDirtyTimestamp参数仅仅用于向下传递。
renewLease
/**
 * Renews the lease of this instance and, when enabled, checks the caller's dirty timestamp
 * against the one held in the registry.
 *
 * <p>Returns 404 when the registry cannot renew the lease. Otherwise, if a
 * {@code lastDirtyTimestamp} was supplied and {@code shouldSyncWhenTimestampDiffers()} is on,
 * the reply is whatever {@code validateDirtyTimestamp} decides; in all other cases it is 200.
 *
 * @param isReplication      replication header; the request is treated as coming from a peer
 *                           node only when this is exactly {@code "true"}
 * @param overriddenStatus   overridden status name sent by the caller, may be {@code null}
 * @param status             status query parameter (not read by this method)
 * @param lastDirtyTimestamp caller's dirty timestamp as a decimal string, may be {@code null}
 * @return the response to send back to the caller
 */
@PUT
public Response renewLease(
        @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
        @QueryParam("overriddenstatus") String overriddenStatus,
        @QueryParam("status") String status,
        @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
    final boolean fromPeer = "true".equals(isReplication);

    // Lease unknown to the registry: reply 404 so the caller registers again.
    if (!registry.renew(app.getName(), id, fromPeer)) {
        logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
        return Response.status(Status.NOT_FOUND).build();
    }

    final Response reply;
    if (lastDirtyTimestamp == null || !serverConfig.shouldSyncWhenTimestampDiffers()) {
        // Nothing to compare, or comparison disabled.
        reply = Response.ok().build();
    } else {
        // The caller may have changed instance data since the registry last saw it.
        reply = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), fromPeer);
        // Validation said the replicating node wins, so keep the overridden status it sent.
        if (reply.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                && overriddenStatus != null
                && !InstanceStatus.UNKNOWN.name().equals(overriddenStatus)
                && fromPeer) {
            registry.storeOverriddenStatusIfRequired(
                    app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
        }
    }

    logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, reply.getStatus());
    return reply;
}
如果renew不成功,则返回404,client端收到后会执行register逻辑
如果renew成功,且lastDirtyTimestamp参数不为null,同时shouldSyncWhenTimestampDiffers配置开启(默认为true),则调用validateDirtyTimestamp进行校验;否则直接返回200
validateDirtyTimestamp
/**
 * Compares the caller's dirty timestamp with the one stored in the registry for this instance.
 *
 * <ul>
 *   <li>instance not in the registry, timestamp {@code null}, or both timestamps equal: 200</li>
 *   <li>caller's timestamp is newer: 404</li>
 *   <li>registry's timestamp is newer: 409 carrying the registry's instance info when the
 *       request is a replication, 200 otherwise</li>
 * </ul>
 *
 * @param lastDirtyTimestamp dirty timestamp sent by the caller, may be {@code null}
 * @param isReplication      whether the request comes from a replicating node
 * @return the response reflecting the comparison
 */
private Response validateDirtyTimestamp(Long lastDirtyTimestamp,
                                        boolean isReplication) {
    InstanceInfo registered = registry.getInstanceByAppAndId(app.getName(), id, false);

    // Nothing to compare against, or nothing differs.
    if (registered == null
            || lastDirtyTimestamp == null
            || lastDirtyTimestamp.equals(registered.getLastDirtyTimestamp())) {
        return Response.ok().build();
    }

    Object[] logArgs = {id, registered.getLastDirtyTimestamp(), lastDirtyTimestamp, isReplication};

    // Caller holds newer data than the registry.
    if (lastDirtyTimestamp > registered.getLastDirtyTimestamp()) {
        logger.debug(
                "Time to sync, since the last dirty timestamp differs -"
                        + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                logArgs);
        return Response.status(Status.NOT_FOUND).build();
    }

    // Registry holds newer data: hand it to a replicating node so it can sync itself.
    if (registered.getLastDirtyTimestamp() > lastDirtyTimestamp && isReplication) {
        logger.debug(
                "Time to sync, since the last dirty timestamp differs -"
                        + " ReplicationInstance id : {},Registry : {} Incoming: {} Replication: {}",
                logArgs);
        return Response.status(Status.CONFLICT).entity(registered).build();
    }

    return Response.ok().build();
}
如果lastDirtyTimestamp参数大于server本地instance的lastDirtyTimestamp值,则response返回404;
如果是server本地大于lastDirtyTimestamp参数,不是replication模式则返回200,replication模式则返回409 Conflict,并在响应体中带上server本地的InstanceInfo,让调用方据此同步自己的数据;如果两者相等(或者本地找不到该instance),则返回200。
PeerEurekaNode
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNode.java
/**
 * Send the heartbeat information of an instance to the node represented by
 * this class. If the instance does not exist on the node, the instance
 * registration information is sent again to the peer node.
 *
 * <p>For a priming request the heartbeat is sent directly and its result is ignored.
 * Otherwise the heartbeat is wrapped in a replication task and handed to the
 * batching dispatcher.
 *
 * @param appName
 *            the application name of the instance.
 * @param id
 *            the unique identifier of the instance.
 * @param info
 *            the instance info {@link InstanceInfo} of the instance.
 * @param overriddenStatus
 *            the overridden status information if any of the instance.
 * @param primeConnection
 *            if true, send the heartbeat immediately and return without
 *            creating a replication task.
 * @throws Throwable
 *             declared by this method; presumably propagated from the
 *             direct heartbeat call on the priming path.
 */
public void heartbeat(final String appName, final String id,
final InstanceInfo info, final InstanceStatus overriddenStatus,
boolean primeConnection) throws Throwable {
if (primeConnection) {
// We do not care about the result for priming request.
replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
return;
}
ReplicationTask replicationTask = new InstanceReplicationTask(targetHost, Action.Heartbeat, info, overriddenStatus, false) {
@Override
public EurekaHttpResponse<InstanceInfo> execute() throws Throwable {
return replicationClient.sendHeartBeat(appName, id, info, overriddenStatus);
}
@Override
public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
super.handleFailure(statusCode, responseEntity);
if (statusCode == 404) {
// The peer does not know this instance: replicate the registration to it.
logger.warn("{}: missing entry.", getTaskName());
if (info != null) {
logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
getTaskName(), info.getId(), info.getStatus());
register(info);
}
} else if (config.shouldSyncWhenTimestampDiffers()) {
// Any other failure status: if the peer sent back an instance info,
// take it over locally.
InstanceInfo peerInstanceInfo = (InstanceInfo) responseEntity;
if (peerInstanceInfo != null) {
syncInstancesIfTimestampDiffers(appName, id, info, peerInstanceInfo);
}
}
}
};
// The task's expiry time is now plus the value returned by getLeaseRenewalOf(info).
long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
batchingDispatcher.process(taskId("heartbeat", info), replicationTask, expiryTime);
}
这里的handleFailure,对于404的处理就是重新register;其他的就是诸如409的情况,先判断是否开启shouldSyncWhenTimestampDiffers配置,这个默认是true,然后将peer返回的最新info信息覆盖到本地
syncInstancesIfTimestampDiffers
/**
 * Takes over the {@link InstanceInfo} returned by a peer node: stores the peer's overridden
 * status when it is set and not {@code UNKNOWN}, then registers the peer's copy locally.
 * Does nothing when the peer sent no instance info. Any failure is logged and swallowed.
 *
 * @param appName      application name of the instance
 * @param id           identifier of the instance
 * @param info         this node's view of the instance (used for logging only)
 * @param infoFromPeer the instance info returned by the peer, may be {@code null}
 */
private void syncInstancesIfTimestampDiffers(String appName, String id, InstanceInfo info, InstanceInfo infoFromPeer) {
    try {
        if (infoFromPeer == null) {
            return;
        }
        logger.warn("Peer wants us to take the instance information from it, since the timestamp differs,"
                + "Id : {} My Timestamp : {}, Peer's timestamp: {}", id, info.getLastDirtyTimestamp(), infoFromPeer.getLastDirtyTimestamp());

        boolean peerHasOverride = infoFromPeer.getOverriddenStatus() != null
                && !InstanceStatus.UNKNOWN.equals(infoFromPeer.getOverriddenStatus());
        if (peerHasOverride) {
            logger.warn("Overridden Status info -id {}, mine {}, peer's {}", id, info.getOverriddenStatus(), infoFromPeer.getOverriddenStatus());
            registry.storeOverriddenStatusIfRequired(appName, id, infoFromPeer.getOverriddenStatus());
        }

        registry.register(infoFromPeer, true);
    } catch (Throwable failure) {
        logger.warn("Exception when trying to set information from peer :", failure);
    }
}
这一段就是peerNode在replicate中遇到409的时候,根据返回的InstanceInfo覆盖本地的过程。
client端
eureka-client-1.8.8-sources.jar!/com/netflix/appinfo/InstanceInfo.java
/**
 * Marks this instance information as dirty and records the current time as the
 * last dirty timestamp, so the change can be carried to the discovery server.
 */
public synchronized void setIsDirty() {
    this.isInstanceInfoDirty = true;
    this.lastDirtyTimestamp = System.currentTimeMillis();
}
这个是更改lastDirtyTimestamp的基本方法,调用这个方法的其他方法如下:
/**
 * Stores the given SID and marks the instance information as dirty.
 *
 * @param sid the new SID value
 */
@Deprecated
public void setSID(String sid) {
    this.sid = sid;
    this.setIsDirty();
}
/**
 * Changes the status of this instance; the instance is marked dirty only when the
 * status actually changes.
 *
 * @param status the status to apply
 * @return the previous status when it differed from {@code status}, {@code null} when
 *         the status was already set to that value
 */
public synchronized InstanceStatus setStatus(InstanceStatus status) {
    // Same status as before: nothing to update, nothing becomes dirty.
    if (this.status == status) {
        return null;
    }
    final InstanceStatus previous = this.status;
    this.status = status;
    setIsDirty();
    return previous;
}
/**
 * Sets or clears the dirty flag. Setting it goes through {@link #setIsDirty()};
 * clearing it only resets the flag.
 *
 * @param isDirty true to mark the instance dirty, false to clear the flag
 * @deprecated use {@link #setIsDirty()} and {@link #unsetIsDirty(long)} to set and unset
 */
@Deprecated
public synchronized void setIsDirty(boolean isDirty) {
    if (!isDirty) {
        // Clearing the flag deliberately leaves lastDirtyTimestamp untouched.
        isInstanceInfoDirty = false;
        return;
    }
    setIsDirty();
}
/**
 * Marks the instance dirty via {@link #setIsDirty()} and reports the resulting timestamp.
 *
 * @return the last dirty timestamp after the flag has been set
 */
public synchronized long setIsDirtyWithTime() {
    this.setIsDirty();
    return this.lastDirtyTimestamp;
}
/**
 * Merges application-specific metadata into this instance's metadata map and
 * marks the instance dirty.
 *
 * @param runtimeMetadata
 *            key/value pairs to add; existing keys are overwritten.
 */
synchronized void registerRuntimeMetadata(Map<String, String> runtimeMetadata) {
    this.metadata.putAll(runtimeMetadata);
    setIsDirty();
}
isInstanceInfoDirty
/**
 * @return the current value of the dirty flag.
 */
@JsonIgnore
public boolean isDirty() {
    return this.isInstanceInfoDirty;
}
/**
 * Reports when the instance became dirty.
 *
 * @return the last dirty timestamp while the dirty flag is set, {@code null} otherwise.
 */
public synchronized Long isDirtyWithTime() {
    if (!isInstanceInfoDirty) {
        return null;
    }
    return lastDirtyTimestamp;
}
/**
 * Clears the dirty flag unless the instance has been marked dirty again since the given
 * timestamp was read: the flag is cleared when lastDirtyTimestamp is less than or equal to
 * unsetDirtyTimestamp, and left alone when lastDirtyTimestamp is greater.
 *
 * @param unsetDirtyTimestamp the dirty timestamp the caller observed before it acted.
 */
public synchronized void unsetIsDirty(long unsetDirtyTimestamp) {
    // A newer dirty mark exists: keep the flag so that change is not lost.
    if (lastDirtyTimestamp > unsetDirtyTimestamp) {
        return;
    }
    isInstanceInfoDirty = false;
}
这个方法,如果lastDirtyTimestamp <= unsetDirtyTimestamp,则标识isInstanceInfoDirty为false
InstanceInfoReplicator
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/InstanceInfoReplicator.java
/**
 * One replication cycle: refresh the local instance info, re-register it when it is dirty,
 * then schedule the next cycle. Failures are logged; the next cycle is scheduled either way.
 */
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long observedDirtyAt = instanceInfo.isDirtyWithTime();
        if (observedDirtyAt != null) {
            discoveryClient.register();
            // Clear the flag only for the change that was just registered.
            instanceInfo.unsetIsDirty(observedDirtyAt);
        }
    } catch (Throwable failure) {
        logger.warn("There was a problem with the instance info replicator", failure);
    } finally {
        scheduledPeriodicRef.set(
                scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS));
    }
}
这里首先refreshInstanceInfo,之后通过isDirtyWithTime()判断instanceInfo是否为dirty,如果是则执行register操作,并调用unsetIsDirty清除dirty标记
refreshInstanceInfo
/**
 * Refreshes the local instanceInfo: data center info, lease info, and then the status
 * reported by the health check handler. A handler failure is treated as DOWN; a
 * {@code null} status from the handler leaves the current status alone.
 */
void refreshInstanceInfo() {
    applicationInfoManager.refreshDataCenterInfoIfRequired();
    applicationInfoManager.refreshLeaseInfoIfRequired();

    InstanceStatus resolved;
    try {
        resolved = getHealthCheckHandler().getStatus(instanceInfo.getStatus());
    } catch (Exception healthCheckFailure) {
        logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN", healthCheckFailure);
        resolved = InstanceStatus.DOWN;
    }

    if (resolved == null) {
        return;
    }
    applicationInfoManager.setInstanceStatus(resolved);
}
refreshDataCenterInfoIfRequired
eureka-client-1.8.8-sources.jar!/com/netflix/appinfo/ApplicationInfoManager.java
/**
 * Re-resolves the instance address and compares it with the hostname currently held by
 * the instance info. When it changed, hostname, IP address and <code>DataCenterInfo</code>
 * are updated and the instance is marked dirty.
 *
 * see {@link InstanceInfo#getHostName()} for explanation on why the hostname is used as the default address
 */
public void refreshDataCenterInfoIfRequired() {
    final String currentAddress = instanceInfo.getHostName();

    // A refreshable config refreshes data center info while resolving the address.
    final String resolvedAddress = (config instanceof RefreshableInstanceConfig)
            ? ((RefreshableInstanceConfig) config).resolveDefaultAddress(true)
            : config.getHostName(true);
    final String resolvedIp = config.getIpAddress();

    if (resolvedAddress == null || resolvedAddress.equals(currentAddress)) {
        return;
    }

    logger.warn("The address changed from : {} => {}", currentAddress, resolvedAddress);

    // Legacy quirk: the builder is used here as a mutator of the existing instanceInfo,
    // because that same object is referenced elsewhere; the result is intentionally unused.
    new InstanceInfo.Builder(instanceInfo)
            .setHostName(resolvedAddress)
            .setIPAddr(resolvedIp)
            .setDataCenterInfo(config.getDataCenterInfo());
    instanceInfo.setIsDirty();
}
/**
 * Compares the lease settings held by the instance info with the configured lease
 * expiration duration and renewal interval. When either differs, a new lease info built
 * from the configured values replaces the old one and the instance is marked dirty.
 * Does nothing when the instance has no lease info.
 */
public void refreshLeaseInfoIfRequired() {
    final LeaseInfo current = instanceInfo.getLeaseInfo();
    if (current == null) {
        return;
    }

    final int configuredDuration = config.getLeaseExpirationDurationInSeconds();
    final int configuredRenewal = config.getLeaseRenewalIntervalInSeconds();

    final boolean unchanged = current.getDurationInSecs() == configuredDuration
            && current.getRenewalIntervalInSecs() == configuredRenewal;
    if (unchanged) {
        return;
    }

    instanceInfo.setLeaseInfo(
            LeaseInfo.Builder.newBuilder()
                    .setRenewalIntervalInSecs(configuredRenewal)
                    .setDurationInSecs(configuredDuration)
                    .build());
    instanceInfo.setIsDirty();
}
重新刷新配置,如果有变更则调用instanceInfo.setIsDirty()
小结
在server端,处理renewLease的时候,对lastDirtyTimestamp参数进行判断,如果大于server本地instance的lastDirtyTimestamp值,则response返回404;如果是server本地大于lastDirtyTimestamp参数,不是replication模式则返回200,replication模式,则返回409 Conflict,让调用方同步自己的数据。
在client端,有两个属性,一个是lastDirtyTimestamp,一个是isInstanceInfoDirty。在更新instance的status的时候,会调用setIsDirty,即更新lastDirtyTimestamp以及设置isInstanceInfoDirty为true;然后client端还有个InstanceInfoReplicator定时任务,会定时刷新dataCenterInfo、leaseInfo配置以及健康状态,如果有变更则调用setIsDirty,之后调用instanceInfo.isDirtyWithTime(),如果是dirty则会重新register,并通过unsetIsDirty重置isInstanceInfoDirty(仅当lastDirtyTimestamp不大于register前读取到的时间戳时才会重置;这一步不会更新lastDirtyTimestamp)。