新建位图图像.jpg
1. 核心组件介绍
1.1 AsyncDownloadJobHandlerContent
AsyncDownloadJobHandlerContent.java
是一个用于管理异步下载任务处理器的组件,主要功能包括:
- 实现了 CommandLineRunner 接口,在应用启动时自动扫描并缓存带有 @AsyncDownloadJobWorker 注解的方法
- 使用 handlerMap 存储任务处理器名称和对应的处理方法
- 提供 getByName 方法用于根据名称获取对应的处理方法
@Slf4j
@Component
public class AsyncDownloadJobHandlerContent implements CommandLineRunner {
private final Map<String, Method> handlerMap = new HashMap<>();
@Resource
private ApplicationContext applicationContext;
public Method getByName(String name) {
Method method = handlerMap.get(name);
if (method == null) {
throw new RuntimeException("未找到处理器:" + name);
}
return method;
}
@Override
public void run(String... args) throws Exception {
log.info("应用启动扫描异步下载任务方法");
scanAndCacheAnnotatedMethods();
}
private void scanAndCacheAnnotatedMethods() {
log.info("开始扫描异步下载任务方法");
String[] beanNames = applicationContext.getBeanDefinitionNames();
for (String beanName : beanNames) {
Object bean = applicationContext.getBean(beanName);
Class<?> clazz = bean.getClass();
for (Method method : clazz.getDeclaredMethods()) {
AsyncDownloadJobWorker annotation = AnnotationUtils.findAnnotation(method, AsyncDownloadJobWorker.class);
if (annotation != null) {
String name = annotation.value();
log.info("添加异步下载任务:{}", name);
handlerMap.put(name, method);
}
}
}
log.info("完成扫描异步下载任务方法,共找到 {} 个方法", handlerMap.size());
}
}
1.2 AsyncDownloadJobHandler
AsyncDownloadJobHandler.java
是异步下载任务的核心处理器,主要功能包括:
- 使用 @KafkaListener 监听异步下载任务消息
- 任务处理流程:
- 获取任务信息
- 创建临时文件
- 使用 Redis 加锁防止重复处理
- 更新任务状态为处理中
- 执行具体的下载任务
- 将生成的文件保存到存储目录
- 更新任务状态为成功或失败
@Slf4j
@Component
@RequiredArgsConstructor
public class AsyncDownloadJobHandler {
private final AsyncDownloadJobService asyncDownloadJobService;
private final RedisTemplate<String, String> redisTemplate;
private final ApplicationContext applicationContext;
private final AsyncDownloadJobHandlerContent asyncDownloadJobHandlerContent;
@SneakyThrows
@KafkaListener(topics = Const.KAFKA_TOPIC_ASYNC_DOWNLOAD_JOB, concurrency = "1")
public void asyncJobHandler(String params) {
log.info("异步下载任务开始处理,参数:{}", params);
AsyncDownloadJobDTO asyncDownloadJob = asyncDownloadJobService.getById(Long.valueOf(params));
if (asyncDownloadJob == null) {
log.error("异步下载任务不存在,任务ID:{}", params);
return;
}
Path tempFile = Files.createFile(Paths.get(asyncDownloadJob.getFileTitle()));
File file = tempFile.toFile();
String lockKey = Const.ASYNC_LOCK_KEY + asyncDownloadJob.getId();
Boolean addLock = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 60, TimeUnit.MINUTES);
if (addLock) {
try {
boolean downloadGenerating = asyncDownloadJobService.setDownloadGenerating(asyncDownloadJob.getId());
if (!downloadGenerating) {
log.warn("设置任务状态为处理中失败,任务ID:{}", asyncDownloadJob.getId());
return;
}
execute(asyncDownloadJob.getWorker(), asyncDownloadJob.getRequestContent(), file);
//根据实际情况上传至对应的存储服务,以下为示例代码,请根据实际情况修改
// 创建本地存储目录
Path storageDir = Paths.get("storage", "downloads");
Files.createDirectories(storageDir);
// 生成唯一的文件名
String fileName = System.currentTimeMillis() + "_" + asyncDownloadJob.getFileTitle();
Path targetPath = storageDir.resolve(fileName);
// 将临时文件移动到存储目录
Files.move(file.toPath(), targetPath);
// 更新任务状态为成功,并保存文件路径
asyncDownloadJobService.setDownloadSuccess(asyncDownloadJob.getId(), targetPath.toString());
} catch (Exception e) {
log.error("异步下载任务失败:{};{}", e, e.getMessage());
asyncDownloadJobService.setDownloadFail(asyncDownloadJob.getId());
} finally {
if (Objects.nonNull(file)) {
Files.deleteIfExists(file.toPath());
}
redisTemplate.delete(lockKey);
}
} else {
log.info("异步任务加锁失败:{}", params);
}
}
@SneakyThrows
private void execute(String worker, String content, File file) {
Method method = asyncDownloadJobHandlerContent.getByName(worker);
Class<?>[] parameterTypes = method.getParameterTypes();
Object[] args = new Object[parameterTypes.length];
boolean hasOutputStream = false;
for (int i = 0; i < parameterTypes.length; i++) {
if (File.class.isAssignableFrom(parameterTypes[i])) {
args[i] = file;
hasOutputStream = true;
} else {
ObjectMapper objectMapper = new ObjectMapper();
args[i] = objectMapper.readValue(content, parameterTypes[i]);
}
}
if (!hasOutputStream) {
throw new RuntimeException("异步下载任务方法参数中没有File类型");
}
Object targetBean = applicationContext.getBean(method.getDeclaringClass());
method.invoke(targetBean, args);
}
}
2. 测试用例说明
AsyncDownloadTest.java
提供了两个测试用例:
3. 关键流程
- 客户端发起异步下载请求
- 服务端创建任务并返回任务ID
- 发送任务消息到Kafka
- AsyncDownloadJobHandler监听并处理任务:
- Redis加锁防止重复处理
- 创建临时文件
- 调用对应的Worker处理下载请求
- 将文件保存到存储目录
- 更新任务状态
- 客户端轮询任务状态
- 任务完成后下载文件