实习总结:文件迁移

根据项目需求,将文件从FDFS,SASS迁移到azure。此文章记录了java对三个服务器文件上传下载简单的实现。

@Controller
@RequestMapping("/file")
public class FileFloadMigrationController extends BaseController {

    private static Logger log = LoggerFactory.getLogger(FileServiceImpl.class);
    @Autowired
    @Qualifier("fileUploadServiceImpl")
    private FileService fileService;

    @Autowired
    private FileUploadedToDao fileUploadedToDao;

    @Autowired(required = false)
    protected MongoTemplate mongoTemplate;

    @Autowired
    private FileMapDao fileMapDao;

    @Autowired
    private FileMapService fileMapService;

    @Autowired
    private FileUploadedService fileUploadedService;

    /**
     * 文件迁移
     * 
     * @Title: fileMigration
     * @param state
     *            状态:fail:失败 upload:上传
     * @param module
     *            模块名 module=all:所有模块
     * @param lastModifyTime
     *            最后修改时间:存在该项时证明要导入新增数据
     * @return
     * @throws Exception
     */
    @RequestMapping(value = "/filefload/{state}/{module}", method = RequestMethod.GET)
    public @ResponseBody Rest fileMigration(@PathVariable String state, @PathVariable String module, String lastModifyTime) throws Exception {
        new HttpClient().getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(0, false));
        List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
        List<FileUploadedTo> findFileUploadedTos = new ArrayList<FileUploadedTo>();
        Map<Integer, Object> result = new HashMap<Integer, Object>();
        // 上传失败后
        if (state.equals("fail")) {
            // 从新上传所有模块中失败数据
            if (module.equals("all")) {
                findFileUploadedTos = getFileUploadsFail();
            } else {
                // 按模块上传
                findFileUploadedTos = getFileUploadsFail(module);
            }

            result = uploadFileFail(findFileUploadedTos);

        } else if (state.equals("upload")) {
            if (module.equals("all")) {
                // 上传所有模块中数据
                fileUploadeds = getFileUploadsNomal();
            } else {
                // 按模块上传数据
                fileUploadeds = getFileUploadsNomal(module);
            }
            // 最后修改时间不为空,需要上传新文件
            // 比如2017-11-09日转移一次后,11日需要在转移。此时lastModifyTime 需要写为2017-11-09
            if (lastModifyTime != null && !lastModifyTime.equals("")) {
                fileUploadeds = getFileUploadTosByLastModify(lastModifyTime);
            }

            result = uploadFileNomal(fileUploadeds);
        }

