ElasticJob故障转移机制

    在ElasticJob中,会把一个任务分成多个分片,然后再把分片分配给集群中不同的节点实例进行作业任务的执行。但是如果集群中的某几台机器宕机,这些分片任务的执行就需要转移到其它正常节点机器进行继续执行分片任务,这就是任务分片的故障转移。在ElasticJob中有对应的节点故障转移的功能,我们在任务配置的时候配置failover参数即可。下面看下故障转移功能在ElasticJob中的实现流程:

FailoverListenerManager:这是故障转移功能开始的地方
@Override
    public void start() {
        addDataListener(new JobCrashedJobListener());
        addDataListener(new FailoverSettingsChangedJobListener());
    }

     FailoverListenerManager是故障转移监听管理器,在ElasticJob启动的时候,他启动了2个监听器JobCrashedJobListener(集群节点宕机监听器)和FailoverSettingsChangedJobListener(故障转移修改监听器),在集群中的节点发生变化的时候能立即监听到,进而能及时做故障转移操作。

class JobCrashedJobListener extends AbstractJobListener {
        
        @Override
        protected void dataChanged(final String path, final Type eventType, final String data) {
            //TODO 当前job运行实例没有关闭、并且config配置开启了故障转移功能、并且是节点删除事件、并且path路径是{jobName}/instances开头的
            if (!JobRegistry.getInstance().isShutdown(jobName) && isFailoverEnabled() && Type.NODE_DELETED == eventType && instanceNode.isInstancePath(path)) {
                //TODO 获取当前被删除的job节点实例id
                String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
                //TODO 被删除节点实例id是否为当前节点
                if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                    return;
                }
                //TODO 获取故障转移到当前节点的分片信息,初始肯定为空
                List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
                if (!failoverItems.isEmpty()) {
                    for (int each : failoverItems) {
                        failoverService.setCrashedFailoverFlag(each);
                        failoverService.failoverIfNecessary();
                    }
                } else {
                    //TODO 获取当前删除节点对应的初始分片信息
                    for (int each : shardingService.getShardingItems(jobInstanceId)) {
                        //TODO 设置故障转移标记,其实就是创建了/leader/failover/items/{item}持久节点
                        failoverService.setCrashedFailoverFlag(each);
                        //TODO 进行故障转移
                        failoverService.failoverIfNecessary();
                    }
                }
            }
        }
    }
  • 通过zookeeper的watcher机制,ElasticJob能够感知监听到作业下集群节点删除事件。
  • 首先判断当前job作业实例没有关闭,并且开启了故障转移功能,并且当前是节点删除事件。
  • 获取被删除的job节点实例id,如果当前节点是被删除节点实例,忽略本次故障转移处理。
  • 获取节点的所有故障转移分片信息,这里是先获取该作业下的所有/{namespace}/sharding/子节点信息,然后遍历所有分片,获取已经发生故障转移的分片信息(就是获取/sharding/{item}/failover节点信息),然后比对该节点对应的作业实例是否为当前作业实例。
//TODO 获取故障转移到当前节点的分片信息,初始肯定为空
List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);

public List<Integer> getFailoverItems(final String jobInstanceId) {
        //TODO 获取当前{nameSpace}/sharding分片信息
        List<String> items = jobNodeStorage.getJobNodeChildrenKeys(ShardingNode.ROOT);
        List<Integer> result = new ArrayList<>(items.size());
        for (String each : items) {
            int item = Integer.parseInt(each);
            //TODO node=[sharding/{item}/failover]
            String node = FailoverNode.getExecutionFailoverNode(item);
            //TODO 查看node节点是否存在,并且job实例id是否为转移到当前节点
            if (jobNodeStorage.isJobNodeExisted(node) && jobInstanceId.equals(jobNodeStorage.getJobNodeDataDirectly(node))) {
                result.add(item);
            }
        }
        //TODO 分片做下排序
        Collections.sort(result);
        return result;
    }
  • 初次执行故障转移的话,该节点对应的故障转移分片信息肯定为空。然后就是获取分配该故障删除节点的所有分片信息,一个个遍历执行故障转移操作
//TODO 获取当前删除节点对应的初始分片信息
for (int each : shardingService.getShardingItems(jobInstanceId)) {
    //TODO 设置故障转移标记,其实就是创建了/leader/failover/items/{item}持久节点
    failoverService.setCrashedFailoverFlag(each);
    //TODO 进行故障转移
    failoverService.failoverIfNecessary();
}
  • 遍历一个故障节点分片信息,就把他设置为需要故障转移分片,其实就是在zookeeper下面创建一个/leader/failover/items/{item}节点,表示当前分片需要故障转移。在创建该节点前需要先判断当前节点是否已经执行了故障转移(判断/sharding/{item}/failover是否存在)
