大容量文件夹的拆分和合并处理-Java

日常网络和数据库文件输入输出处理是很耗时间和消耗处理器时间的,所以I/O操作被普遍认为是昂贵的操作。 这里我们假设有一份非常大的文件,比如1G, 我们不可能拿任意一个InputStream 实现类去直接用那文件类File去封装传入构造方法直接处理,操作系统是不会让一个正在执行“非常耗时”的程序去占用大量时间去处理一个I/O 操作。所以我们必须拆分这个大文件为数个小的文件去依此处理。主体思想是先拆分成小字节块的文件在由这些小字节块文件合并成原来文件;这有点类似与网络传输一个大文件的过程,把文件拆除小的数据包然后再由数千甚至上万不等地路径传输到不同的终端。

主要核心:

  • 按某个数量值去拆分大文件成小文件
    • 小文件的读写都由一个特定的BufferSize (字节容器大小)来决定输入和输出流的字节负荷
  • 合并 的思路也类似
    • 按特定的BufferSize (字节容器大小)来决定读写可承受的负荷去处理多个拆分的小文件

下方带图阐述了主要思路

注意 : 本教程未用到同步和并发处理,仅是为了阐述I/O大文件拆分和合并的主体思想,若要用并发处理,下列代码实现需做改变。

FileSpliterAndMerger.png

FileSpliter.java


  • 成员变量
    • - String inputPathName;
    • - String outputPathNamePrefix;
    • - int maxBufferSize;
    • - final int ONE_BYTE = 1024;
  • 成员方法
    • + FileSpliter(String inputPathName, String outputPathNamePrefix, ​ int maxBufferSize)
    • + void splitStart(int numOfSplits)
    • + void split(int numOfSplits) throws Exception
    • + static void readWrite(RandomAccessFile raf, BufferedOutputStream bw, long numBytes) throws IOException

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;

/**
 * FileSpliter: 用于拆分一个大容量文件,把其分成 numOfSplits 小块文件大小字节
 *              处理,这个减缓IOStream的吞吐量。这个类可以日后加多线程处理或用
 *              并行库来用不同的core或者thread去运算和处理每个文件。 
 * @author kaili
 *
 */
public class FileSpliter {
    private String inputPathName;           // 输入文件路径 
    private String outputPathNamePrefix;    // 输出文件路径
    private int maxBufferSize;              // 每个块的数据处理容器大小: maxBufferSize * ONE_BYTE = maxBufferSize kb
    private final int ONE_BYTE = 1024; 
    
    /**
     * 初始化下列参数列表成员变量 
     * @param inputPathName
     * @param outputPathNamePrefix
     * @param maxBufferSize
     */
    public FileSpliter(String inputPathName, String outputPathNamePrefix, 
                        int maxBufferSize) {
         this.inputPathName  = inputPathName; 
         this.outputPathNamePrefix = outputPathNamePrefix; 
         this.maxBufferSize  = maxBufferSize; 
    }
    
    
    /***
     * 启动方法,间接调用拆分方法split
     * @param numOfSplits
     */
    public void splitStart(int numOfSplits) {
        try {
            split(numOfSplits);
        } catch (Exception e) { 
            e.printStackTrace();
        }
    }
    
    /**
     * 辅助方法
     * @param numOfSplits   拆分文件块数量
     * @throws Exception    FileNotFoundException, IOException
     */
    public void split(int numOfSplits) throws Exception {
        File inputFile = new File(inputPathName);
        RandomAccessFile raf = new RandomAccessFile(inputFile, "r");
        long numSplits = numOfSplits; 
        // 记录随机访问输入流的大小
        long sourceSize = raf.length();
        // 算出每个拆分块的大小
        long bytesPerSplit = sourceSize/numSplits ;
        // 算出没被拆分最大值整除的剩余字节
        long remainingBytes = sourceSize % numSplits;

        // 最大的字节数组容器大小,便于数据的读写操作
        int maxReadBufferSize = maxBufferSize * ONE_BYTE; // max size bytes taken in buffer 
        // 遍历所有的拆分块
        for(int destIx=0; destIx < numSplits; destIx++) {
            // 自制合并拆分文件的大小,用前缀加拆分下标destIX
            String filePath = outputPathNamePrefix + "split."+ (destIx + 1);
            
            // 用缓冲输出流来拆分字节文件输出到文件夹
            BufferedOutputStream bw = new BufferedOutputStream(new FileOutputStream(filePath));
            
            // 若每个拆分块字节大小比最大容器字节大小还大,我们
            // 遍历处理这个文件字节块
            if(bytesPerSplit > maxReadBufferSize) {
                // 算出字节块能容多少个最大容器字节
                long numReads = bytesPerSplit/maxReadBufferSize;
                // 算出上个算式剩下不能被最大容器大小的字节数量
                long numRemainingRead = bytesPerSplit % maxReadBufferSize;
                
                // 把numReads * maxReadBufferSize 用遍历
                // 形式去调用readWrite方法 把字节块拆分为
                // 以maxReadBufferSize 容器大小的字节量依依读写
                for(int i=0; i<numReads; i++) {
                    readWrite(raf, bw, maxReadBufferSize);
                }
                
                // 这里读写原理同上, 只不过读写的不是maxReadBufferSize
                // 的大小,而是剩余的字节量
                if(numRemainingRead > 0) {
                    readWrite(raf, bw, numRemainingRead);
                }
            }else {
                // 若拆分字节块的字节量没有maxReadBufferSize大
                // 直接调用readWrite处理
                readWrite(raf, bw, bytesPerSplit);
            }
            bw.flush(); // 记得冲掉bw数据“管道”里的存储数据,以免有遗留
            bw.close(); // 关闭流
        }
        
        // 这个就是拆分的字节块后剩余的字节,若此大于0, 字节用readWrite处理
        if(remainingBytes > 0) {
            BufferedOutputStream bw = new BufferedOutputStream(new FileOutputStream("split."+(numSplits+1)));
            readWrite(raf, bw, remainingBytes);
            bw.flush();
            bw.close();
        }
            raf.close();
    }
    
