执行流程
-
任务创建
- 通过Controller接口创建延迟任务
- 根据延迟时间判断处理方式:
- ≤5分钟:直接加入Redisson队列
- 5分钟:保存到数据库
-
任务预加载
- XXL-Job定时扫描数据库中的任务
- 将即将到期的任务加载到Redisson队列
- 删除已加载的数据库任务记录
-
任务执行
- DelayQueueManager监听Redisson队列
- 到期任务交由LazyJobHandler执行
- 执行失败进入重试流程
- 超过重试次数进入死信队列
流程图

diagram-2438133959841033919(1).png
核心组件
DelayQueueManager
- 负责管理Redisson延迟队列
- 处理5分钟内的短延迟任务直接投放到队列
- 异步监听队列并执行到期任务
- 处理任务重试和失败转移到死信队列
@Service
@Slf4j
public class DelayQueueManager {
@Resource
private RedissonClient redissonClient;
@Resource
private LazyJobHandler lazyJobHandler;
@Resource
@Lazy
private LazyJobDeadLetterService lazyJobDeadLetterService;
@Resource
@Lazy
private LazyJobService lazyJobService;
private static final long FIVE_MINUTES = 5 * 60 * 1000;
private static SimpleAsyncTaskExecutor taskExecutor;
private volatile boolean isRunning = true;
private RDelayedQueue<DelayedMessage> delayedQueue;
static {
taskExecutor = new SimpleAsyncTaskExecutor("Delay Queue");
taskExecutor.setConcurrencyLimit(20);
}
@PostConstruct
public void init() {
RBlockingDeque<DelayedMessage> blockingDeque = redissonClient.getBlockingDeque("delay_queue");
this.delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
taskExecutor.submit(this::processDelayedMessages);
}
public void addTask(String worker, String content, LocalDateTime executeTime) {
long delayTime = ChronoUnit.MILLIS.between(LocalDateTime.now(), executeTime);
if (delayTime < 0) {
delayTime = 0;
}
DelayedMessage message = new DelayedMessage()
.setType(worker)
.setBody(content)
.setDelayTime(delayTime);
// 5分钟内的任务直接放入Redisson队列
delayedQueue.offer(message, delayTime, TimeUnit.MILLISECONDS);
log.info("短延迟任务已加入Redisson队列, worker: {}, delay: {}ms", worker, delayTime);
}
private void processDelayedMessages() {
RBlockingDeque<DelayedMessage> blockingDeque = redissonClient.getBlockingDeque("delay_queue");
while (isRunning) {
try {
List<DelayedMessage> messages = new ArrayList<>();
DelayedMessage message = blockingDeque.poll(30, TimeUnit.SECONDS);
if (message != null) {
messages.add(message);
blockingDeque.drainTo(messages, 99);
}
for (DelayedMessage msg : messages) {
try {
lazyJobHandler.execute(msg.getType(), msg.getBody());
log.info("执行延迟任务成功: {}", msg.getType());
} catch (Exception e) {
log.error("延迟消息处理错误: {}", e.getMessage());
// 获取当前重试级别
LazyJobLazyLevel currentLevel = msg.getRetryLevel() != null ?
LazyJobLazyLevel.fromCode(msg.getRetryLevel()) : LazyJobLazyLevel.MINUTE10;
if (currentLevel == null) {
log.error("未找到当前延迟级别: {}", msg.getRetryLevel());
continue;
}
Integer nextLevelCode = currentLevel.getNextLevel();
if (nextLevelCode == -1) {
// 已达到最大重试次数,加入死信队列
lazyJobDeadLetterService.add(msg.getType(), msg.getBody(), LocalDateTime.now());
log.info("任务已达到最大重试次数,加入死信队列: {}", msg.getType());
} else {
// 创建下一级延迟任务
LazyJobLazyLevel nextLevel = LazyJobLazyLevel.fromCode(nextLevelCode);
if (nextLevel != null) {
LocalDateTime nextExecuteTime = LocalDateTime.now().plus(nextLevel.getCode(), ChronoUnit.MINUTES);
lazyJobService.add(msg.getType(), msg.getBody(), nextExecuteTime, nextLevel);
log.info("创建下一级延迟任务成功: {}, 下次执行时间: {}", msg.getType(), nextExecuteTime);
}
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("处理延迟消息时发生错误: {}", e.getMessage());
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
break;
}
}
}
}
public void shutdown() {
isRunning = false;
}
}
LazyJobHandlerContent
- 系统启动时扫描所有@LazyJob注解的方法
- 缓存任务处理方法,提供任务名到处理方法的映射
- 支持运行时动态更新处理方法缓存
@Slf4j
@Component
public class LazyJobHandlerContent implements CommandLineRunner {
@Resource
private ApplicationContext applicationContext;
private final Map<String, Method> annotatedMethods = new HashMap<>();
@Override
public void run(String... args) {
log.info("应用启动扫描延迟任务方法");
scanAndCacheAnnotatedMethods();
}
private void scanAndCacheAnnotatedMethods() {
log.info("开始扫描异步延迟任务方法");
String[] beanNames = applicationContext.getBeanDefinitionNames();
for (String beanName : beanNames) {
Object bean = applicationContext.getBean(beanName);
Class<?> clazz = bean.getClass();
for (Method method : clazz.getDeclaredMethods()) {
LazyJob annotation = AnnotationUtils.findAnnotation(method, LazyJob.class);
if (annotation != null) {
annotatedMethods.put(annotation.value(), method);
}
}
}
log.info("完成扫描异步延迟任务方法,共找到 {} 个方法", annotatedMethods.size());
}
public Method getByName(String name) {
Method method = annotatedMethods.get(name);
if (Objects.isNull(method)) {
scanAndCacheAnnotatedMethods();
method = annotatedMethods.get(name);
}
return method;
}
}
LazyJobHandler
- 执行具体的延迟任务
- 通过反射调用对应的处理方法
- 处理任务参数的JSON反序列化
@Slf4j
@Component
@RequiredArgsConstructor
public class LazyJobHandler {
private final ApplicationContext applicationContext;
private final LazyJobHandlerContent lazyJobHandlerContent;
public void execute(String worker, String content) throws Exception {
Method method = lazyJobHandlerContent.getByName(worker);
if (method == null) {
throw new IllegalArgumentException("未找到延迟任务处理方法: " + worker);
}
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length != 1) {
throw new IllegalArgumentException("延迟任务处理方法必须只有一个参数");
}
Object arg = JSONUtil.toBean(content, parameterTypes[0]);
Object targetBean = applicationContext.getBean(method.getDeclaringClass());
method.invoke(targetBean, arg);
}
}
接口测试
使用LazyJobController提供的接口进行测试:
POST /api/lazy-job/notification
Content-Type: application/json
{
"message": "测试通知",
"userId": "123"
}