//TODO 设置故障转移标记,其实就是创建了/leader/failover/items/{item}持久节点
failoverService.setCrashedFailoverFlag(each);

public void setCrashedFailoverFlag(final int item) {
        //TODO 查看当前分片是否执行了故障转移 /sharding/{item}/failover
        if (!isFailoverAssigned(item)) {
            //TODO 创建需要故障转移节点标记 /leader/failover/items/{item}
            jobNodeStorage.createJobNodeIfNeeded(FailoverNode.getItemsNode(item));
        }
    }

private boolean isFailoverAssigned(final Integer item) {
        //TODO 判断/sharding/{item}/failover节点是否存在
        return jobNodeStorage.isJobNodeExisted(FailoverNode.getExecutionFailoverNode(item));
    }
  • 执行当前遍历节点的故障转移操作。
public void failoverIfNecessary() {
        //TODO 是否进行故障转移 /leader/failover/items节点存在并且下面存在子节点信息、并且当前job非运行状态
        if (needFailover()) {
            //TODO 获取故障转移分布式锁/leader/failover/latch,在主节点中进行故障转移
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }

class FailoverLeaderExecutionCallback implements LeaderExecutionCallback {
        
        @Override
        public void execute() {
            if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) {
                return;
            }
            //TODO 获取节点/leader/failover/items/下的一个子节点信息
            int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
            log.debug("Failover job '{}' begin, crashed item '{}'", jobName, crashedItem);
            //TODO 创建临时节点/sharding/{item}/failover ,value为jobInstanceId
            jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            //TODO 删除/leader/failover/items/{item}节点
            jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
            // TODO Instead of using triggerJob, use executor for unified scheduling
            JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
            if (null != jobScheduleController) {
                //TODO 触发下任务调度
                jobScheduleController.triggerJob();
            }
        }
    }

1、首先判断当前分片是否能进行故障转移。判断/leader/failover/items节点存在并且存在子节点信息,并且当前job非运行状态。这里解释了一个疑惑,就是为什么作业执行完之后需要在手动调用一下故障转移操作,就是因为在前一次执行故障转移的时候有分片任务正在执行,导致故障转移操作没有执行,所有在分片任务执行完之后再手动执行下故障转移。
2、获取故障转移分布式锁/leader/failover/latch,获取锁的节点即可往下执行当前分片的故障转移操作。
3、执行故障转移操作是在一个zookeeper事务中执行,它通过LeaderExecutionCallback回调完成故障转移操作,在这个事务中,所有的操作要么都执行要么都不执行。
4、获取节点/leader/failover/items/下的一个子节点信息
5、创建临时节点/sharding/{item}/failover,value为jobInstanceId
6、删除/leader/failover/items/{item}节点
7、手动触发一下任务的调度,防止下次下次周期任务没有及时到来,没有即时执行故障节点的任务执行。

以上是执行了故障节点的分片转移,但是对应的故障分片的任务没有被执行,这个是在任务调度执行的时候触发。
    /**
     * 执行作业.
     */
    public final void execute() {
        //...

        ShardingContexts shardingContexts = jobFacade.getShardingContexts();
        
        //...
    }

    @Override
    public ShardingContexts getShardingContexts() {
        //...
        boolean isFailover = configService.load(true).isFailover();
        if (isFailover) {
            //获取分配给当前作业实例的故障转移分片,遍历作业的所有分片信息,获取/sharding/{item}/failover
            List<Integer> failoverShardingItems = failoverService.getLocalFailoverItems();
            if (!failoverShardingItems.isEmpty()) {
                return executionContextService.getJobShardingContext(failoverShardingItems);
            }
        }
    }

    在作业执行流程中获取分片信息的时候,如果开启了故障转移,本次作业的执行,会去优先执行故障转移到当前节点的分片任务。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • ElasticJob的幂等机制,是指作业分片执行的幂等,他需要做到以下两点: 同一个分片在当前作业实例上不会被重复...
    圣村的希望阅读 6,251评论 0 0
  • ElasticJob是一个弹性的分布式任务调度框架,这里的分布式就是采用分片的来进行任务调度和业务执行的解耦,分片...
    圣村的希望阅读 10,195评论 0 0
  • 故障转移 Redis集群自身实现了高可用。高可用首先需要解决集群部分失败的场景:当集群内少量节点出现故障时通过自动...
    linuxzw阅读 3,785评论 0 2
  • 1. 故障发现 当集群内某个节点出现问题时,需要通过一种健壮的方式保证识别出节点是否发生了故障。Redis集群内节...
    CoderJed阅读 7,967评论 0 3
  • 在上篇文章中docker-compose搭建redis-sentinel成功的搭建了1主2从3哨兵。 Sentin...
    gaobinzhan阅读 4,659评论 0 0