OSS文件分片上传
依赖
<!-- https://mvnrepository.com/artifact/com.aliyun.oss/aliyun-sdk-oss -->
<dependency>
<groupId>com.aliyun.oss</groupId>
<artifactId>aliyun-sdk-oss</artifactId>
<version>3.8.1</version>
</dependency>
基础参数dto
/**
* @author WJL
*/
@Data
@Builder
public class OssParamDTO {
private String endpoint;
private String accessKeyId;
private String accessKeySecret;
private String bucketName;
private String folder;
/**
* objectName = folder + fileName
*/
private String objectName;
/**
* 上传线程
*/
private Integer task;
/**
* 每个线程处理大小 分片大小
*/
private Integer number;
}
具体上传方法
小文件上传
public static PutObjectResult uploadFile(OssParamDTO ossParamDTO, InputStream inputStream){
// 创建OSSClient实例。
OSS ossClient = new OSSClientBuilder().build(ossParamDTO.getEndpoint(), ossParamDTO.getAccessKeyId(), ossParamDTO.getAccessKeySecret());
PutObjectResult putObjectResult = null;
// 上传文件流。
try {
putObjectResult = ossClient.putObject(ossParamDTO.getBucketName(), ossParamDTO.getObjectName(), inputStream);
//权限设置
ossClient.setBucketAcl(ossParamDTO.getBucketName(), CannedAccessControlList.PublicRead);
} catch (Exception e) {
e.printStackTrace();
}finally {
// 关闭OSSClient。
ossClient.shutdown();
}
return putObjectResult;
}
大文件上传,分片oss自己处理
处理逻辑:前段轮训查询数据库某个字段,当该字段被回调接口更新时结束轮训,上传完成
public static void uploadBigFile(OssParamDTO ossParamDTO,String path, File file,Long fileId) throws Throwable {
System.out.println("上传时间:"+System.currentTimeMillis());
// Endpoint以杭州为例,其它Region请按实际情况填写。
String endpoint = ossParamDTO.getEndpoint();
// 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
String accessKeyId = ossParamDTO.getAccessKeyId();
String accessKeySecret = ossParamDTO.getAccessKeySecret();
// 创建OSSClient实例。
OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
ObjectMetadata meta = new ObjectMetadata();
// 指定上传的内容类型。
meta.setContentType("text/plain");
// 通过UploadFileRequest设置多个参数。
UploadFileRequest uploadFileRequest = new UploadFileRequest(ossParamDTO.getBucketName(),ossParamDTO.getObjectName());
// 指定上传的本地文件。
uploadFileRequest.setUploadFile(path);
// 指定上传并发线程数,默认为1。
uploadFileRequest.setTaskNum(ossParamDTO.getTask());
// 指定上传的分片大小,范围为100KB~5GB,默认为文件大小/10000。
uploadFileRequest.setPartSize(ossParamDTO.getNumber() * 1024 * 1024);
uploadFileRequest.setObjectMetadata(meta);
// 设置上传成功回调,参数为Callback类型。
Callback callback = new Callback();
callback.setCalbackBodyType(Callback.CalbackBodyType.URL);
//回调参数 --- 同同步到数据库
callback.setCallbackBody("fileId="+fileId+"&fileName=${object}&uploadStatus=1");
//回调接口(自己服务器接口,可供外网访问)
callback.setCallbackUrl("http://3m8wv2.natappfree.cc/web/common/callBack");
uploadFileRequest.setCallback(callback);
// 断点续传上传。
ossClient.uploadFile(uploadFileRequest);
//权限设置
ossClient.setBucketAcl(ossParamDTO.getBucketName(), CannedAccessControlList.PublicRead);
// 关闭OSSClient。
ossClient.shutdown();
}
大文件本地分片,多线程执行分片上传,再合并碎片
分片上传代码
PartETag getUploadPartETag(String objectName, String bucketName, String uploadId,
InputStream instream, Long curPartSize,Integer partNum,
OSS ossClient, CountDownLatch countDownLatch){
long before = System.currentTimeMillis();
UploadPartRequest uploadPartRequest = null;
try {
log.debug("分片文件上传线程: {}",Thread.currentThread().getName());
uploadPartRequest = new UploadPartRequest();
uploadPartRequest.setBucketName(bucketName);
uploadPartRequest.setKey(objectName);
uploadPartRequest.setUploadId(uploadId);
uploadPartRequest.setInputStream(instream);
// 设置分片大小。除了最后一个分片没有大小限制,其他的分片最小为100KB。
uploadPartRequest.setPartSize(curPartSize);
// 设置分片号。每一个上传的分片都有一个分片号,取值范围是1~10000,如果超出这个范围,OSS将返回InvalidArgument的错误码。
uploadPartRequest.setPartNumber(partNum);
// 每个分片不需要按顺序上传,甚至可以在不同客户端上传,OSS会按照分片号排序组成完整的文件。
UploadPartResult uploadPartResult = ossClient.uploadPart(uploadPartRequest);
// 每次上传分片之后,OSS的返回结果会包含一个PartETag。PartETag将被保存到partETags中。
log.debug("getPartETag ::{}" ,uploadPartResult.getPartETag().getETag());
return uploadPartResult.getPartETag();
}finally {
countDownLatch.countDown();
log.debug("线程: {} 执行完毕, 等待线程数 :{}, 消耗时间: {}",
Thread.currentThread().getName(),countDownLatch.getCount(),
((System.currentTimeMillis()-before)/1000)+"s");
}
}
外部分片代码
@Qualifier("taskExecutor")
@Autowired
ThreadPoolTaskExecutor taskExecutor;
/**
* 上传
* @param ossParamDTO
* @param multipartFile
* @return
*/
public CompleteMultipartUploadResult uploadBigFileForProd(OssParamDTO ossParamDTO, MultipartFile multipartFile){
Long before = System.currentTimeMillis();
// Endpoint以杭州为例,其它Region请按实际情况填写。
String endpoint = ossParamDTO.getEndpoint();
// 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
String accessKeyId = ossParamDTO.getAccessKeyId();
String accessKeySecret = ossParamDTO.getAccessKeySecret();
String bucketName = ossParamDTO.getBucketName();
// <yourObjectName>表示上传文件到OSS时需要指定包含文件后缀在内的完整路径,例如abc/efg/123.jpg。
String objectName = ossParamDTO.getObjectName();
// 创建OSSClient实例。
OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
// 创建InitiateMultipartUploadRequest对象。
InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, objectName);
// 初始化分片。
InitiateMultipartUploadResult upresult = ossClient.initiateMultipartUpload(request);
// 返回uploadId,它是分片上传事件的唯一标识,您可以根据这个ID来发起相关的操作,如取消分片上传、查询分片上传等。
String uploadId = upresult.getUploadId();
// partETags是PartETag的集合。PartETag由分片的ETag和分片号组成。
List<PartETag> partETags = new ArrayList<>();
// 计算文件有多少个分片 15MB
final long partSize = 2 * 1024 * 1024L;
long fileLength = multipartFile.getSize();
int partCount = (int) (fileLength / partSize);
if (fileLength % partSize != 0) {
partCount++;
}
// 遍历分片上传。
log.info("分片数量 {}",partCount);
List<Future<PartETag>> futureList = Collections.synchronizedList(new ArrayList());
CountDownLatch countDownLatch = new CountDownLatch(partCount);
for (int i = 0; i < partCount; i++) {
long startPos = i * partSize;
long curPartSize = (i + 1 == partCount) ? (fileLength - startPos) : partSize;
InputStream instream = null;
try {
instream = multipartFile.getInputStream();
} catch (IOException e) {
e.printStackTrace();
}
// 跳过已经上传的分片。
try {
instream.skip(startPos);
} catch (IOException e) {
e.printStackTrace();
}
int finalI = i;
InputStream finalInstream = instream;
Future<PartETag> partETagFuture = taskExecutor.submit(() ->
fileServiceExtAsync.getUploadPartETag(objectName, bucketName, uploadId, finalInstream, curPartSize, finalI + 1, ossClient, countDownLatch));
futureList.add(partETagFuture);
}
try {
countDownLatch.await();
for (Future<PartETag> tagFuture : futureList) {
partETags.add(tagFuture.get());
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
// 创建CompleteMultipartUploadRequest对象。
List<PartETag> collect = partETags.stream().sorted(Comparator.comparing(PartETag::getPartNumber)).collect(Collectors.toList());
// 在执行完成分片上传操作时,需要提供所有有效的partETags。OSS收到提交的partETags后,会逐一验证每个分片的有效性。当所有的数据分片验证通过后,OSS将把这些分片组合成一个完整的文件。
log.debug("文件开始合并");
CompleteMultipartUploadRequest completeMultipartUploadRequest =
new CompleteMultipartUploadRequest(bucketName, objectName, uploadId, collect);
// 如果需要在完成文件上传的同时设置文件访问权限,请参考以下示例代码。
completeMultipartUploadRequest.setObjectACL(CannedAccessControlList.PublicRead);
// 完成上传。
CompleteMultipartUploadResult completeMultipartUploadResult = ossClient.completeMultipartUpload(completeMultipartUploadRequest);
// 关闭OSSClient。
ossClient.shutdown();
log.debug("消耗总时间: {}",((System.currentTimeMillis()-before)/1000)+"s");
return completeMultipartUploadResult;
}