Discovery模块负责发现集群中的节点,以及选择主节点。ES支持多种不同Discovery类型选择,内置的实现有两种:Zen Discovery和Coordinator,其他的包括公有云平台亚马逊的EC2、谷歌的GCE等。
Zen Discovery和Coordinator
- 1、Coordinator
ES 7.x 重构了一个新的集群协调层Coordinator,他实际上是 Raft 的实现,但并非严格按照 Raft 论文实现,而是做了一些调整。
可以在配置文件中指定,代码如下:
org.elasticsearch.discovery.DiscoveryModule.DiscoveryModule(....)
public DiscoveryModule(...) {
if (ZEN2_DISCOVERY_TYPE.equals(discoveryType) || SINGLE_NODE_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new Coordinator(NODE_NAME_SETTING.get(settings),
settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), rerouteService, electionStrategy);
} else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState, rerouteService);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
}
}
- 2、Zen Discovery
采用Bully算法
它假定所有节点都有一个唯一的ID,使用该ID对节点进行排序。任何时候的当前Leader都是参与集群的最高ID节点。该算法的优点是易于实现。但是,当拥有最大ID的节点处于不稳定状态的场景下会有问题。例如,Master负载过重而假死,集群拥有第二大ID的节点被选为新主,这时原来的Master恢复,再次被选为新主,然后又假死
ES 通过推迟选举,直到当前的 Master 失效来解决上述问题,只要当前主节点不挂掉,就不重新选主。但是容易产生脑裂(双主),为此,再通过“法定得票人数过半”解决脑裂问题
- 3、算法比较
1、raft算法与Bully算法
相同点
1、多数派原则:必须得到超过半数的选票才能成为master。
选出的leader一定拥有最新已提交数据:在raft中,数据更新的节点不会给数据旧的节点投选票,而当选需要多数派的选票,则当选人一定有最新已提交数据。在es中,version大的节点排序优先级高,同样用于保证这一点。
不同点
正确性论证:raft是一个被论证过正确性的算法,而ES的算法是一个没有经过论证的算法,只能在实践中发现问题,做bug fix,这是我认为最大的不同。
是否有选举周期term:raft引入了选举周期的概念,每轮选举term加1,保证了在同一个term下每个参与人只能投1票。ES在选举时没有term的概念,不能保证每轮每个节点只投一票。
选举的倾向性:raft中只要一个节点拥有最新的已提交的数据,则有机会选举成为master。在ES中,version相同时会按照NodeId排序,总是NodeId小的人优先级高。
2、Paxos算法
Paxos非常强大,尤其在什么时机,以及如何进行选举方面的灵活性比简单的Bully算法有很大的优势,因为在现实生活中,存在比网络连接异常更多的故障模式。但 Paxos 实现起来非常复杂
本篇只讨论内置的Zen Discovery
流程分析
整体流程可以概括为:选举临时Master,如果本节点当选,则等待确立Master,如果其他节点当选,则尝试加入集群,然后启动节点失效探测器。
1、节点启动
如果集群刚启动则参与选主,否则加入集群
org.elasticsearch.node.Node.start()
public Node start() throws NodeValidationException {
discovery.start();
discovery.startInitialJoin();
}
2、选举临时Master
选举过程的实现位于 org.elasticsearch.discovery.zen.ZenDiscovery.findMaster()
,该函数查找当前集群的活跃 Master,或者从候选者中选择新的Master。如果选主成功,则返回选定的Master,否则返回空
private DiscoveryNode findMaster() {
logger.trace("starting to ping");
List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
List<DiscoveryNode> activeMasters = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : pingResponses) {
// We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
// any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
activeMasters.add(pingResponse.master());
}
}
// nodes discovered during pinging
List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : pingResponses) {
if (pingResponse.node().isMasterNode()) {
masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
}
}
if (activeMasters.isEmpty()) {
//判断当前候选者人数是否达到法定人数
if (electMaster.hasEnoughCandidates(masterCandidates)) {
final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
logger.trace("candidate {} won election", winner);
return winner.getNode();
} else {
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
logger.trace("not enough master nodes [{}]", masterCandidates);
return null;
}
} else {
assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
// lets tie break between discovered nodes
return electMaster.tieBreakActiveMasters(activeMasters);
}
}
上面选择临时主节点非常简单,
首先需要判断当前候选者人数是否达到法定人数,否则选主失败。
public boolean hasEnoughCandidates(Collection<MasterCandidate> candidates) {
if (candidates.isEmpty()) {
return false;
}
if (minimumMasterNodes < 1) {
return true;
}
assert candidates.stream().map(MasterCandidate::getNode).collect(Collectors.toSet()).size() == candidates.size() :
"duplicates ahead: " + candidates;
return candidates.size() >= minimumMasterNodes;
}
取列表中的最小值,比较函数通过compareNodes实现,只是对节点 ID 进行排序
public MasterCandidate electMaster(Collection<MasterCandidate> candidates) {
assert hasEnoughCandidates(candidates);
List<MasterCandidate> sortedCandidates = new ArrayList<>(candidates);
sortedCandidates.sort(MasterCandidate::compare);
return sortedCandidates.get(0);
}
public static int compare(MasterCandidate c1, MasterCandidate c2) {
// we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
// list, so if c2 has a higher cluster state version, it needs to come first.
int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
if (ret == 0) {
ret = compareNodes(c1.getNode(), c2.getNode());
}
return ret;
}
3、确立Master或加入集群
选举出的临时Master有两种情况:该临时Master是本节点或非本节点。
- 1、如果临时Master是本节点:
(1)等待足够多的具备Master资格的节点加入本节点(投票达到法定人数),以完成选举。
if (clusterService.localNode().equals(masterNode)) {
final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
joinThreadControl.markThreadAsDone(currentThread);
nodesFD.updateNodesAndPing(state); // start the odes FD
}
@Override
public void onFailure(Throwable t) { joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
}
);
}
private synchronized void checkPendingJoinsAndElectIfNeeded() {
assert electionContext != null : "election check requested but no active context";
final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
//是否有足够的选票
if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
}
} else {
electionContext.closeAndBecomeMaster();
electionContext = null; // clear this out so future joins won't be accumulated
}
}
(2)超时(默认为30秒,可配置)后还没有满足数量的join请求,则选举失败,需要进行新一轮选举。
public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
final CountDownLatch done = new CountDownLatch(1);
try {
if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
// callback handles everything
return;
}
} catch (InterruptedException e) {
}
}
超时后直接return,当非临时节点加入集群不成功时,重新发起选主流程
org.elasticsearch.discovery.zen.ZenDiscovery.innerJoinCluster()
// send join request
final boolean success = joinElectedMaster(masterNode);
// finalize join through the cluster state update thread
final DiscoveryNode finalMasterNode = masterNode;
clusterService.submitStateUpdateTask("finalize_join (" + masterNode + ")", new ClusterStateUpdateTask() {
@Override
public boolean runOnlyOnMaster() {
return false;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
if (!success) {
// failed to join. Try again...
//重新发起选主流程
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
return currentState;
}
(3)成功后发布新的clusterState。
实现如下:
public synchronized void closeAndBecomeMaster() {
Map<DiscoveryNode, ClusterStateTaskListener> tasks = getPendingAsTasks();
final String source = "zen-disco-elected-as-master ([" + tasks.size() + "] nodes joined)";
tasks.put(BECOME_MASTER_TASK, (source1, e) -> {}); // noop listener, the election finished listener determines result
tasks.put(FINISH_ELECTION_TASK, electionFinishedListener);
clusterService.submitStateUpdateTasks(source, tasks, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
}
submitStateUpdateTask最终通过TaskBatcher# submitTasks来提交任务。执行任务并发布集群状态的总体过程在 MasterService#runTasks 方法中实现。
protected void runTasks(TaskInputs taskInputs) {
publish(clusterChangedEvent, taskOutputs, startTimeNS);
}
- 2、如果其他节点被选为Master:
(1)不再接受其他节点的join请求。
// process any incoming joins (they will fail because we are not the master)
nodeJoinController.stopElectionContext(masterNode + " elected");
(2)向Master发送加入请求,并等待回复。超时时间默认为1分钟(可配置),如果遇到异常,则默认重试3次(可配置)。这个步骤在joinElectedMaster方法中实现。
private boolean joinElectedMaster(DiscoveryNode masterNode) {
try {
int joinAttempt = 0; // we retry on illegal state if the master is not yet ready
while (true) {
try {
membership.sendJoinRequestBlocking(masterNode, clusterService.localNode(), joinTimeout);
return true;
} catch (Exception e) {
final Throwable unwrap = ExceptionsHelper.unwrapCause(e);
if (unwrap instanceof NotMasterException) {
if (++joinAttempt == this.joinRetryAttempts) {
logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
return false;
} else {
logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
}
} else {
}
return false;
}
}
try {
Thread.sleep(this.joinRetryDelay.millis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
最终当选的Master会先发布集群状态,才确认客户的join请求,因此,joinElectedMaster返回代表收到了join请求的确认,并且已经收到了集群状态。所以如果返回不成功,则重新发起选主流程
(3)检查收到的集群状态中的Master节点如果为空,或者当选的Master不是之前选择的节点,则重新选举。
if (currentState.getNodes().getMasterNode() == null) {
// Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
// a valid master.
logger.debug("no master node is set, despite of join request completing. retrying pings.");
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
return currentState;
}
if (!currentState.getNodes().getMasterNode().equals(finalMasterNode)) {
return joinThreadControl.stopRunningThreadAndRejoin(currentState, "master_switched_while_finalizing_join");
}
// Note: we do not have to start master fault detection here because it's set at {@link #processNextPendingClusterState }
// when the first cluster state arrives.
joinThreadControl.markThreadAsDone(currentThread);
return currentState;
总结
1、es通过主从模式以及发现机制保证节点之间的负载均衡,但是es使用量的急剧增加暴露了很多问题,例如,Zen的minimum_master_nodes设置经常配置错误,这会使群集更容易出现裂脑和丢失数据的风险
2、7.x以上版本Coordinator提供了安全的亚秒级的master选举时间,而Zen可能要花几秒钟来选择一个新的master
3、es的master挂了,数据节点在这区间还能对外提供服务吗?
参考
Elasticsearch分布式一致性原理剖析