1 Overview
In ElasticSearch, Discovery is responsible for master election, cluster discovery, publishing the cluster state to the nodes in the cluster, and so on. Its source comment reads:
A pluggable module allowing to implement discovery of other nodes, publishing of the cluster state to all nodes, electing a master of the cluster that raises cluster state change events.
Discovery has two implementations in ElasticSearch:
- SingleNodeDiscovery: for a single-node cluster.
- ZenDiscovery: for a multi-node cluster.
This article focuses on ZenDiscovery, covering the implementation of master election, cluster discovery, and related mechanisms.
2 Instantiating and starting ZenDiscovery
ZenDiscovery is obtained and initialized in DiscoveryModule, which is in turn initialized in the constructor of the Node class. Node.start triggers a call to ZenDiscovery.doStart:
//ZenDiscovery
@Override
protected void doStart() {
    DiscoveryNode localNode = transportService.getLocalNode();
    assert localNode != null;
    synchronized (stateMutex) {
        // set initial state
        assert committedState.get() == null;
        assert localNode != null;
        ClusterState.Builder builder = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings));
        ClusterState initialState = builder
            .blocks(ClusterBlocks.builder()
                //Worth noting: STATE_NOT_RECOVERED_BLOCK means the node has just
                //started and the election/recovery of cluster and index metadata
                //has not run yet. When the master handles the new ClusterState it
                //triggers that metadata election, and removes
                //STATE_NOT_RECOVERED_BLOCK once it completes; a later article
                //covers the details.
                .addGlobalBlock(STATE_NOT_RECOVERED_BLOCK)
                .addGlobalBlock(discoverySettings.getNoMasterBlock()))
            .nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()))
            .build();
        committedState.set(initialState);
        clusterApplier.setInitialState(initialState);
        //nodesFD is the helper the master uses to periodically ping the other
        //(non-master) nodes and detect node failures (nodes going offline)
        nodesFD.setLocalNode(localNode);
        //helper for joining the cluster, described in detail below
        joinThreadControl.start();
    }
    //zenPing is used for pinging during master election, so that each node
    //can learn the state of the other nodes
    zenPing.start();
}
3 Initial join at cluster startup
On every node, Node.start calls ZenDiscovery.startInitialJoin to perform the first join.
[pic-startInitialJoin call trace]
The code listed above contains the line joinThreadControl.start(); let us first look at its implementation:
//ZenDiscovery.JoinThreadControl
public void start() {
    //Sets running=true to mark the controller as started; later uses of
    //JoinThreadControl, e.g. for master election, check this flag.
    running.set(true);
}
Now look at the implementation of ZenDiscovery.startInitialJoin:
//ZenDiscovery.startInitialJoin
@Override
public void startInitialJoin() {
    // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
    synchronized (stateMutex) {
        // do the join on a different thread, the caller of this method waits for 30s anyhow till it is discovered
        joinThreadControl.startNewThreadIfNotRunning();
    }
}
The initial join above ultimately calls joinThreadControl.startNewThreadIfNotRunning() to run master election, and the same logic is reused later if the master fails. The next section discusses joinThreadControl.startNewThreadIfNotRunning() in detail.
3.1 Overall flow of master election and joining the cluster
//ZenDiscovery.JoinThreadControl
/** starts a new joining thread if there is no currently active one and join thread controlling is started */
public void startNewThreadIfNotRunning() {
    assert Thread.holdsLock(stateMutex);
    //if another thread on this node has already started a join, return immediately
    if (joinThreadActive()) {
        return;
    }
    //submit the join task
    threadPool.generic().execute(new Runnable() {
        @Override
        public void run() {
            Thread currentThread = Thread.currentThread();
            //CAS the current thread in as the active join thread; on success,
            //continue; on failure, another thread has already claimed the join,
            //so return immediately
            if (!currentJoinThread.compareAndSet(null, currentThread)) {
                return;
            }
            //loop until the join succeeds; an attempt may fail because, e.g.,
            //the locally elected master timed out waiting for votes, or there
            //were not enough master-eligible nodes
            while (running.get() && joinThreadActive(currentThread)) {
                try {
                    //the actual cluster-join and master-election function
                    innerJoinCluster();
                    return;
                } catch (Exception e) {
                    ...
                }
            }
            // cleaning the current thread from currentJoinThread is done by explicit calls.
        }
    });
}
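The single-join-thread guarantee above rests on one CAS over an AtomicReference. The following self-contained sketch isolates just that pattern; the class name JoinThreadGuard and its method names are invented for this example and do not appear in the ElasticSearch source.

```java
import java.util.concurrent.atomic.AtomicReference;

public class JoinThreadGuard {
    // only one join thread may be active at a time; a CAS from null claims the slot
    private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();

    /** Returns true if the calling thread won the right to run the join loop. */
    public boolean tryClaim() {
        return currentJoinThread.compareAndSet(null, Thread.currentThread());
    }

    /** Marks the join attempt as finished, freeing the slot for a future rejoin. */
    public void markDone() {
        currentJoinThread.compareAndSet(Thread.currentThread(), null);
    }
}
```

Because the claim and the release are both CAS operations on the same reference, a second caller can never steal or clear a slot held by a live join thread.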
JoinThreadControl is an inner class of ZenDiscovery, and innerJoinCluster is a ZenDiscovery method; the actual master-election flow lives in innerJoinCluster:
//ZenDiscovery
private void innerJoinCluster() {
    DiscoveryNode masterNode = null;
    final Thread currentThread = Thread.currentThread();
    //nodeJoinController handles other nodes joining the locally elected master;
    //after they join, a reroute is triggered to allocate shards
    nodeJoinController.startElectionContext();
    while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
        //this is where the master node is elected
        masterNode = findMaster();
    }
    if (!joinThreadControl.joinThreadActive(currentThread)) {
        logger.trace("thread is no longer in currentJoinThread. Stopping.");
        return;
    }
    //If the local node is the master elected in this round, wait for the other
    //nodes to confirm the result by joining it; this is itself a voting round
    if (transportService.getLocalNode().equals(masterNode)) {
        //derive from the configuration the minimum number of participants for a
        //successful election; the node only truly becomes master once at least
        //this many other nodes have joined it
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
        logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
        //wait for the other nodes to join this node
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
            new NodeJoinController.ElectionCallback() {
                @Override
                public void onElectedAsMaster(ClusterState state) {
                    //successfully elected as master, so the flow ends here;
                    //markThreadAsDone clears the election thread recorded in
                    //joinThreadControl so that a later rejoin can run if needed
                    synchronized (stateMutex) {
                        joinThreadControl.markThreadAsDone(currentThread);
                    }
                }
                @Override
                public void onFailure(Throwable t) {
                    logger.trace("failed while waiting for nodes to join, rejoining", t);
                    synchronized (stateMutex) {
                        //waiting for votes failed, restart the whole join logic
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    }
                }
            }
        );
    } else {
        //The local node is not the master elected in this round, so it stops
        //its own election flow and joins the newly elected master (i.e. votes)
        // process any incoming joins (they will fail because we are not the master)
        //stop the election
        nodeJoinController.stopElectionContext(masterNode + " elected");
        // send join request
        //send a join request to the elected master, acknowledging it as master
        final boolean success = joinElectedMaster(masterNode);
        synchronized (stateMutex) {
            //successfully joined the newly elected master
            if (success) {
                DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                //anomaly: no master node is set
                if (currentMasterNode == null) {
                    // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                    // a valid master.
                    logger.debug("no master node is set, despite of join request completing. retrying pings.");
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                } else if (currentMasterNode.equals(masterNode) == false) {
                    // update cluster state
                    joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                }
                //mark the join as successful
                joinThreadControl.markThreadAsDone(currentThread);
            } else {
                // failed to join. Try again...
                //failed to join the elected master, restart the join process
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }
        }
    }
}
3.1.2 Master election: the election algorithm
The main master-election logic is in ZenDiscovery.findMaster:
//ZenDiscovery
private DiscoveryNode findMaster() {
    logger.trace("starting to ping");
    //pingAndWait fetches the state of the other nodes; only the outline is given
    //here rather than the full source: it uses the ZenPing introduced above to
    //ping every host in the configuration; see ZenPing's default implementation,
    //UnicastZenPing, for the concrete logic.
    //As the name suggests this is a synchronous call. Like most places in
    //ElasticSearch that wait for remote responses, it keeps a counter of the
    //requests sent, decrements it as each response arrives, and treats a count
    //of zero as "all requests answered".
    List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
    if (fullPingResponses == null) {
        logger.trace("No full ping responses");
        return null;
    }
    if (logger.isTraceEnabled()) {
        StringBuilder sb = new StringBuilder();
        if (fullPingResponses.size() == 0) {
            sb.append(" {none}");
        } else {
            for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                sb.append("\n\t--> ").append(pingResponse);
            }
        }
        logger.trace("full ping responses:{}", sb);
    }
    final DiscoveryNode localNode = transportService.getLocalNode();
    // add our selves
    //add the local node's own state to the collected set: it also takes part in
    //the election and may itself be elected master
    assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
        .filter(n -> n.equals(localNode)).findAny().isPresent() == false;
    fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));
    // filter responses
    final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
    //activeMasters records the master nodes that already exist
    List<DiscoveryNode> activeMasters = new ArrayList<>();
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
        // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
        //if the responding node reports an existing master (as the English comment
        //above explains, this check avoids a node electing itself)
        if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
            activeMasters.add(pingResponse.master());
        }
    }
    // nodes discovered during pinging
    //masterCandidates records the nodes configured as master-eligible
    List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>();
    //add every responding node configured as master-eligible to the candidates
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        //note that isMasterNode does not mean the node is the master, only that
        //it is eligible to become master
        if (pingResponse.node().isMasterNode()) {
            masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
        }
    }
    //if the list of existing masters, activeMasters, is empty, elect a master
    //from the candidate list masterCandidates
    if (activeMasters.isEmpty()) {
        //check whether there are enough candidates
        if (electMaster.hasEnoughCandidates(masterCandidates)) {
            //run the election
            final ElectMasterService.MasterCandidate winner =
                electMaster.electMaster(masterCandidates);
            logger.trace("candidate {} won election", winner);
            return winner.getNode();
        } else {
            // if we don't have enough master nodes, we bail, because there are not enough master to elect from
            logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                masterCandidates, electMaster.minimumMasterNodes());
            return null;
        }
    } else {
        //if the list of existing masters, activeMasters, is not empty, choose
        //the master from it
        assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
        // lets tie break between discovered nodes
        return electMaster.tieBreakActiveMasters(activeMasters);
    }
}
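The two quorum-related numbers used above can be isolated into a small sketch: hasEnoughCandidates gates the election, and requiredJoins is how many incoming joins the winner must collect. The class below is a hypothetical stand-in for the corresponding checks in ElectMasterService, written from the behavior described in this article, not copied from the source.

```java
public class QuorumSketch {
    // corresponds to the discovery.zen.minimum_master_nodes setting
    private final int minimumMasterNodes;

    public QuorumSketch(int minimumMasterNodes) {
        this.minimumMasterNodes = minimumMasterNodes;
    }

    /** Is it safe to run an election with this many master-eligible candidates? */
    public boolean hasEnoughCandidates(int candidateCount) {
        if (minimumMasterNodes < 1) {
            return candidateCount > 0; // no quorum configured: any candidate will do
        }
        return candidateCount >= minimumMasterNodes;
    }

    /** The elected node counts itself, so it waits for one fewer join. */
    public int requiredJoins() {
        return Math.max(0, minimumMasterNodes - 1);
    }
}
```

With minimum_master_nodes set to 3, an election only proceeds once 3 master-eligible nodes have been discovered, and the winner then waits for 2 incoming joins.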
As the code above shows, when electing a master ElasticSearch first picks from the list of nodes that are already master, via electMaster.tieBreakActiveMasters; only when that list is empty does it pick from the candidate list, via electMaster.electMaster.
electMaster.electMaster and electMaster.tieBreakActiveMasters hold the actual election logic, which is quite simple: expanding the source shows that the election just sorts the nodes in the list by two rules:
- First compare whether each of the two nodes is configured as master-eligible; a master-eligible node wins.
- If that does not order the two nodes, compare their node IDs; the smaller ID wins.
So ZenDiscovery.innerJoinCluster defines the election and cluster-join logic: after findMaster elects a master, the elected node waits for the others to join it, while each other node, seeing it is not the elected master, sends a join request to the elected master.
electMaster.electMaster and electMaster.tieBreakActiveMasters both end up sorting the nodes in the list with the following comparator and taking the first node as master:
/** master nodes go before other nodes, with a secondary sort by id **/
//ElectMasterService
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
    //first check whether each node is configured as master-eligible
    if (o1.isMasterNode() && !o2.isMasterNode()) {
        return -1;
    }
    if (!o1.isMasterNode() && o2.isMasterNode()) {
        return 1;
    }
    //then compare node IDs
    return o1.getId().compareTo(o2.getId());
}
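The sort-and-take-first election can be demonstrated end to end with a self-contained sketch. Node here is a minimal stand-in for DiscoveryNode (only the two fields the comparator reads), and elect mirrors what electMaster does conceptually; none of these names are the real ElasticSearch types.

```java
import java.util.ArrayList;
import java.util.List;

public class ElectionSortSketch {
    // stand-in for DiscoveryNode: just an id and the master-eligible flag
    public static class Node {
        final String id;
        final boolean masterEligible;
        public Node(String id, boolean masterEligible) {
            this.id = id;
            this.masterEligible = masterEligible;
        }
    }

    // same two rules as ElectMasterService.compareNodes
    static int compareNodes(Node o1, Node o2) {
        if (o1.masterEligible && !o2.masterEligible) return -1;
        if (!o1.masterEligible && o2.masterEligible) return 1;
        return o1.id.compareTo(o2.id);
    }

    /** Sort the candidates and take the first: the election in miniature. */
    public static Node elect(List<Node> nodes) {
        List<Node> sorted = new ArrayList<>(nodes);
        sorted.sort(ElectionSortSketch::compareNodes);
        return sorted.get(0);
    }
}
```

Among the master-eligible nodes the one with the lexicographically smallest id wins, which is why every node that pings the same set of peers independently arrives at the same winner.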
3.1.3 Master election: waiting for other nodes to join
As ZenDiscovery.innerJoinCluster above shows, once a node is elected master, the winner and the losers behave differently: the elected master waits for a sufficient number of other nodes to join it, while the losing nodes send a DISCOVERY_JOIN_ACTION_NAME request to join the master. Let us look at each side in turn:
3.1.3.1 The elected master waits for other nodes to join
The elected master waiting for other nodes to join is essentially a voting round. Look again at the relevant part of ZenDiscovery.innerJoinCluster:
//ZenDiscovery
private void innerJoinCluster() {
    ...
    //elect the master with findMaster
    //if the local node was elected master
    if (transportService.getLocalNode().equals(masterNode)) {
        //the minimum number of nodes that must join us, derived from the
        //discovery.zen.minimum_master_nodes setting
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
        ...
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
            new NodeJoinController.ElectionCallback() {
                //elected as master: clear state, ready for the next election
                @Override
                public void onElectedAsMaster(ClusterState state) {
                    synchronized (stateMutex) {
                        joinThreadControl.markThreadAsDone(currentThread);
                    }
                }
                //this election failed: start the next round
                @Override
                public void onFailure(Throwable t) {
                    logger.trace("failed while waiting for nodes to join, rejoining", t);
                    synchronized (stateMutex) {
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    }
                }
            }
        );
    } else {
        //a node that lost this election first stops its own election
        // process any incoming joins (they will fail because we are not the master)
        nodeJoinController.stopElectionContext(masterNode + " elected");
        //send a join request to the newly elected master
        // send join request
        final boolean success = joinElectedMaster(masterNode);
        synchronized (stateMutex) {
            ... //some error handling
        }
    }
}
The elected master calls nodeJoinController.waitToBeElectedAsMaster to wait for other nodes to join it (that is, to vote). We will not expand its internals here: like other synchronous operations it keeps a counter of the joins still required, waits for other nodes to send DISCOVERY_JOIN_ACTION_NAME requests, and decrements the counter for each request received; when the counter reaches zero the minimum number of votes has been collected, and only then does the node truly become master.
3.1.3.2 Losing nodes join the master
This is the voting step itself: a losing node joins the master by calling joinElectedMaster:
private boolean joinElectedMaster(DiscoveryNode masterNode) {
    try {
        // first, make sure we can connect to the master
        //check whether the master elected in this round is reachable at all
        transportService.connectToNode(masterNode);
    } catch (Exception e) {
        logger.warn(() -> new ParameterizedMessage("failed to connect to master [{}], retrying...", masterNode), e);
        return false;
    }
    int joinAttempt = 0; // we retry on illegal state if the master is not yet ready
    //loop: on a retriable exception, the join request is sent again
    while (true) {
        try {
            logger.trace("joining master {}", masterNode);
            membership.sendJoinRequestBlocking(masterNode, transportService.getLocalNode(), joinTimeout);
            return true;
        } catch (Exception e) {
            final Throwable unwrap = ExceptionsHelper.unwrapCause(e);
            if (unwrap instanceof NotMasterException) {
                //count the failures; once the configured limit is reached, stop retrying
                if (++joinAttempt == this.joinRetryAttempts) {
                    logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                    return false;
                } else {
                    logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
                }
            } else {
                if (logger.isTraceEnabled()) {
                    logger.trace(() -> new ParameterizedMessage("failed to send join request to master [{}]", masterNode), e);
                } else {
                    logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(e));
                }
                return false;
            }
        }
        try {
            //after a failure, wait a while before retrying
            Thread.sleep(this.joinRetryDelay.millis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
As the code above shows, the node first tries to connect to the elected master; for how transportService.connectToNode works see the article "ElasticSearch communication based on Netty": it essentially clones the client Bootstrap template, registers handlers, and calls connect; success means the master is reachable.
Once connected, the node sends the DISCOVERY_JOIN_ACTION_NAME request. The retry count in the code above is configured via discovery.zen.join_retry_attempts, and the delay between retries via discovery.zen.join_retry_delay.
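The retry policy joinElectedMaster follows, retry a "master not ready" failure a bounded number of times with a fixed delay, abort on anything else, can be distilled into a generic sketch. The class and interface names are hypothetical, IllegalStateException stands in for NotMasterException, and the parameters mirror discovery.zen.join_retry_attempts and discovery.zen.join_retry_delay only in spirit.

```java
public class JoinRetrySketch {
    public interface Attempt {
        void run() throws Exception; // throws on failure, returns normally on success
    }

    public static boolean retry(Attempt attempt, int maxAttempts, long delayMillis) {
        int attempts = 0;
        while (true) {
            try {
                attempt.run();
                return true;            // join request acknowledged
            } catch (IllegalStateException retriable) { // stand-in for NotMasterException
                if (++attempts == maxAttempts) {
                    return false;       // give up after the configured number of attempts
                }
            } catch (Exception fatal) {
                return false;           // any other failure aborts immediately
            }
            try {
                Thread.sleep(delayMillis); // wait before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false; // simplification: the real code keeps looping
            }
        }
    }
}
```

Only the NotMasterException path is retried because it means the winner simply has not finished establishing itself as master yet; every other failure is treated as final and sends the node back to a fresh election.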
In MembershipAction you can see that the handler with which the elected master processes DISCOVERY_JOIN_ACTION_NAME requests is JoinRequestRequestHandler; its behavior was outlined in 3.1.3.1 and is not repeated here.
4 Rejoin while ElasticSearch is running
While ElasticSearch is running, the master may fail, and an API call can also force a new master election. Re-election at runtime is implemented by calling ZenDiscovery.rejoin:
protected void rejoin(String reason) {
    assert Thread.holdsLock(stateMutex);
    ClusterState clusterState = committedState.get();
    logger.warn("{}, current nodes: {}", reason, clusterState.nodes());
    //stop the fault-detection ping services
    nodesFD.stop();
    masterFD.stop(reason);
    // TODO: do we want to force a new thread if we actively removed the master? this is to give a full pinging cycle
    // before a decision is made.
    //start a new round of election
    joinThreadControl.startNewThreadIfNotRunning();
    if (clusterState.nodes().getMasterNodeId() != null) {
        // remove block if it already exists before adding new one
        assert clusterState.blocks().hasGlobalBlock(discoverySettings.getNoMasterBlock().id()) == false :
            "NO_MASTER_BLOCK should only be added by ZenDiscovery";
        ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks())
            .addGlobalBlock(discoverySettings.getNoMasterBlock())
            .build();
        DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build();
        clusterState = ClusterState.builder(clusterState)
            .blocks(clusterBlocks)
            .nodes(discoveryNodes)
            .build();
        committedState.set(clusterState);
        clusterApplier.onNewClusterState(reason, this::clusterState, (source, e) -> {}); // don't wait for state to be applied
    }
}
The call trace of rejoin gives a rough idea of when a new election is triggered:
[pic-rejoin call trace]
As the figure shows, an explicit rejoin request, loss of the master (master failure), a change in the minimum number of master nodes, an exception while publishing the cluster state, and similar events all trigger a rejoin.
5 Automatic cluster discovery
This is simply the join process described above.