分布式定时任务elastic-job(二)

目录

目录.png

分布式定时任务系列


执行

执行器的创建

AbstractElasticJobExecutor各个属性
AbstractElasticJobExecutor各个属性.png
  • JobFacade: 作业门面对象
  • JobRootConfiguration: 作业配置
  • ExecutorService: 执行线程池
  • JobExceptionHandler: 异常处理器
构造函数
  • 构造函数调用
protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
    this.jobFacade = jobFacade;
    // 加载作业配置
    jobRootConfig = jobFacade.loadJobRootConfiguration(true);
    jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
    // 获取线程池
    executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
    // 获取异常处理器
    jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
    // 分片错误信息集合设置
    itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
}
  • 拿simple job举例是通过SimpleJobExecutor初始化父类AbstractElasticJobExecutor
public final class SimpleJobExecutor extends AbstractElasticJobExecutor {

    /**
     * simple job实现
     */
    private final SimpleJob simpleJob;
    
    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.simpleJob = simpleJob;
    }
}
构造函数中获取线程池
  • 当某台服务承担多个分片时,需要多线程执行加快速度,再加上任务隔离性,一个任务对应一个线程池。
// AbstractElasticJobExecutor
protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
    // 省略部分代码
    // 获取线程池
    executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));
}

// AbstractElasticJobExecutor.getHandler
private Object getHandler(final JobProperties.JobPropertiesEnum jobPropertiesEnum) {
    // 根据配置的实现ExecutorServiceHandler子类的类,反射获取对象,这样实现了SPI自定义线程池
    String handlerClassName = jobRootConfig.getTypeConfig().getCoreConfig().getJobProperties().get(jobPropertiesEnum);
    try {
        Class<?> handlerClass = Class.forName(handlerClassName);
        if (jobPropertiesEnum.getClassType().isAssignableFrom(handlerClass)) {
            return handlerClass.newInstance();
        }
        return getDefaultHandler(jobPropertiesEnum, handlerClassName);
    } catch (final ReflectiveOperationException ex) {
        return getDefaultHandler(jobPropertiesEnum, handlerClassName);
    }
}

// ExecutorServiceHandlerRegistry
// Map<String, ExecutorService> REGISTRY = new HashMap<>()
public static synchronized ExecutorService getExecutorServiceHandler(final String jobName, final ExecutorServiceHandler executorServiceHandler) {
    // 缓存不存在则创建线程池,key是jobName
    if (!REGISTRY.containsKey(jobName)) 
        REGISTRY.put(jobName, executorServiceHandler.createExecutorService(jobName));
    }
    return REGISTRY.get(jobName);
}

// ExecutorServiceHandler 是个接口, 支持自定义线程池
ExecutorService createExecutorService(final String jobName);
  • 构造函数中getHandler是实现可以自定义线程池, 根据配置的实现ExecutorServiceHandler子类的类,反射获取对象,这样实现了SPI自定义线程池
  • 自定义线程池和自定义异常处理器逻辑都是一样的
private LiteJobConfiguration getLiteJobConfiguration(Class<? extends SimpleJob> jobClass, String cron,
                                                         int shardingTotalCount, String shardingItemParameters, String jobParameter, boolean failover,
                                                         boolean monitorExecution, int monitorPort, int maxTimeDiffSeconds, String jobShardingStrategyClass) {

    //定义作业核心配置
    JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration
            .newBuilder(jobClass.getName(), cron, shardingTotalCount)
            .misfire(true)
            .failover(failover)
            .jobParameter(jobParameter)
            .shardingItemParameters(shardingItemParameters)
            // 自定义异常处理器,也可以自定义线程池
            .jobProperties("job_exception_handler", "com.seeger.demo.config.MyJobExceptionHandler")
            .build();

    // 省略其他代码

    return liteJobConfiguration;
}
  • 如果没有自定义线程池则使用DefaultExecutorServiceHandler创建线程池, DefaultExecutorServiceHandler创建的线程池使用了guava提供的方法可以再jvm退出时增加钩子等120秒再优雅退出,kill -15类似这种。这种方法会把线程池的线程设置成守护线程,如果遇到需要发mq, 操作数据库,不太适合,因为这jvm关闭spring也关了,发mq也会需要spring(因为一般使用mq都是结合spring来的),这会导致发送失败。
// DefaultExecutorServiceHandler
public final class DefaultExecutorServiceHandler implements ExecutorServiceHandler { 
    @Override
    public ExecutorService createExecutorService(final String jobName) {
        return new ExecutorServiceObject("inner-job-" + jobName, Runtime.getRuntime().availableProcessors() * 2).createExecutorService();
    }
}

// ExecutorServiceObject
public ExecutorService createExecutorService() {
    // 使用guava提供的方法将线程池自定增加jvm退出钩子
    return MoreExecutors.listeningDecorator(MoreExecutors.getExitingExecutorService(threadPoolExecutor));
}
构造函数中获取异常处理器
  • 实现思路跟获取线程池很像,也是可以自定义的SPI接口,也有默认实现
public final class DefaultJobExceptionHandler implements JobExceptionHandler {
    
    @Override
    public void handleException(final String jobName, final Throwable cause) {
        log.error(String.format("Job '%s' exception occur in job processing", jobName), cause);
    }
}

执行器的执行

  • 时序图


    时序图.png
1.1 检查作业环境
  • 源码
// AbstractElasticJobExecutor
public final void execute() {
    try {
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobExceptionHandler.handleException(jobName, cause);
    }
    // 省略部分代码
}

// LiteJobFacade
public void checkJobExecutionEnvironment() throws JobExecutionEnvironmentException {
    configService.checkMaxTimeDiffSecondsTolerable();
}
// ConfigService
public void checkMaxTimeDiffSecondsTolerable() throws JobExecutionEnvironmentException {
    // 检查本机与注册中心的时间误差秒数是否在允许范围, 可通过插入测试数据获取zk的更新时间判断zk那边的系统时间
    int maxTimeDiffSeconds = load(true).getMaxTimeDiffSeconds();
    if (-1  == maxTimeDiffSeconds) {
        return;
    }
    long timeDiff = Math.abs(timeService.getCurrentMillis() - jobNodeStorage.getRegistryCenterTime());
    if (timeDiff > maxTimeDiffSeconds * 1000L) {
        throw new JobExecutionEnvironmentException(
                "Time different between job server and register center exceed '%s' seconds, max time different is '%s' seconds.", timeDiff / 1000, maxTimeDiffSeconds);
    }
}
1.2 获取当前作业上下文
  • 在分片章节分析
1.3,1.5 发布作业状态追踪事件
  • 在事件追踪章节分析
1.4 跳过正在运行中的被错过执行的作业
  • 这里是防御性编程,防止正在执行的但是属于错过执行的任务重复执行。因为错过任务是在1.9清除错过标记之后(如果1.8判断需要执行错过方法),调用executor方法执行错过任务(1.10),1.7会有executor方法调用这个是执行正常任务。这也就是说,错过任务是在正常任务执行完,马上执行,而不像失效转移任务不是在正常任务执行完,下次任务执行失效转移任务。
  • 分配的作业分片项里存在任意一个分片正在运行中,则设置分片项被错过执行,不去执行这些任务,因为这时候如果不跳过,可能导致同时运行某个作业分片。依赖monitorExecution为true记录下有分片在执行。
// LiteJobFacade.java
@Override
public boolean misfireIfRunning(final Collection<Integer> shardingItems) {
   return executionService.misfireIfHasRunningItems(shardingItems);
}

// ExecutionService.java
public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
   if (!hasRunningItems(items)) {
       return false;
   }
   setMisfire(items);
   return true;
}

public boolean hasRunningItems(final Collection<Integer> items) {
   LiteJobConfiguration jobConfig = configService.load(true);
   if (null == jobConfig || !jobConfig.isMonitorExecution()) {
       return false;
   }
   for (int each : items) {
       if (jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(each))) {
           return true;
       }
   }
   return false;
}
1.6 作业执行前的方法
  • 监听器章节分析, 源码
public void beforeJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
        each.beforeJobExecuted(shardingContexts);
    }
}
执行正常任务
  • 1.7执行正常任务executor方法
// 错过执行和正常任务执行还有失效转移执行都是调用此方法,ExecutionSource 不同而已
// AbstractElasticJobExecutor.java
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
   // 分片数据为空,则发布事件
   if (shardingContexts.getShardingItemParameters().isEmpty()) {
       if (shardingContexts.isAllowSendJobEvent()) {
           jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
       }
       return;
   }
   // 注册作业启动信息
   jobFacade.registerJobBegin(shardingContexts);
   // 发布作业状态追踪事件
   String taskId = shardingContexts.getTaskId();
   if (shardingContexts.isAllowSendJobEvent()) {
       jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
   }
   try {
       // 核心处理
       process(shardingContexts, executionSource);
   } finally {
       // 注册作业完成信息
       jobFacade.registerJobCompleted(shardingContexts);
       // 根据是否有异常,发布作业状态追踪事件
       if (itemErrorMessages.isEmpty()) {
           if (shardingContexts.isAllowSendJobEvent()) {
               jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
           }
       } else {
           if (shardingContexts.isAllowSendJobEvent()) {
               jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
           }
       }
   }
}
  • 注册作业启动信息
