ElasticJob选举实现

1、为什么需要选举?
   ElasticJob中,所有的节点服务器都是对等的,他们在整个集群当中的地位都是相同的。为了避免每个服务器节点获取的分片信息不统一,需要由一个主服务器来进行计算和下发分片信息,所以在集群中需要选举出一个主服务器,从服务器获取主服务器计算的分片信息。

2、ElasticJob选举流程实现?
   在ElasticJob中的选举实现主要是LeaderService类实现,在ElasticJob的启动流程中,会调用LeaderService#electLeader完成leader选举过程.

public final class LeaderService {
    /**
     * 选举主节点.
     */
    public void electLeader() {
        log.debug("Elect a new leader now.");
        //{namespace}/{jobname}/leader/election/latch
        jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
        log.debug("Leader election completed.");
    }
}

    在集群中所有的节点服务器执行到electLeader方法时,都会去竞争一个latch分布式锁,这个分布式锁是通过在zookeeper下生成一个{namespace}/{jobname}/leader/election/latch临时节点,谁创建生成节点成功,谁就获取到分布式锁。

/**
     * 在主节点执行操作.
     * 
     * @param latchNode 分布式锁使用的作业节点名称
     * @param callback 执行操作的回调
     */
    public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
            latch.start();
            latch.await();
            callback.execute();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            handleException(ex);
        }
    }

    集群中的所有节点都去竞争锁,然后调用await方法,没有竞争到的节点会阻塞等待,竞争到锁的节点,await方法直接返回,然后执行回调方法。

public final class LeaderService {

    /**
     * 判断是否已经有主节点.
     * 
     * @return 是否已经有主节点
     */
    public boolean hasLeader() {
        return jobNodeStorage.isJobNodeExisted(LeaderNode.INSTANCE);
    }

    class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
        
        @Override
        public void execute() {
            if (!hasLeader()) {
                jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            }
        }
    }
}

    在选举完成之后,获取到锁的节点和没有获取到锁的节点都会执行回调LeaderElectionExecutionCallback的execute方法:

  • !hasLeader:判断是否存在leader节点,判断zk下是否有{namespace}/{jobname}/leader/election/instance节点。
  • jobNodeStorage.fillEphemeralJobNode:如果没有leader节点,就在zk下填写临时节点{namespace}/{jobname}/leader/election/instance,值为当前服务器节点的instanceId实例id。

3、主节点宕机怎么办?
    在正常情况下一开始启动的时候就会选举出以主节点,在运行一段时间,突然主节点宕机了怎么办?在ElasticJob中也考虑到了这种情况,在启动的时候创建了一个选举监听器,监听主节点下线或手动关闭主节点的情况,主要是在ElectionListenerManager中。

public final class ElectionListenerManager extends AbstractListenerManager {
    
    @Override
    public void start() {
        addDataListener(new LeaderElectionJobListener());
        addDataListener(new LeaderAbdicationJobListener());
    }
}

    在ElasticJob启动过程中,开启了一些列的监听器,这里面就包含了一个选举事件监听器ElectionListenerManager。

class LeaderElectionJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(path, data) || isPassiveElection(path, eventType))) {
                leaderService.electLeader();
            }
        }
}

    选举时间监听器开启了对{namespace}/{jobname}/leader/election/instance节点的监听,在主服务器宕机的时候,对应的latch临时节点就会被删除,对应的ElectionListenerManager就会监听到。

  • JobRegistry.getInstance().isShutdown:判断当前节点的服务是否关闭。
  • isActiveElection(path, data):判断当前节点不是主节点,并且当前服务器运行正常,运行正常的依据是存在{namespace}/{jobname}/servers/server-ip,并且节点内容不为DISABLED
  • isPassiveElection(path, eventType):判断当前节点事件为主节点删除,并且当前节点对应的job实例{namespace}/{jobname}/instances/ip存在且不为DISABLED。

4、手动在后台设置主节点服务器为不可用状态?

class LeaderAbdicationJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            if (leaderService.isLeader() && isLocalServerDisabled(path, data)) {
                leaderService.removeLeader();
            }
        }
        
        private boolean isLocalServerDisabled(final String path, final String data) {
            return serverNode.isLocalServerPath(path) && ServerStatus.DISABLED.name().equals(data);
        }
    }

/**
     * 删除主节点供重新选举.
     */
    public void removeLeader() {
        jobNodeStorage.removeJobNodeIfExisted(LeaderNode.INSTANCE);
    }

    在ElasticJob启动的过程中,还有个主节点手动失效的节点监听器,在手动设置主节点服务器不可用的时候,该监听器能够监听到节点的变化,然后会删除主服务器节点({namespace}/{jobname}/leader/election/instance)。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容