1.背景:
Java批量导入百万级数据到mysql
之前写过批量导入百万级数据到mysql,但是这个局限性比较大,遇到需要复杂校验(重复性校验,有效性校验)的场景下,这种很容易就超时,同时一个系统内,肯定会有多个地方需要用到导入导出,每个地方都写一堆类似的代码,同时还得不断优化性能(数据越来越多,需要越来越复杂),这时候一个管理系统所有导入/导出记录的页面就很实用了,可以让各个模块业务专注在业务上,不需要关心上传和下载。
2.设计:
2.1 常规的导入同步流程如下, 这就引发了一个问题:如果 Excel 的行非常多,或者解析非常复杂,那么解析+校验的过程就非常耗时。如果接口是一个同步的接口,则非常容易出现接口超时,进而返回的校验错误信息也无法展示给前端,这就需要从功能上解决这个问题。
2.2 把同步改为异步,同时引入NFS用来临时存储文件
2.3 导入导出日志记录表设计如下:
CREATE TABLE `t_import_export_records` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`module_name` varchar(32) NOT NULL COMMENT '模块名称',
`file_name` varchar(32) NOT NULL COMMENT '文件名称',
`type` tinyint(2) NOT NULL DEFAULT '1' COMMENT '导入:1;导出:2;',
`state` tinyint(2) NOT NULL DEFAULT '0' COMMENT '进行中:0;成功:1;失败:2;已过期:3',
`nfs_path` varchar(200) DEFAULT NULL COMMENT '文件路径',
`error_reason` text COMMENT '异常日志文件路径',
`expire_time` varchar(32) NOT NULL COMMENT '过期时间,小时',
`start_time` datetime DEFAULT NULL COMMENT '开始时间',
`end_time` datetime DEFAULT NULL COMMENT '开始时间',
`creator` varchar(32) DEFAULT NULL COMMENT '创建人员',
`create_tm` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`modifier` varchar(32) DEFAULT NULL COMMENT '修改人员',
`modify_tm` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `creator` (`creator`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='导入导出日志记录表';
2.4 代码实现
从上面的流程中可以看出,每个业务自己只需要实现自己的 数据校验,持久化到db 这两个个步骤,别的都是通用的,简单点的就是我们写几个工具类,然后让大家去调用,但是这样还是麻烦,不够简便和优雅,我想的是可以把这些完全抽象出来,让大家不用关心这些重复的步骤。
使用注解来完成
2.4.1 定义一个注解,用于标识切点:
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Upload {
//业务模块名称
String moduleName() default "";
//文件过期时间
int expireTime() default 24;
//文件名称
String fileName() default "";
//excel解析类
Class clazz();
//excel 监听器
Class listener();
}
2.4.2 编写切面:
import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.annotation.ExcelProperty;
import com.alibaba.fastjson.JSON;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.demo.framework.domain.Result;
import com.demo.management.util.DateUtil;
import com.demo.management.util.FileUploadUtil;
import com.demo.management.util.ResultUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import java.io.*;
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.*;
@Component
@Aspect
@Slf4j
public class UploadAspect {
public static ThreadFactory commonThreadFactory = new ThreadFactoryBuilder().setNameFormat("upload-pool-%d")
.setPriority(Thread.NORM_PRIORITY).build();
public static ExecutorService uploadExecuteService = new ThreadPoolExecutor(1, 20, 300L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), commonThreadFactory, new ThreadPoolExecutor.AbortPolicy());
@Pointcut("@annotation(com.demo.management.mallgoods.controller.Upload)")
public void uploadPoint() {}
@Around(value = "uploadPoint()")
public Object uploadControl(ProceedingJoinPoint joinPoint) throws Throwable {
// 获取方法上的注解,进而获取uploadType
MethodSignature signature = (MethodSignature)joinPoint.getSignature();
Upload annotation = signature.getMethod().getAnnotation(Upload.class);
//导入文件
MultipartFile file = (MultipartFile)joinPoint.getArgs()[0];
//1、文件上传
String filePath = uploadFile(file);
// 线程池启动异步线程,开始执行上传的逻辑,joinPoint.proceed()就是你实现的业务功能
uploadExecuteService.submit(() -> {
ImportExportRecords records = new ImportExportRecords();
try {
// 2、初始化导入日志,记录开始时间
records.setModuleName(annotation.moduleName());
records.setExpireTime(DateUtil.format(DateUtils.addHours(new Date(),annotation.expireTime()),DateUtil.DATE_TIME_PATTERN));
records.setType(1);
records.setStartTime(new Date());
// records= writeRecordsToDB(records);
System.out.println("writeInitToDB");
//2.1 子线程读取文件,并解析excel文件
List list= EasyExcel.read(new FileInputStream(new File(filePath)),annotation.clazz(),(AnalysisEventListener)annotation.listener().newInstance()).sheet().doReadSync();
//2.1 下面是通过反序列化构建的excel列表,过于复杂了,上面的是优化版本
// List list= EasyExcel.read(new FileInputStream(new File(filePath))).sheet().doReadSync();
// //反序列化 构建读取excel列表
// Class clazz = annotation.clazz();
// Field[] fields = clazz.getDeclaredFields();
// List<Map<String,Object>> resultList = new ArrayList();
// for(int i=0;i<list.size()-1;i++){
// LinkedHashMap<Integer,Object> map = (LinkedHashMap)list.get(i);
// Map<String,Object> paramMap = new HashMap<>();
// for(Map.Entry<Integer,Object> entry: map.entrySet()){
// Field field = Arrays.stream(fields).filter(item -> item.getAnnotation(ExcelProperty.class).index()==entry.getKey()).findFirst().get();
// paramMap.put(field.getName(),entry.getValue());
// }
// resultList.add(paramMap);
// }
// list = JSON.parseArray(JSON.toJSONString(resultList), annotation.clazz());
//2.2 执行业务方法 数据校验,持久化DB
//传参
Object result = joinPoint.proceed(new Object[]{file,list});
Result errorResult = JSON.parseObject(JSON.toJSONString(result),Result.class);
//2.3 更新导入日志结果
if (errorResult!=null && errorResult.isSuccess()) {
// 成功,
records.setState(1);
} else {
// 失败,
records.setState(2);
records.setErrorReason(errorResult.getErrorMessage());
}
} catch (Throwable e) {
// 异常,需要记录
log.error("error",e);
records.setState(2);
records.setErrorReason(e.getMessage());
}
//2.3 更新导入日志结果
// updateByRecordsId(records);
System.out.println("updateByRecordsId");
});
return ResultUtils.success();
}
文件上传方法
public String uploadFile(MultipartFile file) {
String path = FileUploadUtil.getDefaultSavePath("businessCoupon", file.getOriginalFilename());
String fileSavePath = null;
try {
boolean upload = FileUploadUtil.saveFileUpload(path, FileUploadUtil.multipartFileToFile(file));
if (upload) {
fileSavePath = path;
}
} catch (Exception e) {
log.error("文件上传失败,", e);
}
return fileSavePath;
}
public static boolean saveFileUpload(String savePath, File file) {
try {
if (StringUtils.isEmpty(savePath)) {
log.info("savePath is null");
return false;
}
log.info("save file path : " + savePath);
java.nio.file.Files.copy(file.toPath(), new File(savePath).toPath());
return true;
} catch (IOException e) {
log.error("saveFileUpload error", e);
}
return false;
}
/**
* MultipartFile 转 File
*/
public File multipartFileToFile(MultipartFile file) {
File toFile = null;
try{
if (file == null || StringUtils.isBlank(file.getOriginalFilename()) || file.getSize() <= 0) {
return null;
} else {
InputStream ins;
ins = file.getInputStream();
toFile = new File(file.getOriginalFilename());
inputStreamToFile(ins, toFile);
ins.close();
}
}catch (Exception e){
log.error("multipartFileToFile err", e);
// throw new ManagementRuntimeException(e.getMessage());
}
return toFile;
}
/**
* 获取文件流
*/
private static void inputStreamToFile(InputStream ins, File file) {
try {
OutputStream os = new FileOutputStream(file);
int bytesRead = 0;
byte[] buffer = new byte[8192];
while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
os.write(buffer, 0, bytesRead);
}
os.close();
ins.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
2.4.3 注解使用:
用户只需要定义好execl解析类,和解析监听器就可以
@Upload(moduleName="aaa",expireTime=3,clazz = MallOrderExcelVO.class,listener = MallOrderExcelListener.class)
public Result<Integer> uploadWaybillNo(MultipartFile file, ArrayList<MallOrderExcelVO> list) {
if (null == file) {
return ResultUtils.error(ResponseResultStatusEnum.ERR_UPLOAD_FILE);
}
try {
//list 数据校验
if(CollectionUtils.isEmpty(list)){
return ResultUtils.error("数据为空");
}
//list 数据持久到db
this.uploadWaybillNo(list);
return ResultUtils.success();
} catch (Exception e) {
return ResultUtils.error(ResponseResultStatusEnum.ERR_UNKNOWN);
}
}
异步导出见下一篇Java注解实现异步导入与导出(二)