异步下载超大Excel文件实现逻辑

新建位图图像.jpg

1. 核心组件介绍

1.1 AsyncDownloadJobHandlerContent

AsyncDownloadJobHandlerContent.java 是一个用于管理异步下载任务处理器的组件,主要功能包括:

  • 实现了 CommandLineRunner 接口,在应用启动时自动扫描并缓存带有 @AsyncDownloadJobWorker 注解的方法
  • 使用 handlerMap 存储任务处理器名称和对应的处理方法
  • 提供 getByName 方法用于根据名称获取对应的处理方法

@Slf4j
@Component
public class AsyncDownloadJobHandlerContent implements CommandLineRunner {

    private final Map<String, Method> handlerMap = new HashMap<>();

    @Resource
    private ApplicationContext applicationContext;

    public Method getByName(String name) {
        Method method = handlerMap.get(name);
        if (method == null) {
            throw new RuntimeException("未找到处理器:" + name);
        }
        return method;
    }

    @Override
    public void run(String... args) throws Exception {
        log.info("应用启动扫描异步下载任务方法");
        scanAndCacheAnnotatedMethods();
    }

    private void scanAndCacheAnnotatedMethods() {
        log.info("开始扫描异步下载任务方法");
        String[] beanNames = applicationContext.getBeanDefinitionNames();
        for (String beanName : beanNames) {
            Object bean = applicationContext.getBean(beanName);
            Class<?> clazz = bean.getClass();
            for (Method method : clazz.getDeclaredMethods()) {
                AsyncDownloadJobWorker annotation = AnnotationUtils.findAnnotation(method, AsyncDownloadJobWorker.class);
                if (annotation != null) {
                    String name = annotation.value();
                    log.info("添加异步下载任务:{}", name);
                    handlerMap.put(name, method);
                }
            }
        }
        log.info("完成扫描异步下载任务方法,共找到 {} 个方法", handlerMap.size());
    }
}

1.2 AsyncDownloadJobHandler

AsyncDownloadJobHandler.java 是异步下载任务的核心处理器,主要功能包括:

  • 使用 @KafkaListener 监听异步下载任务消息
  • 任务处理流程:
    1. 获取任务信息
    2. 创建临时文件
    3. 使用 Redis 加锁防止重复处理
    4. 更新任务状态为处理中
    5. 执行具体的下载任务
    6. 将生成的文件保存到存储目录
    7. 更新任务状态为成功或失败
@Slf4j
@Component
@RequiredArgsConstructor
public class AsyncDownloadJobHandler {

    private final AsyncDownloadJobService asyncDownloadJobService;
    private final RedisTemplate<String, String> redisTemplate;
    private final ApplicationContext applicationContext;
    private final AsyncDownloadJobHandlerContent asyncDownloadJobHandlerContent;

    @SneakyThrows
    @KafkaListener(topics = Const.KAFKA_TOPIC_ASYNC_DOWNLOAD_JOB, concurrency = "1")
    public void asyncJobHandler(String params) {
        log.info("异步下载任务开始处理,参数:{}", params);
        AsyncDownloadJobDTO asyncDownloadJob = asyncDownloadJobService.getById(Long.valueOf(params));
        if (asyncDownloadJob == null) {
            log.error("异步下载任务不存在,任务ID:{}", params);
            return;
        }

        Path tempFile = Files.createFile(Paths.get(asyncDownloadJob.getFileTitle()));
        File file = tempFile.toFile();
        String lockKey = Const.ASYNC_LOCK_KEY + asyncDownloadJob.getId();

        Boolean addLock = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 60, TimeUnit.MINUTES);

        if (addLock) {
            try {
                boolean downloadGenerating = asyncDownloadJobService.setDownloadGenerating(asyncDownloadJob.getId());
                if (!downloadGenerating) {
                    log.warn("设置任务状态为处理中失败,任务ID:{}", asyncDownloadJob.getId());
                    return;
                }

                execute(asyncDownloadJob.getWorker(), asyncDownloadJob.getRequestContent(), file);


                //根据实际情况上传至对应的存储服务,以下为示例代码,请根据实际情况修改
                // 创建本地存储目录
                Path storageDir = Paths.get("storage", "downloads");
                Files.createDirectories(storageDir);
                
                // 生成唯一的文件名
                String fileName = System.currentTimeMillis() + "_" + asyncDownloadJob.getFileTitle();
                Path targetPath = storageDir.resolve(fileName);
                
                // 将临时文件移动到存储目录
                Files.move(file.toPath(), targetPath);
                
                // 更新任务状态为成功,并保存文件路径
                asyncDownloadJobService.setDownloadSuccess(asyncDownloadJob.getId(), targetPath.toString());
                


            } catch (Exception e) {
                log.error("异步下载任务失败:{};{}", e, e.getMessage());
                asyncDownloadJobService.setDownloadFail(asyncDownloadJob.getId());
            } finally {
                if (Objects.nonNull(file)) {
                    Files.deleteIfExists(file.toPath());
                }
                redisTemplate.delete(lockKey);
            }
        } else {
            log.info("异步任务加锁失败:{}", params);
        }
    }

    @SneakyThrows
    private void execute(String worker, String content, File file) {
        Method method = asyncDownloadJobHandlerContent.getByName(worker);
        Class<?>[] parameterTypes = method.getParameterTypes();
        Object[] args = new Object[parameterTypes.length];
        boolean hasOutputStream = false;

        for (int i = 0; i < parameterTypes.length; i++) {
            if (File.class.isAssignableFrom(parameterTypes[i])) {
                args[i] = file;
                hasOutputStream = true;
            } else {
                ObjectMapper objectMapper = new ObjectMapper();
                args[i] = objectMapper.readValue(content, parameterTypes[i]);
            }
        }

        if (!hasOutputStream) {
            throw new RuntimeException("异步下载任务方法参数中没有File类型");
        }

        Object targetBean = applicationContext.getBean(method.getDeclaringClass());
        method.invoke(targetBean, args);
    }
}

2. 测试用例说明

AsyncDownloadTest.java 提供了两个测试用例:

3. 关键流程

  1. 客户端发起异步下载请求
  2. 服务端创建任务并返回任务ID
  3. 发送任务消息到Kafka
  4. AsyncDownloadJobHandler监听并处理任务:
    • Redis加锁防止重复处理
    • 创建临时文件
    • 调用对应的Worker处理下载请求
    • 将文件保存到存储目录
    • 更新任务状态
  5. 客户端轮询任务状态
  6. 任务完成后下载文件

git地址:async-download-tutorial: 超大文件异步下载设计

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容