1. What I have is a local file path rather than a file stream, so for multipart upload I first turn the path into a readable stream with fs.createReadStream, collect every Buffer emitted by the data event, and assemble them into a file object on the end event.
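In isolation the data/end pattern looks like this (a minimal sketch; readSlice and the sample path are placeholders, not part of the component). Note that createReadStream treats the end option as inclusive, which is why the uploader below keeps subtracting 1:

const fs = require('fs')

// Read bytes [start, end] of a file into one Buffer.
// createReadStream's `end` option is inclusive.
function readSlice (localPath, start, end) {
  return new Promise((resolve, reject) => {
    const chunks = []
    fs.createReadStream(localPath, { start, end })
      .on('data', (chunk) => chunks.push(chunk))
      .on('error', reject)
      .on('end', () => resolve(Buffer.concat(chunks)))
  })
}

// e.g. the first 10 MB chunk:
// readSlice('C:\\file\\xxxxx.png', 0, 1024 * 1024 * 10 - 1)

The component code below applies the same pattern inside uploadChunk: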
const {
S3Client,
UploadPartCommand,
CreateMultipartUploadCommand,
CompleteMultipartUploadCommand,
AbortMultipartUploadCommand,
ListMultipartUploadsCommand
} = require('@aws-sdk/client-s3')
......
data () {
return {
s3: null,
chunkSize: 1024 * 1024 * 10, // chunk size: 10 MB per part
bucketName: 'xx', // bucket name
// list of files to upload
filelist: [{
id: uuidv4(),
name: 'xxx',
type: 'xxx',
size: 'xxx',
localPath: 'C:\\file\\xxxxx.png',
percentage: 0, // progress bar value
// upload status
status: 'wait',
// whether the upload is paused
needSuspend: false,
sharding: [], // parts uploaded so far
}]
}
}
.......
initClient () {
// s3Client is the S3 configuration returned by the backend
const s3Client = { ..... }
const params = {
endpoint: s3Client.endpoint,
region: 'us-east-1',
signatureVersion: 'v4', // leftover from SDK v2; v3 always signs with SigV4 and ignores this key
credentials: {
accessKeyId: s3Client.accessKey,
secretAccessKey: s3Client.secretKey,
sessionToken: s3Client.sessionToken,
},
}
this.bucketName = s3Client.defaultBucketName
this.path = s3Client.path
this.s3 = new S3Client(params)
}
uploadList () {
for(let i = 0; i < this.filelist.length; i++) {
this.uploadItem(this.filelist[i])
}
}
uploadItem (item) {
fs.stat(item.localPath, async (err) => {
if (err) {
this.$message({
message: 'The file to upload does not exist or its path has changed!',
type: 'error',
})
// only mark this file as failed, not the whole list
this.filelist = this.filelist.map((i) => ({
...i,
status: i.id === item.id ? 'fail' : i.status,
}))
return
}
// create the multipart upload to obtain an UploadId for this task
const connect = await this.s3.send(
new CreateMultipartUploadCommand({
Bucket: this.bucketName,
Key: `${this.path}/client_b/${item.name}`,
ContentType: item.type,
}),
)
// total file size
const fileSize = item.size
// number of chunks
const numberOfChunks = Math.ceil(fileSize / this.chunkSize)
let start = 0
for (let i = 1; i <= numberOfChunks; i++) {
// stop this file's loop if the upload has been paused
if (item.needSuspend) {
return
}
// createReadStream's `end` option is inclusive, so subtract 1
const end = Math.min(i * this.chunkSize, fileSize) - 1
let res
try {
res = await this.uploadChunk(i, start, end, connect.UploadId, item)
} catch (e) {
return // uploadChunk has already marked the file as failed
}
start = end + 1 // the next chunk starts right after the inclusive end byte
item.sharding.push({
ETag: res.ETag,
PartNumber: i,
UploadId: connect.UploadId,
})
if (item.sharding.length === numberOfChunks) {
const parts = item.sharding.map((p) => ({
ETag: p.ETag,
PartNumber: p.PartNumber,
}))
this.s3
.send(
new CompleteMultipartUploadCommand({
Bucket: this.bucketName,
Key: `${this.path}/client_b/${item.name}`,
MultipartUpload: {
Parts: parts,
},
UploadId: connect.UploadId,
}),
)
.then(() => {
// merge succeeded: tell the backend the multipart upload is complete
this.updateFileInfo(item)
})
.catch((err) => {
// merge failed: mark this file as failed
this.filelist = this.filelist.map((i) => ({
...i,
status: i.id === item.id ? 'fail' : i.status,
}))
// close the spinner when this was the only file in the list
// (a requirement of this project; safe to ignore elsewhere)
if (this.filelist.length === 1) {
this.$store.commit('setIsUploadRotate', false)
} else {
// otherwise close it only once every file has failed
const allFailed = !this.filelist.some((f) => f.status !== 'fail')
if (allFailed) {
this.$store.commit('setIsUploadRotate', false)
}
}
})
}
}
})
}
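If a paused file is cancelled outright instead of resumed, the multipart upload should also be aborted so the parts already stored don't linger in the bucket. Below is a minimal sketch using the AbortMultipartUploadCommand imported above; cancelUpload is a hypothetical method name, and it assumes connect.UploadId was additionally saved on the item as item.uploadId:

// Hypothetical helper: abort a multipart upload that will not be resumed.
async cancelUpload (item) {
  item.needSuspend = true
  await this.s3.send(
    new AbortMultipartUploadCommand({
      Bucket: this.bucketName,
      Key: `${this.path}/client_b/${item.name}`,
      UploadId: item.uploadId, // assumed to be stored when the upload was created
    }),
  )
  item.sharding = []
  item.percentage = 0
}

ListMultipartUploadsCommand (also imported) takes the same Bucket parameter and can enumerate unfinished uploads, which helps clean up uploads whose UploadId was lost, e.g. after a crash.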
uploadChunk(chunkNumber, start, end, uploadId, item) {
return new Promise((resolve, reject) => {
const readStream = fs.createReadStream(item.localPath, {
start,
end,
highWaterMark: this.chunkSize,
})
const chunkBuffer = []
readStream.on('data', (chunk) => {
chunkBuffer.push(chunk)
})
// surface stream errors instead of leaving the awaiting caller hanging
readStream.on('error', reject)
readStream.on('end', () => {
// Blob/File are available in the Electron renderer; wrap the
// collected Buffers so the SDK can use them as the part body
const blob = new Blob(chunkBuffer)
const _chunkFile = new File([blob], item.name)
const params = {
Bucket: this.bucketName,
Key: `${this.path}/client_b/${item.name}`,
PartNumber: chunkNumber,
Body: _chunkFile,
UploadId: uploadId,
}
this.s3
.send(new UploadPartCommand(params))
.then((res) => {
// update the progress bar; end is a 0-based inclusive byte index
this.$set(item, 'percentage', Math.round(((end + 1) / item.size) * 100))
resolve(res)
})
.catch((err) => {
item.status = 'fail'
// close the spinner when this was the only file in the list
if (this.filelist.length === 1) {
this.$store.commit('setIsUploadRotate', false)
} else {
// otherwise close it only once every file has failed
const allFailed = !this.filelist.some((f) => f.status !== 'fail')
if (allFailed) {
this.$store.commit('setIsUploadRotate', false)
}
}
// settle the promise so the awaiting loop in uploadItem doesn't hang
reject(err)
})
})
})
},
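Because item.sharding keeps the ETag, PartNumber, and UploadId of every finished part, resuming after needSuspend is cleared only requires skipping those parts and reusing the same UploadId. A rough sketch under those assumptions (resumeItem is a hypothetical method, not from the code above):

// Hypothetical resume: reuse the saved UploadId and skip finished parts.
async resumeItem (item) {
  item.needSuspend = false
  const uploadId = item.sharding[0].UploadId
  const numberOfChunks = Math.ceil(item.size / this.chunkSize)
  const done = new Set(item.sharding.map((p) => p.PartNumber))
  for (let i = 1; i <= numberOfChunks; i++) {
    if (item.needSuspend) return // paused again
    if (done.has(i)) continue // this part already has an ETag
    const start = (i - 1) * this.chunkSize
    const end = Math.min(i * this.chunkSize, item.size) - 1
    const res = await this.uploadChunk(i, start, end, uploadId, item)
    item.sharding.push({ ETag: res.ETag, PartNumber: i, UploadId: uploadId })
  }
  // then complete exactly as in uploadItem (CompleteMultipartUploadCommand)
}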