近实时延迟任务系统执行流程

执行流程

  1. 任务创建

    • 通过Controller接口创建延迟任务
    • 根据延迟时间判断处理方式:
      • ≤5分钟:直接加入Redisson队列
      • 5分钟:保存到数据库
  2. 任务预加载

    • XXL-Job定时扫描数据库中的任务
    • 将即将到期的任务加载到Redisson队列
    • 删除已加载的数据库任务记录
  3. 任务执行

    • DelayQueueManager监听Redisson队列
    • 到期任务交由LazyJobHandler执行
    • 执行失败进入重试流程
    • 超过重试次数进入死信队列

流程图

diagram-2438133959841033919(1).png

核心组件

DelayQueueManager

  • 负责管理Redisson延迟队列
  • 处理5分钟内的短延迟任务直接投放到队列
  • 异步监听队列并执行到期任务
  • 处理任务重试和失败转移到死信队列
@Service
@Slf4j
public class DelayQueueManager {
    @Resource
    private RedissonClient redissonClient;

    @Resource
    private LazyJobHandler lazyJobHandler;

    @Resource
    @Lazy
    private LazyJobDeadLetterService lazyJobDeadLetterService;

    @Resource
    @Lazy
    private LazyJobService lazyJobService;

    private static final long FIVE_MINUTES = 5 * 60 * 1000;

    private static SimpleAsyncTaskExecutor taskExecutor;

    private volatile boolean isRunning = true;

    private RDelayedQueue<DelayedMessage> delayedQueue;

    static {
        taskExecutor = new SimpleAsyncTaskExecutor("Delay Queue");
        taskExecutor.setConcurrencyLimit(20);
    }

    @PostConstruct
    public void init() {
        RBlockingDeque<DelayedMessage> blockingDeque = redissonClient.getBlockingDeque("delay_queue");
        this.delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        taskExecutor.submit(this::processDelayedMessages);
    }

    public void addTask(String worker, String content, LocalDateTime executeTime) {
        long delayTime = ChronoUnit.MILLIS.between(LocalDateTime.now(), executeTime);
        if (delayTime < 0) {
            delayTime = 0;
        }
        DelayedMessage message = new DelayedMessage()
                .setType(worker)
                .setBody(content)
                .setDelayTime(delayTime);

         // 5分钟内的任务直接放入Redisson队列
         delayedQueue.offer(message, delayTime, TimeUnit.MILLISECONDS);
         log.info("短延迟任务已加入Redisson队列, worker: {}, delay: {}ms", worker, delayTime);

    }

    private void processDelayedMessages() {
        RBlockingDeque<DelayedMessage> blockingDeque = redissonClient.getBlockingDeque("delay_queue");
        while (isRunning) {
            try {
                List<DelayedMessage> messages = new ArrayList<>();
                DelayedMessage message = blockingDeque.poll(30, TimeUnit.SECONDS);
                if (message != null) {
                    messages.add(message);
                    blockingDeque.drainTo(messages, 99);
                }

                for (DelayedMessage msg : messages) {
                    try {
                        lazyJobHandler.execute(msg.getType(), msg.getBody());
                        log.info("执行延迟任务成功: {}", msg.getType());
                    } catch (Exception e) {
                        log.error("延迟消息处理错误: {}", e.getMessage());
                        
                        // 获取当前重试级别
                        LazyJobLazyLevel currentLevel = msg.getRetryLevel() != null ? 
                            LazyJobLazyLevel.fromCode(msg.getRetryLevel()) : LazyJobLazyLevel.MINUTE10;
                        
                        if (currentLevel == null) {
                            log.error("未找到当前延迟级别: {}", msg.getRetryLevel());
                            continue;
                        }

                        Integer nextLevelCode = currentLevel.getNextLevel();
                        if (nextLevelCode == -1) {
                            // 已达到最大重试次数,加入死信队列
                            lazyJobDeadLetterService.add(msg.getType(), msg.getBody(), LocalDateTime.now());
                            log.info("任务已达到最大重试次数,加入死信队列: {}", msg.getType());
                        } else {
                            // 创建下一级延迟任务
                            LazyJobLazyLevel nextLevel = LazyJobLazyLevel.fromCode(nextLevelCode);
                            if (nextLevel != null) {
                                LocalDateTime nextExecuteTime = LocalDateTime.now().plus(nextLevel.getCode(), ChronoUnit.MINUTES);
                                lazyJobService.add(msg.getType(), msg.getBody(), nextExecuteTime, nextLevel);
                                log.info("创建下一级延迟任务成功: {}, 下次执行时间: {}", msg.getType(), nextExecuteTime);
                            }
                        }
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("处理延迟消息时发生错误: {}", e.getMessage());
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    public void shutdown() {
        isRunning = false;
    }
}

LazyJobHandlerContent

  • 系统启动时扫描所有@LazyJob注解的方法
  • 缓存任务处理方法,提供任务名到处理方法的映射
  • 支持运行时动态更新处理方法缓存
@Slf4j
@Component
public class LazyJobHandlerContent implements CommandLineRunner {

    @Resource
    private ApplicationContext applicationContext;
    private final Map<String, Method> annotatedMethods = new HashMap<>();

    @Override
    public void run(String... args) {
        log.info("应用启动扫描延迟任务方法");
        scanAndCacheAnnotatedMethods();
    }

    private void scanAndCacheAnnotatedMethods() {
        log.info("开始扫描异步延迟任务方法");
        String[] beanNames = applicationContext.getBeanDefinitionNames();
        for (String beanName : beanNames) {
            Object bean = applicationContext.getBean(beanName);
            Class<?> clazz = bean.getClass();
            for (Method method : clazz.getDeclaredMethods()) {
                LazyJob annotation = AnnotationUtils.findAnnotation(method, LazyJob.class);
                if (annotation != null) {
                    annotatedMethods.put(annotation.value(), method);
                }
            }
        }
        log.info("完成扫描异步延迟任务方法,共找到 {} 个方法", annotatedMethods.size());
    }

    public Method getByName(String name) {
        Method method = annotatedMethods.get(name);
        if (Objects.isNull(method)) {
            scanAndCacheAnnotatedMethods();
            method = annotatedMethods.get(name);
        }
        return method;
    }
}

LazyJobHandler

  • 执行具体的延迟任务
  • 通过反射调用对应的处理方法
  • 处理任务参数的JSON反序列化
@Slf4j
@Component
@RequiredArgsConstructor
public class LazyJobHandler {

    private final ApplicationContext applicationContext;
    private final LazyJobHandlerContent lazyJobHandlerContent;

    public void execute(String worker, String content) throws Exception {
        Method method = lazyJobHandlerContent.getByName(worker);
        if (method == null) {
            throw new IllegalArgumentException("未找到延迟任务处理方法: " + worker);
        }

        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length != 1) {
            throw new IllegalArgumentException("延迟任务处理方法必须只有一个参数");
        }

        Object arg = JSONUtil.toBean(content, parameterTypes[0]);
        Object targetBean = applicationContext.getBean(method.getDeclaringClass());
        method.invoke(targetBean, arg);
    }
}

接口测试

使用LazyJobController提供的接口进行测试:

POST /api/lazy-job/notification
Content-Type: application/json

{
    "message": "测试通知",
    "userId": "123"
}

项目地址:delay-task-tutorial: 延时任务冷热数据处理

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容