// LiteJobFacade
public void registerJobBegin(final ShardingContexts shardingContexts) {
   executionService.registerJobBegin(shardingContexts);
}

// ExecutionService
public void registerJobBegin(final ShardingContexts shardingContexts) {
    // 告诉JobRegistry当前jobName下的任务正在执行
    JobRegistry.getInstance().setJobRunning(jobName, true);
    if (!configService.load(true).isMonitorExecution()) {
        return;
    }
    // 设置监控作业运行时状态, 才记录运行状态
    for (int each : shardingContexts.getShardingItemParameters().keySet()) {
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), "");
    }
}
  • 注册作业完成信息, 思路跟注册作业启动信息相反,就是移除正在执行的标记,如果开启了失效转移了,调用FailoverService.updateFailoverComplete(), 这个在失效转移章节分析
  • 核心process方法, 但分片则单线程执行,多个分片则多线程执行
// AbstractElasticJobExecutor
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
    Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
    // 如果是单个分片则直接执行
    if (1 == items.size()) {
        int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
        JobExecutionEvent jobExecutionEvent =  new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
        //执行单个任务
        process(shardingContexts, item, jobExecutionEvent);
        return;
    }
    // 多个分片,则多线程执行
    final CountDownLatch latch = new CountDownLatch(items.size());
    for (final int each : items) {
        final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
        if (executorService.isShutdown()) {
            return;
        }
        executorService.submit(new Runnable() {
            
            @Override
            public void run() {
                try {
                    //执行单个任务
                    process(shardingContexts, each, jobExecutionEvent);
                } finally {
                    latch.countDown();
                }
            }
        });
    }
    try {
        latch.await();
    } catch (final InterruptedException ex) {
        Thread.currentThread().interrupt();
    }
}
// AbstractElasticJobExecutor
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
    // 发布单个事件开始
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobExecutionEvent(startEvent);
    }
    log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
    JobExecutionEvent completeEvent;
    try {
        // 子类实现,抽象方法
        process(new ShardingContext(shardingContexts, item));
        completeEvent = startEvent.executionSuccess();
        log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobExecutionEvent(completeEvent);
        }
        // CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //发布单个事件结束
        completeEvent = startEvent.executionFailure(cause);
        jobFacade.postJobExecutionEvent(completeEvent);
        itemErrorMessages.put(item, ExceptionUtil.transform(cause));
        jobExceptionHandler.handleException(jobName, cause);
    }
}
// 拿SimpleJobExecutor举例
protected void process(final ShardingContext shardingContext) {
    // 调用自己实现的job类 
    simpleJob.execute(shardingContext);
}
执行错过转移任务
  • 1.10执行错过任务executor方法, 如果1.8判断需要错过转移
  • 标记错过执行
// JobScheduler.java
private Scheduler createScheduler() {
   Scheduler result;
   // 省略部分代码
   result.getListenerManager().addTriggerListener(schedulerFacade.newJobTriggerListener());
   return result;
}

private Properties getBaseQuartzProperties() {
   // 省略部分代码
   // 设置最大允许超过 1 毫秒,作业分片项即被视为错过执行
   result.put("org.quartz.jobStore.misfireThreshold", "1");
   return result;
}

// JobScheduleController.class
private CronTrigger createTrigger(final String cron) {
   return TriggerBuilder.newTrigger()
           .withIdentity(triggerIdentity)
           .withSchedule(CronScheduleBuilder.cronSchedule(cron)
            // quartz遇到错过执行时不进行处理交给elastic-job
           .withMisfireHandlingInstructionDoNothing())
           .build();
}
 
// TriggerListener 监听处理
// JobTriggerListener.java
public final class JobTriggerListener extends TriggerListenerSupport {
    
    @Override
    public void triggerMisfired(final Trigger trigger) {
        if (null != trigger.getPreviousFireTime()) {
            executionService.setMisfire(shardingService.getLocalShardingItems());
        }
    }
}

// ExecutionService设置错过执行标记
public void setMisfire(final Collection<Integer> items) {
   for (int each : items) {
       jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
   }
}
  • 执行,判断是用内存数据判断,更新数据是zk异步更新,由于网络原因可能,内存数据没更新,是防御编程,保证内存缓存的数据已经更新,这时候业务方需要保证幂等,因为可能重复调用。