    /**
     * 辅助方法,读写功能,这个基本做了基本的数据处理工作; 方法要求下列参数。
     * @param fis       input file stream 
     * @param fos       output file stream 
     * @param numBytes  number of bytes to process in each buffer 
     * @return
     * @throws IOException
     */
    public static void readWrite(RandomAccessFile raf, BufferedOutputStream bw, long numBytes) throws IOException {
        // 处理字节容器数组,用于传入输入流:这里是随机访问输入流
        byte[] buf = new byte[(int) numBytes];
        int val = raf.read(buf);
        // 若返回字节值不等于-1, 写字节数据到缓冲输出流的指定文件
        if(val != -1) {
            bw.write(buf);
        }
    }

} 

FileMerger.java


  • 成员变量

    • - FileInputStream fis;
    • - FileOutputStream fos;
    • - List<File> list;
    • - String inputMergerFilePath;
    • - String outputMergerFilePath;
    • - int maxBufferChunkSize;
    • - int ONE_BYTE = 1024;
  • 成员方法

    • + FileMerger(String inputMergerPath, String outputMergerPath, int maxBufferSize)
    • + void startMerging()
    • + void merge()
    • + void processFilesInList(File ofile)
    • + void sortList(List<File> fileLists)
    • + long readWrite(FileInputStream fis, FileOutputStream fos, long numBytes) throws IOException

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
/**
 * FileMerger: 用于合并拆分后的文件夹中的文件
 * @author kaili
 *
 */
public class FileMerger {   
        private FileInputStream fis; 
        private FileOutputStream fos; 
        private List<File> list;
        private String inputMergerFilePath;
        private String outputMergerFilePath; 
        private int maxBufferChunkSize;  
        private int ONE_BYTE = 1024; 
        
        /***
           *  初始化 输入文件路径 和 输出文件路径 和 最大 容器 大小 (用于存储每次读写的容器字节数组)
         * @param inputMergerPath           input path
         * @param outputMergerPath          output path 
         * @param maxBufferSize             the maximum size of each split chunk or size of buffer 
         */
        public FileMerger(String inputMergerPath, String outputMergerPath, int maxBufferSize) {
            this.inputMergerFilePath  = inputMergerPath; 
            this.outputMergerFilePath = outputMergerPath;  
            this.maxBufferChunkSize   = maxBufferSize * ONE_BYTE; // the maximum size of each split chunk 
        }
        
        /**
           *  开始合并
         */
        public void startMerging() {
            merge();
        }
    
        /**
         * 辅助方法用于合并
         */
        public void merge() {  
            // 合并(拆分文件)的输出文件对象
            File ofile = new File(outputMergerFilePath); 
            
            // 输入文件类(本质必须是一个文件夹)
            File inputFile = new File(inputMergerFilePath); 
            
            // 赋值文件列表
            this.list = new ArrayList<File>();

            if (inputFile.isDirectory()) {  // 检查是不是文件夹 
                // 拿到文件夹的里面的文件数组
                File[] listFiles = inputFile.listFiles();   
                // 加载所有拆分文件到列表集合里头
                for (File inputf : listFiles) {
                    list.add(inputf);
                }
                // 注意: inputFile.listFiles() 传回的数组是无序的
                // 所以拆分文件命名必须要遵守某种规则,便于取出文件的序列号
                // 也为排序做好铺垫
                // 排序好数组里的文件 
                sortList(list);
                // 处理列表里的文件
                processFilesInList(ofile);
            } else {
                System.err.println("合并不成功,处理文件必须要是文件夹.");
            }
        } 
        
        
        /**
         * 处理文件方法,遍历所有文件数组里的文件
         * 注意:在多线程或并发环境条件下,这个拆分处理
         // 需要应相应情况而改变。 
         * @param ofile
         */
        public void processFilesInList(File ofile) {
            try {
                // 输出流
                fos = new FileOutputStream(ofile,true);
                // 遍历文件
                for (File file : list) {
                    fis = new FileInputStream(file); 
                    // 这个变量用于检查拆分字节量是否等于原文件大小
                    long sumByte = 0; 
                    // 如文件字节大小大于最大字节容器大小
                    if (file.length() > maxBufferChunkSize) {
                        // 算出从文件字节里可以拆分多少个以maxBufferChunkSize
                        // 为大小的单位
                        long numReads = file.length() / maxBufferChunkSize; 
                        // 算出剩余的拆分后的字节大小
                        long remains  = file.length() % maxBufferChunkSize; 
                        // 读写numReads*maxBufferChunkSize大小的字节量
                        for (int i = 0; i < numReads; i++) {
                            sumByte += readWrite(fis, fos, maxBufferChunkSize);
                        }
                        // 若剩余字节大于零,直接读写
                        if (remains > 0) {
                            sumByte += readWrite(fis, fos, remains);
                        }
                        
                    }  else { // 文件大小小于最大容器数量,直接读写
                        sumByte += readWrite(fis, fos, file.length());
                    } 
                    // 检查拆分字节量的总量是否等于文件字节量
                    assert(sumByte == file.length());
                     
                    fis.close();    // 关闭输入流
                    fis = null;     // 重置输入流
                }
                fos.close();        // 关闭输出流
                fos = null;         // 重置输出流
            }catch (Exception exception){
                exception.printStackTrace();
            }
        }
        
        /**
         * 按文件序列号排序,从小到大。文件名:拆分名.序号
         * 
         * 合同:拆分文件的合并必须时有序的,所以文件命名要遵循一定规则 
         * @param fileLists 文件数组列表
         */
        public void sortList(List<File> fileLists) {
            Collections.sort(fileLists, new Comparator<File>() {  
                @Override
                public int compare(File o1, File o2) {
                    String fileName1 = o1.getName(); 
                    String fileName2 = o2.getName(); 
                     
                    int val1 = Integer.parseInt(fileName1.substring(fileName1.lastIndexOf('.') + 1, fileName1.length()));
                    int val2 = Integer.parseInt(fileName2.substring(fileName2.lastIndexOf('.') + 1, fileName2.length()));
                     
                    if (val1 < val2) {
                        return -1; 
                    } else if (val1 > val2) {
                        return 1; 
                    } else {
                        return 0;
                    }
                } 
            });
        }
        
        /**
         * 读写赋值方法:要求下列参数。 
         * @param fis       input file stream 
         * @param fos       output file stream 
         * @param numBytes  number of bytes to process in each buffer 
         * @return
         * @throws IOException
         */
        public long readWrite(FileInputStream fis, FileOutputStream fos, long numBytes) throws IOException {
            byte[] buf = new byte[(int) numBytes];
            int bytesRead = fis.read(buf, 0, (int)numBytes); // 3个变量的读操作会帮助fis得知处理字节数量
            if(bytesRead != -1) {
                fos.write(buf);
            }
            assert(bytesRead == numBytes); 
            fos.flush();
            return bytesRead;
        } 
}

main 方法实现


 private static final int MAX_BUFFER_SIZE = 8; // 8 kb
    private static final int NUM_OF_SPLITS      = 10;             // 10 SPLITS
    
    public static void main(String[] args) {
        
        // -------------------------------Splitter---------------------------------//
        
        // 注意: 需要改变路径来测试
        String inputSplitPath  = "---------xx路径---------"; 
        String outputSplitPathPrefix = "----------------xx路径---------------"; 
        //---------------------------------------------------------------//
        
        System.out.println("start splitting...");
        Instant start = Instant.now(); 
        FileSpliter fs = new FileSpliter(inputSplitPath, outputSplitPathPrefix, MAX_BUFFER_SIZE); 
        fs.splitStart(NUM_OF_SPLITS);
        Instant end = Instant.now(); 
        
        System.out.println(Duration.between(start, end).getSeconds());
        System.out.println("done spliting...");
        
        
        // ---------------------------------Merger-------------------------------- //
        
        System.out.println("start merging...");
        
        // 注意: 需要改变路径来测试
        String inputMergerFilePath  = "----------------xx路径---------------"; 
        String outputMergerFilePath = "----------------xx路径---------------";  
        
        start = Instant.now(); 
        FileMerger fm1 = new FileMerger(inputMergerFilePath, outputMergerFilePath, MAX_BUFFER_SIZE); 
        fm1.startMerging();
        end = Instant.now();
        
        System.out.println(Duration.between(start, end).getSeconds());
        System.out.println("done merging...");
    }  
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,607评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,239评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,960评论 0 355
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,750评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,764评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,604评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,347评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,253评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,702评论 1 315
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,893评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,015评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,734评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,352评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,934评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,052评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,216评论 3 371
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,969评论 2 355

推荐阅读更多精彩内容