基于Minio封装多部件客户端-分片上传实现
感兴趣的小伙伴,最下面有完整代码实现的仓库地址!
企业微信截图_17351971661031.png
介绍:基于 minio-java-sdk 封装实现了 MinioMultipartClient 客户端,提供申请 uploadId、分片上传、分片合并、校验等方法,其中分片上传与分片合并同时支持同步和异步两种调用方式。
核心实现:后端为每个分片签发预签名URL,浏览器客户端(前端)通过该URL以PUT请求把分片文件直接上传到MinIO,文件数据不用经过后端中转,从而减少网络IO耗时。
废话不多说,上图、上代码。
Minio分片上传-2024-12-24-1718.png
yaml配置
minio:
  endpoint: https://play.min.io # 官网示例测试
  #endpoint: http://127.0.0.1:9000 # 本地环境测试
  access-key: Q3AM3UQ867SPQQA43P2F
  secret-key: zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG
  bucketName: test-zja
MinioConfig
@Configuration
@RequiredArgsConstructor
public class MinioConfig {
private final MinioProperties minioProperties;
/**
* 配置 Minio客户端
*/
@Bean
public MinioClient minioClient() {
return MinioClient.builder()
.endpoint(minioProperties.getEndpoint())
.credentials(minioProperties.getAccessKey(), minioProperties.getSecretKey())
.build();
}
/**
* 配置 Minio异步客户端
*/
@Bean
public MinioAsyncClient minioAsyncClient() {
return MinioAsyncClient.builder()
.endpoint(minioProperties.getEndpoint())
.credentials(minioProperties.getAccessKey(), minioProperties.getSecretKey())
.build();
}
/**
* 配置Minio多部件客户端:分片上传、合并分片、验证分片等,支持同步/异步
*/
@Bean
public MinioMultipartClient minioMultipartClient() {
return new MinioMultipartClient(minioAsyncClient());
}
}
MinioMultipartClient
/**
* @Author: zhengja
* @Date: 2024-12-18 10:19
*/
@Slf4j
// @Component
public class MinioMultipartClient extends MinioAsyncClient {
/**
 * Creates a multipart client from an existing asynchronous client by handing it to the
 * {@link MinioAsyncClient} superclass constructor.
 *
 * @param minioAsyncClient the asynchronous client passed to the superclass
 */
public MinioMultipartClient(MinioAsyncClient minioAsyncClient) {
super(minioAsyncClient);
}
/**
 * Requests an upload id ({@code uploadId}) by creating a multipart upload.
 *
 * @param args bucket, region, object and extra headers/query parameters of the upload
 * @return the upload id of the created multipart upload, or {@code null} when
 *         {@link #createMultipartUpload(ApplyForUploadIdArgs)} returns {@code null}
 */
public String applyForUploadId(ApplyForUploadIdArgs args)
        throws ServerException, InsufficientDataException, ErrorResponseException, IOException,
        NoSuchAlgorithmException, InvalidKeyException, XmlParserException,
        InvalidResponseException, InternalException {
    CreateMultipartUploadResponse created = createMultipartUpload(args);
    return created == null ? null : created.result().uploadId();
}
/**
 * Creates a multipart upload and blocks until the server has answered.
 *
 * @param args bucket, region, object and extra headers/query parameters of the upload
 * @return the response of the create-multipart-upload call
 * @throws RuntimeException if the calling thread is interrupted while waiting; the thread's
 *         interrupt flag is restored before the exception is thrown
 */
public CreateMultipartUploadResponse createMultipartUpload(ApplyForUploadIdArgs args)
        throws IOException, InvalidKeyException, NoSuchAlgorithmException,
        InsufficientDataException, InternalException, XmlParserException, ServerException,
        ErrorResponseException, InvalidResponseException {
    try {
        CompletableFuture<CreateMultipartUploadResponse> future = super.createMultipartUploadAsync(
                args.bucket(), args.region(), args.object(), args.extraHeaders(), args.extraQueryParams());
        return future.get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        // NOTE(review): throwEncapsulatedException is expected to rethrow the wrapped cause;
        // the return below then only satisfies the compiler - confirm against the SDK.
        super.throwEncapsulatedException(e);
        return null;
    }
}
/**
 * Uploads one part and blocks until the upload has finished.
 *
 * <p>Author's recommendation: for plain server-side uploads prefer the official SDK method
 * {@code minioClient.uploadObject(UploadObjectArgs args)}, which already implements the multipart
 * logic, instead of re-implementing it on top of this method.
 *
 * @param args bucket, region, object, part data, part size, upload id and part number
 * @return the response of the upload-part call
 * @throws RuntimeException if the calling thread is interrupted while waiting; the thread's
 *         interrupt flag is restored before the exception is thrown
 */
public UploadPartResponse uploadPart(UploadPartArgs args)
        throws IOException, InvalidKeyException, NoSuchAlgorithmException,
        InsufficientDataException, InternalException, XmlParserException, ServerException,
        ErrorResponseException, InvalidResponseException {
    try {
        return uploadPartAsync(args).get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        // NOTE(review): throwEncapsulatedException is expected to rethrow the wrapped cause;
        // the return below then only satisfies the compiler - confirm against the SDK.
        super.throwEncapsulatedException(e);
        return null;
    }
}
/**
 * Uploads one part asynchronously.
 *
 * @param args bucket, region, object, part data, part size, upload id and part number
 * @return the future returned by the superclass upload-part call
 */
public CompletableFuture<UploadPartResponse> uploadPartAsync(UploadPartArgs args)
        throws IOException, InvalidKeyException, NoSuchAlgorithmException,
        InsufficientDataException, InternalException, XmlParserException {
    CompletableFuture<UploadPartResponse> pendingUpload = super.uploadPartAsync(
            args.bucket(),
            args.region(),
            args.object(),
            args.partData(),
            args.partSize(),
            args.uploadId(),
            args.partNumber(),
            args.extraHeaders(),
            args.extraQueryParams());
    return pendingUpload;
}
/**
 * Builds the presigned URL for uploading a single part; the part is uploaded by sending a PUT
 * request to that URL.
 *
 * @param args bucket, region, object, upload id, part number, expiry and extra headers/query
 *        parameters
 * @return the presigned PUT URL carrying the {@code uploadId} and {@code partNumber} query
 *         parameters
 */
public String getPresignedPartUrl(GetPresignedPartUrlArgs args)
        throws ServerException, InsufficientDataException, ErrorResponseException, IOException,
        NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException,
        XmlParserException, InternalException {
    // Start from the caller's extra query parameters (if any), then add the multipart ones.
    Multimap<String, String> partQuery = HashMultimap.create();
    if (args.extraQueryParams() != null) {
        partQuery.putAll(args.extraQueryParams());
    }
    partQuery.put("uploadId", args.uploadId());
    partQuery.put("partNumber", String.valueOf(args.partNumber()));

    GetPresignedObjectUrlArgs presignArgs = GetPresignedObjectUrlArgs.builder()
            .method(Method.PUT)
            .bucket(args.bucket())
            .object(args.object())
            .region(args.region())
            .expiry(args.expiry())
            .extraHeaders(args.extraHeaders())
            .versionId(args.versionId())
            .extraQueryParams(Multimaps.unmodifiableMultimap(partQuery))
            .build();
    return super.getPresignedObjectUrl(presignArgs);
}
/**
 * Lists the parts already uploaded for one object (file) and blocks until the listing is
 * available.
 *
 * <p>Use case: a multipart upload of an object has been started and the caller wants to see
 * which parts have arrived so far, typically to confirm that every part is present before
 * merging.
 *
 * @param args bucket, region, object, upload id, max parts and part-number marker
 * @return the response of the list-parts call
 * @throws RuntimeException if the calling thread is interrupted while waiting; the thread's
 *         interrupt flag is restored before the exception is thrown
 */
public ListPartsResponse listParts(ListPartsArgs args)
        throws NoSuchAlgorithmException, InsufficientDataException, IOException,
        InvalidKeyException, XmlParserException, InternalException, ServerException,
        ErrorResponseException, InvalidResponseException {
    try {
        return super.listPartsAsync(args.bucket(), args.region(), args.object(), args.maxParts(),
                args.partNumberMarker(), args.uploadId(), args.extraHeaders(), args.extraQueryParams()).get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        // NOTE(review): throwEncapsulatedException is expected to rethrow the wrapped cause;
        // the return below then only satisfies the compiler - confirm against the SDK.
        super.throwEncapsulatedException(e);
        return null;
    }
}
/**
 * Lists the multipart uploads currently in progress in a bucket and blocks until the listing is
 * available.
 *
 * <p>Use case: finding out which files are being uploaded in parts but have not been merged yet.
 *
 * @param args bucket, region, delimiter, encoding type, key marker, max uploads, prefix and
 *        upload-id marker
 * @return the response of the list-multipart-uploads call
 * @throws RuntimeException if the calling thread is interrupted while waiting; the thread's
 *         interrupt flag is restored before the exception is thrown
 */
public ListMultipartUploadsResponse listMultipartUploads(ListMultipartUploadsArgs args)
        throws NoSuchAlgorithmException, InsufficientDataException, IOException,
        InvalidKeyException, XmlParserException, InternalException, ServerException,
        ErrorResponseException, InvalidResponseException {
    try {
        return super.listMultipartUploadsAsync(args.bucket(), args.region(), args.delimiter(),
                args.encodingType(), args.keyMarker(), args.maxUploads(), args.prefix(),
                args.uploadIdMarker(), args.extraHeaders(), args.extraQueryParams()).get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        // NOTE(review): throwEncapsulatedException is expected to rethrow the wrapped cause;
        // the return below then only satisfies the compiler - confirm against the SDK.
        super.throwEncapsulatedException(e);
        return null;
    }
}
/**
 * Merges the parts: completes the multipart upload and blocks until the merge has finished.
 *
 * @param args bucket, region, object, upload id and the parts to merge
 * @return the write response of the completed object
 * @throws RuntimeException if the calling thread is interrupted while waiting; the thread's
 *         interrupt flag is restored before the exception is thrown
 */
public ObjectWriteResponse completeMultipartUpload(CompleteMultipartUploadArgs args)
        throws IOException, NoSuchAlgorithmException, InsufficientDataException,
        InternalException, XmlParserException, InvalidKeyException, ServerException,
        ErrorResponseException, InvalidResponseException {
    try {
        return completeMultipartUploadAsync(args).get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        // NOTE(review): throwEncapsulatedException is expected to rethrow the wrapped cause;
        // the return below then only satisfies the compiler - confirm against the SDK.
        super.throwEncapsulatedException(e);
        return null;
    }
}
/**
 * Merges the parts asynchronously: completes the multipart upload without waiting for the result.
 *
 * @param args bucket, region, object, upload id and the parts to merge
 * @return the future returned by the superclass complete-multipart-upload call
 */
public CompletableFuture<ObjectWriteResponse> completeMultipartUploadAsync(CompleteMultipartUploadArgs args)
        throws IOException, NoSuchAlgorithmException, InsufficientDataException,
        InternalException, XmlParserException, InvalidKeyException {
    CompletableFuture<ObjectWriteResponse> pendingMerge = super.completeMultipartUploadAsync(
            args.bucket(),
            args.region(),
            args.object(),
            args.uploadId(),
            args.parts(),
            args.extraHeaders(),
            args.extraQueryParams());
    return pendingMerge;
}
/**
 * Pre-merge check: lists the uploaded parts (requesting up to 10000) and compares how many there
 * are with the expected number carried in {@code args.objectPartSize()}.
 *
 * @param args bucket, region, object, upload id and the expected number of parts
 * @return the result of {@link #validateListParts(ListPartsResponse, long)} for the listing
 */
public boolean validateCompleteMultipartUploadBefore(ValidateCompleteMultipartUploadBeforeArgs args)
        throws ServerException, InsufficientDataException, ErrorResponseException,
        NoSuchAlgorithmException, IOException, InvalidKeyException, XmlParserException,
        InvalidResponseException, InternalException {
    ListPartsArgs listArgs = ListPartsArgs.builder()
            .bucket(args.bucket())
            .object(args.object())
            .region(args.region())
            .uploadId(args.uploadId())
            .maxParts(10000)
            .extraHeaders(args.extraHeaders())
            .extraQueryParams(args.extraQueryParams())
            .build();
    ListPartsResponse uploadedParts = listParts(listArgs);
    return validateListParts(uploadedParts, args.objectPartSize());
}
/**
 * Pre-merge check on a part listing: compares the number of listed parts with the expected count.
 *
 * @param partsResponse the list-parts response to inspect
 * @param objectPartSize the expected number of parts (a count, despite the name)
 * @return {@code true} when exactly {@code objectPartSize} parts are listed, {@code false} when
 *         fewer are listed
 * @throws RuntimeException when the response, its result or its part list is {@code null}, or
 *         when more parts are listed than expected
 */
public boolean validateListParts(ListPartsResponse partsResponse, long objectPartSize) {
    boolean hasPartList = partsResponse != null
            && partsResponse.result() != null
            && partsResponse.result().partList() != null;
    if (!hasPartList) {
        throw new RuntimeException("分片上传未开始,请先上传分片!");
    }

    int uploadedCount = partsResponse.result().partList().size();
    if (uploadedCount > objectPartSize) {
        // More parts than expected: the upload exceeded its limit.
        throw new RuntimeException("分片上传已超限,请重新申请上传分片!");
    }
    // Equal means the upload is complete; fewer means it is still in progress.
    return uploadedCount == objectPartSize;
}
/**
 * Post-merge check: stats the merged object and verifies it against the optional conditions in
 * {@code args} (object name, ETag, length).
 *
 * <p>The ETag condition is passed to the stat request itself; the length condition is checked
 * here, and a {@code matchLength} of 0 skips it.
 *
 * @param args bucket, region, object, optional ETag to match and optional length to match
 * @return {@code false} when the stat response is {@code null} or is a delete marker; otherwise
 *         {@code true} when no length was requested or the object size equals the requested length
 * @throws RuntimeException if the calling thread is interrupted while waiting (the interrupt flag
 *         is restored first), or if the stat call fails and the failure is not rethrown by
 *         {@code throwEncapsulatedException}
 */
public boolean validateCompleteMultipartUploadAfter(ValidateCompleteMultipartUploadAfterArgs args)
        throws InsufficientDataException, IOException, NoSuchAlgorithmException,
        InvalidKeyException, XmlParserException, InternalException, ServerException,
        ErrorResponseException, InvalidResponseException {
    try {
        StatObjectResponse objectResponse = super.statObjectAsync(StatObjectArgs.builder()
                .bucket(args.bucket())
                .region(args.region())
                .object(args.object())
                .matchETag(args.matchETag())
                .extraHeaders(args.extraHeaders())
                .extraQueryParams(args.extraQueryParams()).build()).get();
        if (objectResponse == null || objectResponse.deleteMarker()) {
            return false;
        }
        return args.matchLength() == 0 || args.matchLength() == objectResponse.size();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        super.throwEncapsulatedException(e);
        throw new RuntimeException(e);
    }
}
/**
 * Cancels a multipart upload and blocks until the server has answered.
 *
 * @param args bucket, region, object and upload id of the upload to cancel
 * @return the response of the abort-multipart-upload call
 * @throws RuntimeException if the calling thread is interrupted while waiting; the thread's
 *         interrupt flag is restored before the exception is thrown
 */
public AbortMultipartUploadResponse abortMultipartUpload(AbortMultipartUploadArgs args)
        throws InsufficientDataException, NoSuchAlgorithmException, IOException,
        InvalidKeyException, XmlParserException, InternalException, ServerException,
        ErrorResponseException, InvalidResponseException {
    try {
        CompletableFuture<AbortMultipartUploadResponse> future = super.abortMultipartUploadAsync(
                args.bucket(), args.region(), args.object(), args.uploadId(),
                args.extraHeaders(), args.extraQueryParams());
        return future.get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up the stack can still observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
        // NOTE(review): throwEncapsulatedException is expected to rethrow the wrapped cause;
        // the return below then only satisfies the compiler - confirm against the SDK.
        super.throwEncapsulatedException(e);
        return null;
    }
}
/**
 * Cancels a multipart upload asynchronously.
 *
 * @param args bucket, region, object and upload id of the upload to cancel
 * @return the future returned by the superclass abort-multipart-upload call
 */
public CompletableFuture<AbortMultipartUploadResponse> abortMultipartUploadAsync(AbortMultipartUploadArgs args)
        throws InsufficientDataException, NoSuchAlgorithmException, IOException,
        InvalidKeyException, XmlParserException, InternalException {
    CompletableFuture<AbortMultipartUploadResponse> pendingAbort = super.abortMultipartUploadAsync(
            args.bucket(),
            args.region(),
            args.object(),
            args.uploadId(),
            args.extraHeaders(),
            args.extraQueryParams());
    return pendingAbort;
}
/**
 * Returns a link to a file (for preview, download and similar uses) by presigning the request
 * described by {@code args}.
 *
 * @param args the presigned-URL arguments (method, bucket, object, expiry, ...)
 * @return the presigned object URL
 */
public String getFileUrl(GetPresignedObjectUrlArgs args)
        throws ServerException, InsufficientDataException, ErrorResponseException, IOException,
        NoSuchAlgorithmException, InvalidKeyException, InvalidResponseException,
        XmlParserException, InternalException {
    String presignedUrl = super.getPresignedObjectUrl(args);
    return presignedUrl;
}
}
单元测试
/**
* @Author: zhengja
* @Date: 2024-12-25 14:04
*/
public class MinioMultipartClientTest extends MinioApplicationTests {
@Autowired
MinioMultipartClient minioMultipartClient;
@Value("${minio.bucketName}")
public String bucketName;
public String objectName = "test.zip";
// 源文件
public String sourceFilePath = "D:\\temp\\zip\\test.zip";
// Server-side part upload: the test itself streams every part file to MinIO.
@Test
public void testUploadPart_1()
        throws ServerException, InsufficientDataException, ErrorResponseException, IOException,
        NoSuchAlgorithmException, InvalidKeyException, XmlParserException,
        InvalidResponseException, InternalException {
    // 1. Request an upload id.
    String uploadId = minioMultipartClient.applyForUploadId(ApplyForUploadIdArgs.builder()
            .bucket(bucketName)
            .object(objectName).build());
    System.out.println("uploadId:" + uploadId);

    // 2. Upload the parts through the server, numbering them from 1 in array order.
    String[] partFiles = {
            "D:\\temp\\zip\\test\\part1.part",
            "D:\\temp\\zip\\test\\part2.part",
            "D:\\temp\\zip\\test\\part3.part"
    };
    for (int i = 0; i < partFiles.length; i++) {
        // try-with-resources closes each part stream once its (blocking) upload has returned
        // or failed; previously the three streams were never closed.
        try (java.io.InputStream partData = Files.newInputStream(Paths.get(partFiles[i]))) {
            minioMultipartClient.uploadPart(UploadPartArgs.builder()
                    .bucket(bucketName)
                    .object(objectName)
                    .uploadId(uploadId)
                    .partData(partData)
                    .partSize(Files.size(Paths.get(partFiles[i])))
                    .partNumber(i + 1)
                    .build());
        }
    }

    // 3. List the uploaded parts.
    ListPartsResponse listPartsResponse = minioMultipartClient.listParts(ListPartsArgs.builder()
            .bucket(bucketName)
            .object(objectName)
            .uploadId(uploadId).build());
    List<Part> partList = listPartsResponse.result().partList();
    int chunkCount = partList.size();
    System.out.println("分片数量:" + chunkCount);
    if (chunkCount == 0) {
        throw new RuntimeException("分片数量为[0].");
    }
    for (Part part : partList) {
        System.out.println("分片" + part.partNumber() + "上传成功,etag=" + part.etag());
    }

    // 4. Complete the multipart upload (merge the parts).
    ObjectWriteResponse objectWriteResponse = minioMultipartClient.completeMultipartUpload(CompleteMultipartUploadArgs.builder()
            .bucket(bucketName)
            .object(objectName)
            .uploadId(uploadId)
            .parts(partList)
            .build());
    System.out.println("完成分片上传 objectWriteResponse.object:" + objectWriteResponse.object());

    // 5. Get the download URL of the merged file.
    String fileUrl = minioMultipartClient.getFileUrl(GetPresignedObjectUrlArgs.builder()
            .method(Method.GET)
            .bucket(bucketName)
            .object(objectName)
            .expiry(1, TimeUnit.DAYS).build());
    System.out.println("合并后的文件下载地址 fileUrl:" + fileUrl);
}
// Client-side part upload (simulated, e.g. a browser request): parts are PUT to presigned URLs.
@Test
public void testUploadPart_2()
        throws ServerException, InsufficientDataException, ErrorResponseException, IOException,
        NoSuchAlgorithmException, InvalidKeyException, XmlParserException,
        InvalidResponseException, InternalException {
    // 1. Request an upload id.
    String uploadId = minioMultipartClient.applyForUploadId(
            ApplyForUploadIdArgs.builder().bucket(bucketName).object(objectName).build());
    System.out.println("uploadId:" + uploadId);

    // 2. Get one presigned upload URL per part (part numbers 1..3) before uploading anything.
    String[] presignedPartUrls = new String[3];
    for (int idx = 0; idx < presignedPartUrls.length; idx++) {
        presignedPartUrls[idx] = minioMultipartClient.getPresignedPartUrl(GetPresignedPartUrlArgs.builder()
                .bucket(bucketName)
                .object(objectName)
                .uploadId(uploadId)
                .partNumber(idx + 1)
                .build());
    }
    System.out.println("presignedPartUrl1:" + presignedPartUrls[0]);
    System.out.println("presignedPartUrl2:" + presignedPartUrls[1]);
    System.out.println("presignedPartUrl3:" + presignedPartUrls[2]);

    // 3. Upload each part through its presigned URL, as a client would.
    String[] partFiles = {
            "D:\\temp\\zip\\test\\part1.part",
            "D:\\temp\\zip\\test\\part2.part",
            "D:\\temp\\zip\\test\\part3.part"
    };
    for (int idx = 0; idx < partFiles.length; idx++) {
        OkHttpUtils.doPutUploadFile(presignedPartUrls[idx], new MockMultipartFile(
                objectName,
                Files.newInputStream(Paths.get(partFiles[idx]))));
    }

    // 4. List the uploaded parts.
    ListPartsArgs listArgs = ListPartsArgs.builder()
            .bucket(bucketName)
            .object(objectName)
            .uploadId(uploadId).build();
    ListPartsResponse listPartsResponse = minioMultipartClient.listParts(listArgs);
    List<Part> partList = listPartsResponse.result().partList();
    int chunkCount = listPartsResponse.result().partList().size();
    System.out.println("分片数量:" + chunkCount);
    if (chunkCount == 0) {
        throw new RuntimeException("分片数量为[0].");
    }
    int printed = 0;
    while (printed < chunkCount) {
        Part uploaded = partList.get(printed);
        System.out.println("分片" + uploaded.partNumber() + "上传成功,etag=" + uploaded.etag());
        printed++;
    }

    // 5. Complete the multipart upload (merge the parts).
    CompleteMultipartUploadArgs completeArgs = CompleteMultipartUploadArgs.builder()
            .bucket(bucketName)
            .object(objectName)
            .uploadId(uploadId)
            .parts(partList)
            .build();
    ObjectWriteResponse objectWriteResponse = minioMultipartClient.completeMultipartUpload(completeArgs);
    System.out.println("完成分片上传 objectWriteResponse.object:" + objectWriteResponse.object());

    // 6. Get the download URL of the merged file.
    GetPresignedObjectUrlArgs downloadArgs = GetPresignedObjectUrlArgs.builder()
            .method(Method.GET)
            .bucket(bucketName)
            .object(objectName)
            .expiry(1, TimeUnit.DAYS).build();
    String fileUrl = minioMultipartClient.getFileUrl(downloadArgs);
    System.out.println("合并后的文件下载地址 fileUrl:" + fileUrl);
}
}
完整代码示例: