根据项目需求,将文件从FDFS,SASS迁移到azure。此文章记录了java对三个服务器文件上传下载简单的实现。
@Controller
@RequestMapping("/file")
public class FileFloadMigrationController extends BaseController {
private static Logger log = LoggerFactory.getLogger(FileServiceImpl.class);
@Autowired
@Qualifier("fileUploadServiceImpl")
private FileService fileService;
@Autowired
private FileUploadedToDao fileUploadedToDao;
@Autowired(required = false)
protected MongoTemplate mongoTemplate;
@Autowired
private FileMapDao fileMapDao;
@Autowired
private FileMapService fileMapService;
@Autowired
private FileUploadedService fileUploadedService;
/**
* 文件迁移
*
* @Title: fileMigration
* @param state
* 状态:fail:失败 upload:上传
* @param module
* 模块名 module=all:所有模块
* @param lastModifyTime
* 最后修改时间:存在该项时证明要导入新增数据
* @return
* @throws Exception
*/
@RequestMapping(value = "/filefload/{state}/{module}", method = RequestMethod.GET)
public @ResponseBody Rest fileMigration(@PathVariable String state, @PathVariable String module, String lastModifyTime) throws Exception {
new HttpClient().getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(0, false));
List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
List<FileUploadedTo> findFileUploadedTos = new ArrayList<FileUploadedTo>();
Map<Integer, Object> result = new HashMap<Integer, Object>();
// 上传失败后
if (state.equals("fail")) {
// 从新上传所有模块中失败数据
if (module.equals("all")) {
findFileUploadedTos = getFileUploadsFail();
} else {
// 按模块上传
findFileUploadedTos = getFileUploadsFail(module);
}
result = uploadFileFail(findFileUploadedTos);
} else if (state.equals("upload")) {
if (module.equals("all")) {
// 上传所有模块中数据
fileUploadeds = getFileUploadsNomal();
} else {
// 按模块上传数据
fileUploadeds = getFileUploadsNomal(module);
}
// 最后修改时间不为空,需要上传新文件
// 比如2017-11-09日转移一次后,11日需要在转移。此时lastModifyTime 需要写为2017-11-09
if (lastModifyTime != null && !lastModifyTime.equals("")) {
fileUploadeds = getFileUploadTosByLastModify(lastModifyTime);
}
result = uploadFileNomal(fileUploadeds);
}
return Rest.item(StateCode.STATUS_CODE_SUCCESS, result);
}
/**
* 正常情况下文件迁移,不按照模块。所有模块
*
* @return List<FileUploaded>
*/
private List<FileUploaded> getFileUploadsNomal() {
List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
Query query = new Query();
fileUploadeds = mongoTemplate.find(query, FileUploaded.class);
return fileUploadeds;
}
/**
* 正常情况下文件迁移,按照模块
*
* @Title: getFileUploadsNomal
* @param module
* 模块名字
* @return List<FileUploaded>
*/
private List<FileUploaded> getFileUploadsNomal(String module) {
List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
Query query = new Query();
Criteria criteria = Criteria.where("module").is(module);
query.addCriteria(criteria);
fileUploadeds = mongoTemplate.find(query, FileUploaded.class);
return fileUploadeds;
}
/**
* 上传过程中出现失败情况。从新上传。所有模块
*
* @Title: getFileUploadsFail
* @return List<FileUploadedTo>
*/
private List<FileUploadedTo> getFileUploadsFail() {
List<FileUploadedTo> findFileUploadedTos = new ArrayList<FileUploadedTo>();
Query query = new Query();
Criteria criteria = Criteria.where("isSuccess").is(false);
query.addCriteria(criteria);
findFileUploadedTos = mongoTemplate.find(query, FileUploadedTo.class);
return findFileUploadedTos;
}
/**
* 上传过程中出现失败情况。从新上传。按模块进行上传
*
* @Title: getFileUploadsFail
* @param module
* 模块名
* @return List<FileUploadedTo>
*/
private List<FileUploadedTo> getFileUploadsFail(String module) {
List<FileUploadedTo> findFileUploadedTos = new ArrayList<FileUploadedTo>();
Query query = new Query();
Criteria criteria = Criteria.where("fileUploaded.module").is(module).and("isSuccess").is(false);
query.addCriteria(criteria);
findFileUploadedTos = mongoTemplate.find(query, FileUploadedTo.class);
return findFileUploadedTos;
}
/**
* 上传部分后,出现新增数据。根据文件最后更新时间,进行操作
*
* @Title: getFileUploadTosByLastModify
* @param lastModifyTime
* 最后修改时间
* @return List<FileUploadedTo>
* @throws ParseException
*/
private List<FileUploaded> getFileUploadTosByLastModify(String lastModifyTime) throws ParseException {
List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
Query query = new Query();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
Criteria criteria = Criteria.where("lastModify").gte(sdf.parse(lastModifyTime));
query.addCriteria(criteria);
fileUploadeds = mongoTemplate.find(query, FileUploaded.class);
return fileUploadeds;
}
/**
* 正常情况下上传文件主方法
*
* @Title: uploadFileNomal
* @param fileUploadeds
* @return
* @throws Exception
*/
private Map<Integer, Object> uploadFileNomal(List<FileUploaded> fileUploadeds) throws Exception {
Map<Integer, Object> map = new HashMap<Integer, Object>();
List<FileUploadedTo> fileUploadedTos = new ArrayList<FileUploadedTo>();
if (fileUploadeds == null || fileUploadeds.size() == 0) {
return map;
}
System.out.println("共有文件:" + fileUploadeds.size() + "条");
long startTime = System.currentTimeMillis();
// 将表中的数据复制到扩展表中(将文件所有信息复制、设置org为要迁移文件的org、设置迁移失败(false))
for (FileUploaded fileUploaded : fileUploadeds) {
FileUploadedTo fileUploadedTo = new FileUploadedTo();
fileUploadedTo.setFileUploaded(fileUploaded);
fileUploadedTo.setOrg(fileUploaded.getOrg());
fileUploadedTo.setSuccess(false);
fileUploadedTos.add(fileUploadedTo);
}
// 保存复制表
fileUploadedToDao.save(fileUploadedTos);
uploadFileCurrency(fileUploadedTos);
long endTime = System.currentTimeMillis();
System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
return map;
}
/**
* 上传失败后重新上传主方法
*
* @Title: uploadFileFail
* @param fileUploadedTos
* @return
*/
private Map<Integer, Object> uploadFileFail(List<FileUploadedTo> fileUploadedTos) throws Exception {
Map<Integer, Object> map = new HashMap<Integer,Object>();
uploadFileCurrency(fileUploadedTos);
return map;
}
/**
* 上传文件通用方法
*
* @Title: uploadFileCurrency
* @param fileUploadedTos
*/
private void uploadFileCurrency(List<FileUploadedTo> fileUploadedTos) {
int i = 0;
for (FileUploadedTo fileUploadedTo : fileUploadedTos) {
String path = fileUploadedTo.getFileUploaded().getPath();
if (path != null && path.indexOf("group") != -1) {
byte fileByte[] = fileService.getFileBytes(path);
if (fileByte == null) {
System.out.println("第" + i++ + "个 fileByte不存在,path是" + path);
} else {
if (path.indexOf(".pdf.") == -1) {// 不是从文件
uploadFile(fileUploadedTo, fileByte);
} else {// 是从文件
uploadSlaveFile(fileByte, "file.pdf", fileUploadedTo);
}
}
}
}
}
/**
* 上传文件到分布式系统,并写入文件 文件限制大小 上传的文件可以直接通过id使用特定接口访问 上传失败return null
*
* @param fileBytes
* 文件
* @param name
* 文件名称
* @param module
* 模块标示
* @return
*/
private void uploadFile(FileUploadedTo fileUploadedTo, byte[] fileBytes) {
// 获取fileUpload
FileUploaded fileUploaded = fileUploadedTo.getFileUploaded();
// 获取模板名
String module = fileUploaded.getModule();
// 获取当前文件信息内的org
String org = fileUploaded.getOrg();
// 获取文件名
String name = fileUploaded.getName();
// 获取文件ID
String fileId = fileUploaded.getId();
if (!isLegal(module)) {
log.error("module ilegal: 模块名称非法");
}
try {
// 长度小于3报错
if (org.length() < 4) {
org = "org" + org;
}
CloudBlockBlob blob = BlobClient.uploadFile(fileBytes, org, module, name);
// 如果上传成功会返回一个Blob对象(不为空)
// 那么此时就将上传后的新路径以及是否上传成功状态进行更改。
if (blob != null) {
// 设置更新成功
fileUploadedTo.setSuccess(true);
// 设置新路径
String newPath = BlobClient.getPath(org, blob.getName());
fileUploadedTo.setNewPath(newPath);
fileUploadedToDao.update(fileUploadedTo);
// 更新fileUpload路径。
fileUploaded.setPath(newPath);
mongoTemplate.save(fileUploaded);
// 更新filemap路径
FileMap fileMap = findByFileId(fileId);
if (fileMap != null) {
fileMap.setFilePath(newPath);
fileMapService.update(fileMap);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 根据fileId 获取 fileMap。
*
* @Title: findByFileId
* @param id
* 文件ID
* @return
* @throws
*/
private FileMap findByFileId(String id) {
Query query = new Query(Criteria.where("fileId").is(id));
return fileMapDao.findOne(query);
}
/**
* 校验模块是否合法<br/>
* 模块名在properties中存在即合法
*
* @param module
* 模块名称
* @return 合法返回true ,非法返回false
*/
private boolean isLegal(String module) {
if (module.indexOf('.') >= 0) {
return false;
}
String v = PropertiesUtils.getString("file.path." + module);
if (v == null || v.length() < 1) {
return false;
} else {
return true;
}
}
/**
* 上传从文件主方法
*
* @Title: uploadSlaveFile
* @param fileBytes
* 从文件字节大小
* @param prefix_name
* 从文件前缀
* @param fileUploadedTo
* fileUploaded 复制表
*/
private void uploadSlaveFile(byte[] fileBytes, String prefix_name, FileUploadedTo fileUploadedTo) {
String org = fileUploadedTo.getFileUploaded().getOrg();
FileUploaded master = fileUploadedTo.getFileUploaded();
String preName = master.getName().substring(0, master.getName().lastIndexOf("."));
prefix_name = BlobClient.getPath(preName.concat("_cut"), preName.concat(".pdf"));
System.out.println(prefix_name);
try {
CloudBlockBlob blob = BlobClient.uploadFile(fileBytes, org, master.getModule(), prefix_name);
if (blob != null) {
fileUploadedTo.setSuccess(true);
}
fileUploadedTo.setNewPath(BlobClient.getPath(org, blob.getName()));
} catch (InvalidKeyException | URISyntaxException | StorageException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
fileUploadedToDao.update(fileUploadedTo);
}
}
上面的类首先获取数据库中的文件,然后根据文件的路径获取文件的字节。如果路径符合规范且字节不为空,那么就将该文件上传。下面的方法是上传到Azure的方法。
Azure Storage 是微软 Azure 云提供的云端存储解决方案,当前支持的存储类型有 Blob、Queue、File 和 Table。
Azure Blob Storage 是用来存放大量的像文本、图片、视频等非结构化数据的存储服务。我们可以在任何地方通过互联网协议 http 或者 https 访问 Blob Storage。简单说,就是把文件放在云上,给它一个 URL,通过这个 URL 来访问文件。这就涉及到一个问题:如何控制访问权限?答案是我们可以根据自己的需要,设置 Blob 对象是只能被自己访问,还是可以被所有人访问。
下面是 Blog Storage 典型的应用场景:
- 存储图片和文档,这些文件可以直接通过浏览器访问。
- 支持分布式访问,主要用于 cdn。
- 提供视频、音频流。
- 存储基本的文件备份和归档文件。
static final String connectionString = String.format(
"DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.chinacloudapi.cn",
storageAccountName, storageAccountKey);
/**
* 通过容器的方式实现上传二进制文件
* @throws InvalidKeyException
* @time:2017年9月22日 上午10:40:19
*/
public static CloudBlockBlob uploadFile(byte[] fileBytes,String org,String module,String fileName) throws URISyntaxException, StorageException, IOException, InvalidKeyException{
// CloudStorageAccount 类表示一个 Azure Storage Account,我们需要先创建它的实例,才能访问属于它的资源。
// 注意连接字符串中的xxx和yyy,分别对应Access keys中的Storage account name 和 key。
CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
// Create the blob client object.
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
CloudBlobContainer container = blobClient.getContainerReference(org);
container.createIfNotExists(); // 创建一个容器(如果该容器不存在)
// 保存到azure上的文件的名称,由于azure服务器对重名文件进行覆盖处理,
// 上传时对文件名称进行唯一标识处理,缺点是在文件服务器看不到文件的真识名称了。
UUID token = UUID.randomUUID();
int postFixIndex = fileName.lastIndexOf(".");
String fileFullName = "";
String postFix = "";
String startName = "";
if (postFixIndex > 0) {
postFix = fileName.substring(postFixIndex + 1);
startName = fileName.substring(0, postFixIndex);
}else {
startName = fileName;
}
/*azure服务器存储相同文件名的文件会覆盖之前的,因此需要对文件名进行处理
存储到azure上的文件名格试为:原文件名+"_"+uuid+扩展名
*/
fileFullName = startName.concat("_").concat(token.toString()).concat(".").concat(postFix);
/*getPath(module,fileFullName) 为放在 container 中的 Blob 的连接路径。
getBlockBlobReference 方法获得一个 Block 类型的 Blob 对象的引用。
您可以根据应用的需要,分别调用 getBlobReference,getAppendBlobReference 或 getPageBlobReference 来创建不同类型的 Blob 对象。
*/
CloudBlockBlob blob = container.getBlockBlobReference(getPath(module,fileFullName));
try
{
InputStream stream = new ByteArrayInputStream(fileBytes);
//blob.upload(stream, (long) fileBytes.length);
PutBlock(blob,stream);
}
catch (StorageException e)
{
return null;
}
return blob;
}
大文件分块上传:
/**
*文件过大,分块上传
* @Title: PutBlock
* @param blob
* @param stream
* @throws StorageException
* @throws IOException
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void PutBlock(CloudBlockBlob blob,InputStream stream) throws StorageException, IOException
{
Iterable<BlockEntry> blockList = new ArrayList<BlockEntry>();
int len = stream.available();
int bytesRead = 0;
int cur_read_len = STEP_LENGTH;
byte[] b = null;
int index = 0;
if (len <= STEP_LENGTH) {// 如果文件没有超过设定长度,一次上传上即可
blob.upload(stream, (long) len);
} else {// 如果文件太大,分批上传
while (true) {
if (len - bytesRead > STEP_LENGTH) {
cur_read_len = STEP_LENGTH;
} else {
cur_read_len = len - bytesRead;
}
b = new byte[cur_read_len + 1];
int bytesReadLength = 0;
bytesReadLength = stream.read(b, 0, cur_read_len);
if (bytesReadLength == -1){// end of InputStream
blob.commitBlockList(blockList);
break;
}
bytesRead += bytesReadLength;
if (bytesRead <= len) {
try {
String blockId = Base64.getEncoder().encodeToString(String.format("%08d",index).getBytes(StandardCharsets.UTF_8));
blob.uploadBlock(blockId, new ByteArrayInputStream(b),Long.valueOf(bytesReadLength));
((AbstractCollection) blockList).add(new BlockEntry(blockId));
} catch (Exception e) {
e.printStackTrace();
}
}
index++;
}
}
}