public final void execute() {
   // .... 省略部分代码
   // 判断是否执行错误执行任务, 从时序图1.7开始
   while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
       // 清除错过执行标识
       jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
       // 执行错过任务,跟前面执行正常任务逻辑一样
       execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
   }
}
执行失效转移1.11
  • 失效转移章节分析
作业执行后方法1.12
  • 监听器章节分析, 源码
// LiteJobFacade
public void afterJobExecuted(final ShardingContexts shardingContexts) {
    for (ElasticJobListener each : elasticJobListeners) {
        each.afterJobExecuted(shardingContexts);
    }
}

注册中心

JobNodeStorage

  • JobNodeStorage: 作业节点数据访问类
  • zk注册中心类图, RegistryCenter接口定义了简单的增删改查注册数据和查询时间的接口,CoordinatorRegistryCenter协调分布式服务注册中心,体用持久节点、临时节点、持久顺序节点、临时顺序节点等,ZookeeperRegistryCenter是zk实现的注册中心


    image.png
  • JobeNodeStorage依赖RegistryCenter,各个service是依赖JobeNodeStorage而不直接依赖RegistryCenter


    JobeNodeStorage.png
  • ZookeeperRegistryCenter主要是用curator框架操作zk,有兴趣可以深入一下
  • 几个比较有意思的方法举例说明下:
获取注册中心当前时间
  • 更新一下数据,然后根据更新数据返回的更新时间,获得注册中心时间。
//ZookeeperRegistryCenter
public long getRegistryCenterTime(final String key) {
   long result = 0L;
   try {
       persist(key, "");
       result = client.checkExists().forPath(key).getMtime();
   } catch (final Exception ex) {
       RegExceptionHandler.handleException(ex);
   }
   Preconditions.checkState(0L != result, "Cannot get registry center time.");
   return result;
}

在主节点执行操作,分布式锁

  • Curator用zk实现了两种分布式锁,LeaderLatch为其中一种,start后await等待拿到锁,多线程调用时其他线程会等待,当前线程释放锁时,其他线程会获取锁,try with resource结束后会调用close方法,LeaderLatch 实现了closable接口,具体使用在选主章节介绍。还有一种分布式锁有兴趣的可以看看
// JobeNodeStorage 分布式锁,并提供回调机制
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        // zk节点创建LeaderLatch getClinet依赖ZookeeperRegistryCenter
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        latch.await();
        callback.execute();
    } catch (final Exception ex) {
        handleException(ex);
    }
}

在事务中执行操作

  • 开启事务,并执行TransactionExecutionCallback 回调
// JobeNodeStorage
public void executeInTransaction(final TransactionExecutionCallback callback) {
    try {
        CuratorTransactionFinal curatorTransactionFinal = getClient().inTransaction().check().forPath("/").and();
        callback.execute(curatorTransactionFinal);
        curatorTransactionFinal.commit();
    } catch (final Exception ex) {
        RegExceptionHandler.handleException(ex);
    }
}

监听器

  • ListenerManager管理多个监听器


    ListenerManager.png
  • field中各个manage就是实现各个监听功能

注册中心监听器

  • ListenerManager中RegistryCenterConnectionStateListener,实现 Curator ConnectionStateListener 接口,监听注册中心连接状态


    RegistryCenterConnectionStateListener.png
  • 监听状态变更
public void stateChanged(final CuratorFramework client, final ConnectionState newState) {
    if (JobRegistry.getInstance().isShutdown(jobName)) {
        return;
    }
    JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
    // 连接中断 或者连接丢失
    if (ConnectionState.SUSPENDED == newState || ConnectionState.LOST == newState) {
        // 则暂停job
        jobScheduleController.pauseJob();
    } else if (ConnectionState.RECONNECTED == newState) {
        // 持久化作业服务器上线信息
        serverService.persistOnline(serverService.isEnableServer(JobRegistry.getInstance().getJobInstance(jobName).getIp()));
        // 持久化作业运行实例上线相关信息
        instanceService.persistOnline();
        // 清除本地分配的作业分片项运行中的标记
        executionService.clearRunningInfo(shardingService.getLocalShardingItems());
        // 恢复作业调度
        jobScheduleController.resumeJob();
    }
}

