Elastic job doAsfterJobExecutedAtLastCompleted多次执行以及完成超时 源码解析

主要涉及到两个类 AbstractDistributeOnceElasticJobListener , 以及guaranteeService
关键点还是在afterJobExecuted 方法这块
用* 表示 重要注释 // 在简书里面 很难看到

  /*
 * Copyright 1999-2015 dangdang.com.
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * </p>
 */

package io.elasticjob.lite.api.listener;

import io.elasticjob.lite.exception.JobSystemException;
import io.elasticjob.lite.executor.ShardingContexts;
import io.elasticjob.lite.internal.guarantee.GuaranteeService;
import io.elasticjob.lite.util.env.TimeService;
import lombok.Setter;

/**
 * 在分布式作业中只执行一次的监听器.
 * 
 * @author zhangliang
 */
public abstract class AbstractDistributeOnceElasticJobListener implements ElasticJobListener {
    
    private final long startedTimeoutMilliseconds;  * 任务开始超时时间 
    
    private final Object startedWait = new Object(); * 任务开始同步锁
    
    private final long completedTimeoutMilliseconds; * 任务结束超时时间
    
    private final Object completedWait = new Object(); * 任务结束同步锁
    
    @Setter
    private GuaranteeService guaranteeService; //保证分布式任务全部开始和结束状态的服务.
    
    private TimeService timeService = new TimeService(); // 时间服务
    
    public AbstractDistributeOnceElasticJobListener(final long startedTimeoutMilliseconds, final long completedTimeoutMilliseconds) {
        if (startedTimeoutMilliseconds <= 0L) {
            this.startedTimeoutMilliseconds = Long.MAX_VALUE;
        } else {
            this.startedTimeoutMilliseconds = startedTimeoutMilliseconds;
        }
        if (completedTimeoutMilliseconds <= 0L) {
            this.completedTimeoutMilliseconds = Long.MAX_VALUE; 
        } else {
            this.completedTimeoutMilliseconds = completedTimeoutMilliseconds;
        }
    }
    
    @Override
    public final void beforeJobExecuted(final ShardingContexts shardingContexts) {
        guaranteeService.registerStart(shardingContexts.getShardingItemParameters().keySet());
        if (guaranteeService.isAllStarted()) {
            doBeforeJobExecutedAtLastStarted(shardingContexts);
            guaranteeService.clearAllStartedInfo();
            return;
        }
        long before = timeService.getCurrentMillis();
        try {
            synchronized (startedWait) {
                startedWait.wait(startedTimeoutMilliseconds);
            }
        } catch (final InterruptedException ex) {
            Thread.interrupted();
        }
        if (timeService.getCurrentMillis() - before >= startedTimeoutMilliseconds) {
            guaranteeService.clearAllStartedInfo();
            handleTimeout(startedTimeoutMilliseconds);
        }
    }
    
 *我们可以看到 doAfterJobExecutedAtLastCompleted 调用者实际是 afterJobExecuted ,
    @Override
    public final void afterJobExecuted(final ShardingContexts shardingContexts) {
        *当有实例 完成任务的时候 ,  guaranteeService 在zk 上注册完成任务分片节点 ,节点path ,
        比如分片1,2,3  在/JOB_NAMESPACE/JOB_NAME/guarantee/completed创建 分片号1,2 ,3 的节点 ,节点为
        永久节点   
        guaranteeService.registerComplete(shardingContexts.getShardingItemParameters().keySet());  
       * 如果job的全部分片完成节点 ,都已在zk上注册  (这个地方造成了多次调用问题 , 不同实例可以同时满足这个条件)
        if (guaranteeService.isAllCompleted()) {
            * 调用doAfterJobExecutedAtLastCompleted 方法 
            doAfterJobExecutedAtLastCompleted(shardingContexts);
           * 清理完成节点  ,直接 clear zk   guarantee 路径下的节点 
            guaranteeService.clearAllCompletedInfo();
            return;
        }
       * 如果没有全部完成 ,则当前实例 需要等待 完成超时时间
        long before = timeService.getCurrentMillis();
        try {
            synchronized (completedWait) {
                * 等待 完成超时时间
                completedWait.wait(completedTimeoutMilliseconds);
            }
        } catch (final InterruptedException ex) {
            * 等待超时 中断
            Thread.interrupted();
        }
        * 等待已经超时
        if (timeService.getCurrentMillis() - before >= completedTimeoutMilliseconds) {
            * clear 掉  zk   guarantee 路径下的节点  ,这里是问题2的原因
            guaranteeService.clearAllCompletedInfo();
            * 处理超时异常 ,抛出JobSystemException
            handleTimeout(completedTimeoutMilliseconds);
        }
    }
    
    private void handleTimeout(final long timeoutMilliseconds) {
        throw new JobSystemException("Job timeout. timeout mills is %s.", timeoutMilliseconds);
    }
    
    /**
     * 分布式环境中最后一个作业执行前的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */
    public abstract void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts);
    
    /**
     * 分布式环境中最后一个作业执行后的执行的方法.
     *
     * @param shardingContexts 分片上下文
     */
    public abstract void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts);
    
    /**
     * 通知任务开始.
     */
    public void notifyWaitingTaskStart() {
        synchronized (startedWait) {
            startedWait.notifyAll();
        }
    }
    
    /**
     * 通知任务结束.
     */
    public void notifyWaitingTaskComplete() {
        synchronized (completedWait) {
            completedWait.notifyAll();
        }
    }
}

问题 1 ,在源码中 可以 很简单看到原因 ,多实例 可以同时执行doAfterJobExecutedAtLastCompleted
问题 2 ,之前提过 超时后, doAfterJobExecutedAtLastCompleted 会不会执行的问题 ,大多数情况 是不会执行的 ,但是已经超时 但并未清除zk完成节点时候 , guaranteeService.isAllCompleted() 会返回true

问题 1的解决方案
1 doAfterJobExecutedAtLastCompleted 方法 幂等
2 doAfterJobExecutedAtLastCompleted 业务方法里面 实现 分布式锁的机制 ,谁先获取锁 ,谁执行
3 源码中 afterJobExecuted 方法里面 实现 分布式锁的机制 ,谁先获取锁 ,谁执行

问题 2的解决方案
1 completedTimeoutMilliseconds 时间 设计 必须要长于 job 完成 实际时间 , 这点 需要注意的 , 如果设置小了 ,job 不可能完成

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 32,109评论 2 89
  • 本文是我自己在秋招复习时的读书笔记,整理的知识点,也是为了防止忘记,尊重劳动成果,转载注明出处哦!如果你也喜欢,那...
    波波波先森阅读 13,872评论 4 56
  • Java继承关系初始化顺序 父类的静态变量-->父类的静态代码块-->子类的静态变量-->子类的静态代码快-->父...
    第六象限阅读 6,447评论 0 9
  • 实现思路 依然是用到UIScrollView,不同的是这次运用重用的思想,因为不管我们有多少张图片要轮播,在UIS...
    Salad可乐阅读 7,276评论 11 1
  • 会不会有这样一个人,无关乎爱情只是希望他过得好。 会不会有这样一个人,你对他的好不求回报。 会不会有这样一个人,你...
    俊白阅读 1,729评论 0 2