简介
Elasticsearch 目前使用的服务发现算法,是基于 Zen 服务发现的第二代实现。
节点模式
节点有三种模式,分别是 `CANDIDATE`、`LEADER`、`FOLLOWER`。
流程分析
doStart
主要是服务发现的基本配置的工作,做了几件事:
- 获取持久化的协调状态
- 初始化节点发现器(探测器)
- 选举配置
- 初始化集群状态
/**
 * Performs the basic discovery setup while holding the coordinator mutex:
 * loads the persisted coordination state, passes the current term to the peer finder,
 * starts the configured-hosts resolver, validates the voting configuration when
 * single-node discovery is in use, and installs the initial cluster state.
 *
 * @throws IllegalStateException if single-node discovery is enabled, the last committed
 *         voting configuration is non-empty, and the local node on its own does not
 *         have a quorum in that configuration
 */
protected void doStart() {
    synchronized (mutex) {
        // Wrap the persisted state in the live coordination state.
        final CoordinationState.PersistedState persisted = persistedStateSupplier.get();
        coordinationState.set(new CoordinationState(settings, getLocalNode(), persisted));

        // Give the peer finder the current term (not a node count).
        peerFinder.setCurrentTerm(getCurrentTerm());

        // Start the resolver for the configured hosts.
        configuredHostsResolver.start();

        // Log the cluster UUID when the last accepted state has a committed one.
        final ClusterState acceptedState = coordinationState.get().getLastAcceptedState();
        if (acceptedState.metaData().clusterUUIDCommitted()) {
            logger.info("cluster UUID [{}]", acceptedState.metaData().clusterUUID());
        }

        // Voting configuration taken from the last accepted state.
        final VotingConfiguration committedConfig = acceptedState.getLastCommittedConfiguration();

        // With single-node discovery, a non-empty voting configuration must be satisfiable
        // by the local node alone; otherwise startup is refused.
        final boolean localNodeLacksQuorum = singleNodeDiscovery
            && committedConfig.isEmpty() == false
            && committedConfig.hasQuorum(Collections.singleton(getLocalNode().getId())) == false;
        if (localNodeLacksQuorum) {
            final String message = "cannot start with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to ["
                + DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] when local node " + getLocalNode()
                + " does not have quorum in voting configuration " + committedConfig;
            throw new IllegalStateException(message);
        }

        // Initial cluster state: two global blocks (state-not-recovered and the no-master
        // block) and a node set that contains only the local node.
        final ClusterState bootstrapState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
            .blocks(ClusterBlocks.builder()
                .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
                .addGlobalBlock(noMasterBlockService.getNoMasterBlock()))
            .nodes(DiscoveryNodes.builder().add(getLocalNode()).localNodeId(getLocalNode().getId()))
            .build();
        applierState = bootstrapState;
        clusterApplier.setInitialState(bootstrapState);
    }
}
startInitialJoin
主要用于第一次启动时进行节点发现及选举加入集群。
// Body of startInitialJoin (method header not shown in this excerpt).
// The mutex is taken because becomeCandidate asserts that the caller holds it.
synchronized (mutex) {
// Become a candidate. Per these notes, mode is still null on first start,
// so becomeCandidate switches it to CANDIDATE here.
becomeCandidate("startInitialJoin");
}
// Runs after the mutex has been released.
// NOTE(review): the original note says "if this is a master, start the cluster";
// presumably the bootstrap only takes effect on master-eligible nodes — confirm
// against ClusterBootstrapService.
clusterBootstrapService.scheduleUnconfiguredBootstrap();
becomeCandidate(String method)
/**
 * Switches this coordinator into CANDIDATE mode and resets the per-mode helpers.
 * The caller must hold the coordinator mutex (checked by the assertion below).
 * When the coordinator is already a candidate, only the pre-vote collector is refreshed.
 *
 * @param method label of the calling code path; used in the debug log line and in the
 *               reason strings passed to the publication cancel and the cluster applier
 */
void becomeCandidate(String method) {
assert Thread.holdsLock(mutex) : "Coordinator mutex not held";
logger.debug("{}: coordinator becoming CANDIDATE in term {} (was {}, lastKnownLeader was [{}])",
method, getCurrentTerm(), mode, lastKnownLeader);
// Per these notes, mode is null the first time the node starts, so this branch is taken.
if (mode != Mode.CANDIDATE) {
// Remember the previous mode; it decides below whether the master service is cleaned.
final Mode prevMode = mode;
mode = Mode.CANDIDATE;
// Cancel any active publication, giving the transition as the reason.
cancelActivePublication("become candidate: " + method);
// Close the current join accumulator (mode is already CANDIDATE at this point),
// then replace it with a candidate accumulator.
joinAccumulator.close(mode);
joinAccumulator = joinHelper.new CandidateJoinAccumulator();
// Activate the peer finder with the nodes of the last accepted cluster state.
peerFinder.activate(coordinationState.get().getLastAcceptedState().nodes());
clusterFormationFailureHelper.start();
// Only when the current term equals ZEN1_BWC_TERM is the discovery upgrade service activated.
// NOTE(review): presumably this is the Zen1 backwards-compatibility path — confirm.
if (getCurrentTerm() == ZEN1_BWC_TERM) {
discoveryUpgradeService.activate(lastKnownLeader, coordinationState.get().getLastAcceptedState());
}
// Reset the leader checker: empty node set and no leader.
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);
leaderChecker.updateLeader(null);
// Reset the followers checker and give it the current term and the new mode.
followersChecker.clearCurrentNodes();
followersChecker.updateFastResponseState(getCurrentTerm(), mode);
lagDetector.clearTrackedNodes();
// Coming from LEADER: clean up the master service.
if (prevMode == Mode.LEADER) {
cleanMasterService();
}
// If the applied state still names a master node, replace it with the state returned by
// clusterStateWithNoMasterBlock and hand that to the applier. The listener body is empty,
// so the outcome of applying it is ignored here.
if (applierState.nodes().getMasterNodeId() != null) {
applierState = clusterStateWithNoMasterBlock(applierState);
clusterApplier.onNewClusterState("becoming candidate: " + method, () -> applierState, (source, e) -> {
});
}
}
// Runs on every call, whether or not the mode changed: update the pre-vote collector
// with the current pre-vote response and a null second argument.
preVoteCollector.update(getPreVoteResponse(), null);
}