1、为什么需要选举?
ElasticJob中,所有的节点服务器都是对等的,他们在整个集群当中的地位都是相同的。为了避免每个服务器节点获取的分片信息不统一,需要由一个主服务器来进行计算和下发分片信息,所以在集群中需要选举出一个主服务器,从服务器获取主服务器计算的分片信息。
2、ElasticJob选举流程实现?
在ElasticJob中的选举实现主要是LeaderService类实现,在ElasticJob的启动流程中,会调用LeaderService#electLeader完成leader选举过程.
public final class LeaderService {
/**
* 选举主节点.
*/
public void electLeader() {
log.debug("Elect a new leader now.");
//{namespace}/{jobname}/leader/election/latch
jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
log.debug("Leader election completed.");
}
}
在集群中所有的节点服务器执行到electLeader方法时,都会去竞争一个latch分布式锁,这个分布式锁是通过在zookeeper下生成一个{namespace}/{jobname}/leader/election/latch临时节点,谁创建生成节点成功,谁就获取到分布式锁。
/**
* 在主节点执行操作.
*
* @param latchNode 分布式锁使用的作业节点名称
* @param callback 执行操作的回调
*/
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
latch.start();
latch.await();
callback.execute();
//CHECKSTYLE:OFF
} catch (final Exception ex) {
//CHECKSTYLE:ON
handleException(ex);
}
}
集群中的所有节点都去竞争锁,然后调用await方法,没有竞争到的节点会阻塞等待,竞争到锁的节点,await方法直接返回,然后执行回调方法。
public final class LeaderService {
/**
* 判断是否已经有主节点.
*
* @return 是否已经有主节点
*/
public boolean hasLeader() {
return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
}
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
@Override
public void execute() {
if (!hasLeader()) {
jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
}
}
}
}
在选举完成之后,获取到锁的节点和没有获取到锁的节点都会执行回调LeaderElectionExecutionCallback的execute方法:
- !hasLeader:判断是否存在leader节点,判断zk下是否有{namespace}/{jobname}/leader/election/instance节点。
- jobNodeStorage.fillEphemeralJobNode:如果没有leader节点,就在zk下填写临时节点{namespace}/{jobname}/leader/election/instance,值为当前服务器节点的instanceId实例id。
3、主节点宕机怎么办?
在正常情况下一开始启动的时候就会选举出以主节点,在运行一段时间,突然主节点宕机了怎么办?在ElasticJob中也考虑到了这种情况,在启动的时候创建了一个选举监听器,监听主节点下线或手动关闭主节点的情况,主要是在ElectionListenerManager中。
public final class ElectionListenerManager extends AbstractListenerManager {
@Override
public void start() {
addDataListener(new LeaderElectionJobListener());
addDataListener(new LeaderAbdicationJobListener());
}
}
在ElasticJob启动过程中,开启了一些列的监听器,这里面就包含了一个选举事件监听器ElectionListenerManager。
class LeaderElectionJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
leaderService.electLeader();
}
}
}
选举时间监听器开启了对{namespace}/{jobname}/leader/election/instance节点的监听,在主服务器宕机的时候,对应的latch临时节点就会被删除,对应的ElectionListenerManager就会监听到。
- JobRegistry.getInstance().isShutdown:判断当前节点的服务是否关闭。
- isActiveElection(path, data):判断当前节点不是主节点,并且当前服务器运行正常,运行正常的依据是存在{namespace}/{jobname}/servers/server-ip,并且节点内容不为DISABLED
- isPassiveElection(path, eventType):判断当前节点事件为主节点删除,并且当前节点对应的job实例{namespace}/{jobname}/instances/ip存在且不为DISABLED。
4、手动在后台设置主节点服务器为不可用状态?
class LeaderAbdicationJobListener extends AbstractJobListener {
@Override
protected void dataChanged(final String path, final Type eventType, final String data) {
if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
leaderService.removeLeader();
}
}
private boolean isLocalServerDisabled(final String path, final String data) {
return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
}
}
/**
* 删除主节点供重新选举.
*/
public void removeLeader() {
jobNodeStorage.removeJobNodeIfExisted(LeaderNode.INSTANCE);
}
在ElasticJob启动的过程中,还有个主节点手动失效的节点监听器,在手动设置主节点服务器不可用的时候,该监听器能够监听到节点的变化,然后会删除主服务器节点({namespace}/{jobname}/leader/election/instance)。