        return Rest.item(StateCode.STATUS_CODE_SUCCESS, result);
    }

    /**
     * 正常情况下文件迁移,不按照模块。所有模块
     * 
     * @return List<FileUploaded>
     */
    private List<FileUploaded> getFileUploadsNomal() {
        List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
        Query query = new Query();
        fileUploadeds = mongoTemplate.find(query, FileUploaded.class);
        return fileUploadeds;
    }

    /**
     * 正常情况下文件迁移,按照模块
     * 
     * @Title: getFileUploadsNomal
     * @param module
     *            模块名字
     * @return List<FileUploaded>
     */
    private List<FileUploaded> getFileUploadsNomal(String module) {
        List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
        Query query = new Query();
        Criteria criteria = Criteria.where("module").is(module);
        query.addCriteria(criteria);
        fileUploadeds = mongoTemplate.find(query, FileUploaded.class);
        return fileUploadeds;
    }

    /**
     * 上传过程中出现失败情况。从新上传。所有模块
     * 
     * @Title: getFileUploadsFail
     * @return List<FileUploadedTo>
     */
    private List<FileUploadedTo> getFileUploadsFail() {
        List<FileUploadedTo> findFileUploadedTos = new ArrayList<FileUploadedTo>();
        Query query = new Query();
        Criteria criteria = Criteria.where("isSuccess").is(false);
        query.addCriteria(criteria);
        findFileUploadedTos = mongoTemplate.find(query, FileUploadedTo.class);
        return findFileUploadedTos;
    }

    /**
     * 上传过程中出现失败情况。从新上传。按模块进行上传
     * 
     * @Title: getFileUploadsFail
     * @param module
     *            模块名
     * @return List<FileUploadedTo>
     */
    private List<FileUploadedTo> getFileUploadsFail(String module) {
        List<FileUploadedTo> findFileUploadedTos = new ArrayList<FileUploadedTo>();
        Query query = new Query();
        Criteria criteria = Criteria.where("fileUploaded.module").is(module).and("isSuccess").is(false);
        query.addCriteria(criteria);
        findFileUploadedTos = mongoTemplate.find(query, FileUploadedTo.class);
        return findFileUploadedTos;
    }

    /**
     * 上传部分后,出现新增数据。根据文件最后更新时间,进行操作
     * 
     * @Title: getFileUploadTosByLastModify
     * @param lastModifyTime
     *            最后修改时间
     * @return List<FileUploadedTo>
     * @throws ParseException 
     */
    private List<FileUploaded> getFileUploadTosByLastModify(String lastModifyTime) throws ParseException {
        List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
        Query query = new Query();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        Criteria criteria = Criteria.where("lastModify").gte(sdf.parse(lastModifyTime));
        query.addCriteria(criteria);
        fileUploadeds = mongoTemplate.find(query, FileUploaded.class);
        return fileUploadeds;
    }

    /**
     * 正常情况下上传文件主方法
     * 
     * @Title: uploadFileNomal
     * @param fileUploadeds
     * @return
     * @throws Exception
     */
    private Map<Integer, Object> uploadFileNomal(List<FileUploaded> fileUploadeds) throws Exception {
        Map<Integer, Object> map = new HashMap<Integer, Object>();
        List<FileUploadedTo> fileUploadedTos = new ArrayList<FileUploadedTo>();
        if (fileUploadeds == null || fileUploadeds.size() == 0) {
            return map;
        }
        System.out.println("共有文件:" + fileUploadeds.size() + "条");
        long startTime = System.currentTimeMillis();
        // 将表中的数据复制到扩展表中(将文件所有信息复制、设置org为要迁移文件的org、设置迁移失败(false))
        for (FileUploaded fileUploaded : fileUploadeds) {
            FileUploadedTo fileUploadedTo = new FileUploadedTo();
            fileUploadedTo.setFileUploaded(fileUploaded);
            fileUploadedTo.setOrg(fileUploaded.getOrg());
            fileUploadedTo.setSuccess(false);
            fileUploadedTos.add(fileUploadedTo);
        }
        // 保存复制表
        fileUploadedToDao.save(fileUploadedTos);

        uploadFileCurrency(fileUploadedTos);

        long endTime = System.currentTimeMillis();
        System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
        return map;
    }

    /**
     * 上传失败后重新上传主方法
     * 
     * @Title: uploadFileFail
     * @param fileUploadedTos
     * @return
     */
    private Map<Integer, Object> uploadFileFail(List<FileUploadedTo> fileUploadedTos) throws Exception {
        Map<Integer, Object> map = new HashMap<Integer,Object>();
        
        uploadFileCurrency(fileUploadedTos);
        
        return map;
    }

    /**
     * 上传文件通用方法
     * 
     * @Title: uploadFileCurrency
     * @param fileUploadedTos
     */

    private void uploadFileCurrency(List<FileUploadedTo> fileUploadedTos) {
        int i = 0;
        for (FileUploadedTo fileUploadedTo : fileUploadedTos) {
            String path = fileUploadedTo.getFileUploaded().getPath();
            
            if (path != null && path.indexOf("group") != -1) {
                byte fileByte[] = fileService.getFileBytes(path);
                if (fileByte == null) {
                    System.out.println("第" + i++ + "个 fileByte不存在,path是" + path);
                } else {
                    if (path.indexOf(".pdf.") == -1) {// 不是从文件
                        uploadFile(fileUploadedTo, fileByte);
                    } else {// 是从文件
                        uploadSlaveFile(fileByte, "file.pdf", fileUploadedTo);
                    }
                }
            }
        }
    }

    /**
     * 上传文件到分布式系统,并写入文件 文件限制大小 上传的文件可以直接通过id使用特定接口访问 上传失败return null
     * 
     * @param fileBytes
     *            文件
     * @param name
     *            文件名称
     * @param module
     *            模块标示
     * @return
     */
    private void uploadFile(FileUploadedTo fileUploadedTo, byte[] fileBytes) {
        // 获取fileUpload
        FileUploaded fileUploaded = fileUploadedTo.getFileUploaded();
        // 获取模板名
        String module = fileUploaded.getModule();
        // 获取当前文件信息内的org
        String org = fileUploaded.getOrg();
        // 获取文件名
        String name = fileUploaded.getName();
        // 获取文件ID
        String fileId = fileUploaded.getId();
        if (!isLegal(module)) {
            log.error("module ilegal: 模块名称非法");
        }
        try {
            // 长度小于3报错
            if (org.length() < 4) {
                org = "org" + org;
            }
            CloudBlockBlob blob = BlobClient.uploadFile(fileBytes, org, module, name);
            // 如果上传成功会返回一个Blob对象(不为空)
            // 那么此时就将上传后的新路径以及是否上传成功状态进行更改。
            if (blob != null) {
                // 设置更新成功
                fileUploadedTo.setSuccess(true);
                // 设置新路径
                String newPath = BlobClient.getPath(org, blob.getName());
                fileUploadedTo.setNewPath(newPath);
                fileUploadedToDao.update(fileUploadedTo);

                // 更新fileUpload路径。
                fileUploaded.setPath(newPath);
                mongoTemplate.save(fileUploaded);

                // 更新filemap路径
                FileMap fileMap = findByFileId(fileId);
                if (fileMap != null) {
                    fileMap.setFilePath(newPath);
                    fileMapService.update(fileMap);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * 根据fileId 获取 fileMap。
     * 
     * @Title: findByFileId
     * @param id
     *            文件ID
     * @return
     * @throws
     */
    private FileMap findByFileId(String id) {
        Query query = new Query(Criteria.where("fileId").is(id));
        return fileMapDao.findOne(query);
    }

    /**
     * 校验模块是否合法<br/>
     * 模块名在properties中存在即合法
     * 
     * @param module
     *            模块名称
     * @return 合法返回true ,非法返回false
     */
    private boolean isLegal(String module) {
        if (module.indexOf('.') >= 0) {
            return false;
        }
        String v = PropertiesUtils.getString("file.path." + module);
        if (v == null || v.length() < 1) {
            return false;
        } else {
            return true;
        }
    }

    /**
     * 上传从文件主方法
     * 
     * @Title: uploadSlaveFile
     * @param fileBytes
     *            从文件字节大小
     * @param prefix_name
     *            从文件前缀
     * @param fileUploadedTo
     *            fileUploaded 复制表
     */
    private void uploadSlaveFile(byte[] fileBytes, String prefix_name, FileUploadedTo fileUploadedTo) {
        String org = fileUploadedTo.getFileUploaded().getOrg();
        FileUploaded master = fileUploadedTo.getFileUploaded();
        String preName = master.getName().substring(0, master.getName().lastIndexOf("."));
        prefix_name = BlobClient.getPath(preName.concat("_cut"), preName.concat(".pdf"));
        System.out.println(prefix_name);
        try {
            CloudBlockBlob blob = BlobClient.uploadFile(fileBytes, org, master.getModule(), prefix_name);
            if (blob != null) {
                fileUploadedTo.setSuccess(true);
            }
            fileUploadedTo.setNewPath(BlobClient.getPath(org, blob.getName()));
        } catch (InvalidKeyException | URISyntaxException | StorageException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        fileUploadedToDao.update(fileUploadedTo);
    }
}

上面的类首先获取数据库中的文件,然后根据文件的路径获取文件的字节。如果路径符合规范且字节不为空,那么就将该文件上传。下面的方法是上传到Azure的方法。

Azure Storage 是微软 Azure 云提供的云端存储解决方案,当前支持的存储类型有 Blob、Queue、File 和 Table。

Azure Blob Storage 是用来存放大量的像文本、图片、视频等非结构化数据的存储服务。我们可以在任何地方通过互联网协议 http 或者 https 访问 Blob Storage。简单说,就是把文件放在云上,给它一个 URL,通过这个 URL 来访问文件。这就涉及到一个问题:如何控制访问权限?答案是我们可以根据自己的需要,设置 Blob 对象是只能被自己访问,还是可以被所有人访问。

下面是 Blog Storage 典型的应用场景:

  • 存储图片和文档,这些文件可以直接通过浏览器访问。
  • 支持分布式访问,主要用于 cdn。
  • 提供视频、音频流。
  • 存储基本的文件备份和归档文件。
    static final String connectionString = String.format(
            "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.chinacloudapi.cn",
            storageAccountName, storageAccountKey);
    /**
     * 通过容器的方式实现上传二进制文件
     * @throws InvalidKeyException 
     * @time:2017年9月22日 上午10:40:19
     */
    public static CloudBlockBlob uploadFile(byte[] fileBytes,String org,String module,String fileName) throws URISyntaxException, StorageException, IOException, InvalidKeyException{
        // CloudStorageAccount 类表示一个 Azure Storage Account,我们需要先创建它的实例,才能访问属于它的资源。
        // 注意连接字符串中的xxx和yyy,分别对应Access keys中的Storage account name 和 key。
        CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);

        // Create the blob client object.
        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
        CloudBlobContainer container = blobClient.getContainerReference(org);
        container.createIfNotExists(); // 创建一个容器(如果该容器不存在)
        // 保存到azure上的文件的名称,由于azure服务器对重名文件进行覆盖处理,
        // 上传时对文件名称进行唯一标识处理,缺点是在文件服务器看不到文件的真识名称了。
        UUID token = UUID.randomUUID();
        int postFixIndex = fileName.lastIndexOf(".");
        String fileFullName = "";
        String postFix = "";
        String startName = "";
        if (postFixIndex > 0) {
            postFix = fileName.substring(postFixIndex + 1);
            startName = fileName.substring(0, postFixIndex);
        }else {
            startName = fileName;
        }

        /*azure服务器存储相同文件名的文件会覆盖之前的,因此需要对文件名进行处理
        存储到azure上的文件名格试为:原文件名+"_"+uuid+扩展名
        */
        fileFullName = startName.concat("_").concat(token.toString()).concat(".").concat(postFix);


        /*getPath(module,fileFullName) 为放在 container 中的 Blob 的连接路径。
          getBlockBlobReference 方法获得一个 Block 类型的 Blob 对象的引用。
          您可以根据应用的需要,分别调用 getBlobReference,getAppendBlobReference 或 getPageBlobReference 来创建不同类型的 Blob 对象。
        */
        CloudBlockBlob blob = container.getBlockBlobReference(getPath(module,fileFullName));
        try
        {
            InputStream stream = new ByteArrayInputStream(fileBytes);
          //blob.upload(stream, (long) fileBytes.length);
            PutBlock(blob,stream);
        }
        catch (StorageException e)
        {
            return null;
        }
      return blob;
    }

大文件分块上传:

  /**
    *文件过大,分块上传
    * @Title: PutBlock
    * @param blob
    * @param stream
    * @throws StorageException
    * @throws IOException
     */
@SuppressWarnings({ "unchecked", "rawtypes" })
    public static void PutBlock(CloudBlockBlob blob,InputStream stream) throws StorageException, IOException  
    {  
        Iterable<BlockEntry> blockList = new ArrayList<BlockEntry>();
        int len = stream.available();
        int bytesRead = 0;
        int cur_read_len = STEP_LENGTH;
        byte[] b = null;
        int index = 0;
        if (len <= STEP_LENGTH) {// 如果文件没有超过设定长度,一次上传上即可
            blob.upload(stream, (long) len);
        } else {// 如果文件太大,分批上传
            while (true) {
                if (len - bytesRead > STEP_LENGTH) {
                    cur_read_len = STEP_LENGTH;
                } else {
                    cur_read_len = len - bytesRead;
                }
                b = new byte[cur_read_len + 1];
                int bytesReadLength = 0;
                bytesReadLength = stream.read(b, 0, cur_read_len);
                if (bytesReadLength == -1){// end of InputStream
                    blob.commitBlockList(blockList);
                    break;
                }
                bytesRead += bytesReadLength;
                if (bytesRead <= len) {
                    try {
                        String blockId = Base64.getEncoder().encodeToString(String.format("%08d",index).getBytes(StandardCharsets.UTF_8));
                        blob.uploadBlock(blockId, new ByteArrayInputStream(b),Long.valueOf(bytesReadLength));
                        ((AbstractCollection) blockList).add(new BlockEntry(blockId));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    
                } 
                index++;
            }
        }
      
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351