// JobScheduleController
public synchronized void pauseJob() {
    try {
        if (!scheduler.isShutdown()) {
            scheduler.pauseAll();
        }
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
}

public synchronized void resumeJob() {
    try {
        if (!scheduler.isShutdown()) {
            scheduler.resumeAll();
        }
    } catch (final SchedulerException ex) {
        throw new JobSystemException(ex);
    }
}

分片监听

  • ListenerManager中ShardingListenerManager是分片监听管理
  • 在作业初始化时会开启所有监听器,
// SchedulerFacade.java
public void registerStartUpInfo(final boolean enabled) {
   // 开启所有监听器
   listenerManager.startAllListeners();
   // 省略部分方法
}
// SchedulerFacade.java
public void registerStartUpInfo(final boolean enabled) {
   // 开启 所有监听器
   listenerManager.startAllListeners();
   // 省略部分方法
}
// 开启 所有监听器
public void startAllListeners() {
    electionListenerManager.start();
    shardingListenerManager.start();
    failoverListenerManager.start();
    monitorExecutionListenerManager.start();
    shutdownListenerManager.start();
    triggerListenerManager.start();
    rescheduleListenerManager.start();
    guaranteeListenerManager.start();

  • 调父类的方法AbstractListenerManager中对应方法


    AbstractListenerManager.png
// AbstractListenerManager
//开启监听器
public abstract void start();
// 添加注册中心监听器到注册中心TreeCache
protected void addDataListener(final TreeCacheListener listener) {
    jobNodeStorage.addDataListener(listener);
    jobNodeStorage.addConnectionStateListener(regCenterConnectionStateListener);
}
  • ShardingListenerManager中start方法
public void start() {
    addDataListener(new ShardingTotalCountChangedJobListener());
    addDataListener(new ListenServersChangedJobListener());
}
  • AbstractJobListener作业注册中心的监听器抽象类实现了curator中TreeCacheListener
@Override
public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
    // 忽略掉非数据变化的事件
    ChildData childData = event.getData();
    if (null == childData) {
        return;
    }
    String path = childData.getPath();
    if (path.isEmpty()) {
        return;
    }
    dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
}
// 节点变化交给子类实现
protected abstract void dataChanged(final String path, final Type eventType, final String data);
  • 拿ShardingListenerManager举例,监听到变化时设置重新分片标识
// ShardingListenerManager
class ListenServersChangedJobListener extends AbstractJobListener {
    
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
            shardingService.setReshardingFlag();
        }
    }
    
    private boolean isInstanceChange(final Type eventType, final String path) {
        return instanceNode.isInstancePath(path) && Type.NODE_UPDATED != eventType;
    }
    
    private boolean isServerChange(final String path) {
        return serverNode.isServerPath(path);
    }
}

作业监听器

  • ElasticJobListener作业监听器
public interface ElasticJobListener {
    // 作业执行前的执行的方法.
    void beforeJobExecuted(final ShardingContexts shardingContexts);
    // 作业执行后的执行的方法.
    void afterJobExecuted(final ShardingContexts shardingContexts);
}
  • 在没传执行任务时会调用
// AbstractElasticJobExecutor.java
public final void execute() {
   // 省略代码
   
   // 执行 作业执行前的方法
   try {
       jobFacade.beforeJobExecuted(shardingContexts);
   } catch (final Throwable cause) {
       jobExceptionHandler.handleException(jobName, cause);
   }
   // 省略代码
   // 执行 作业执行后的方法
   try {
       jobFacade.afterJobExecuted(shardingContexts);
   } catch (final Throwable cause) {
       jobExceptionHandler.handleException(jobName, cause);
   }
}
  • 自定义实现可以在配置中设置
@Bean(initMethod = "init")
public JobScheduler simpleJobScheduler(// 省略) {

    return new SpringJobScheduler(simpleJob,
            registryCenter,
            getLiteJobConfiguration(simpleJob.getClass(),
                    cron,
                    shardingTotalCount,
                    shardingItemParameters,
                    jobParameter,
                    failover,
                    monitorExecution,
                    monitorPort,
                    maxTimeDiffSeconds,
                    jobShardingStrategyClass),
            jobEventConfiguration,
            // 自定义作业监听器
            new SimpleJobListener());

}
public class SimpleJobListener implements ElasticJobListener {
    @Override
    public void beforeJobExecuted(ShardingContexts shardingContexts) {
        System.out.println("任务开始了");
    }

    @Override
    public void afterJobExecuted(ShardingContexts shardingContexts) {
        System.out.println("任务结束了");
    }
}

参考文章

  1. 脑裂是什么?Zookeeper是如何解决的?
  2. Kafka研究系列之kafka 如何避免脑裂?如何选举leader
  3. 如何防止ElasticSearch集群出现脑裂现象
  4. elastic-job调度模型
  5. 芋道源码-elastic-job
  6. Quartz原理解密
  7. 分布式定时任务调度系统技术选型
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容