大数据中的归并排序(Merge Sort)

最近在研究大数据的分治思想,很多场景计算得出的中间结果都是“内部有序,外部无序”,这时候就要用到“归并排序”算法将多个这样的结果集合归并为一个有序的集合。于是就复习了一下“归并排序”。

归并排序的大概流程

  1. 用递归将数组拆分成一个个最小的数组
{6, 9, 5, 2, 4, 9, 3, 11, 44, 55, 43, 27, 18, 94, 35, 32, 17}
{6, 9, 5, 2, 4, 9, 3, 11, 44}{55, 43, 27, 18, 94, 35, 32, 17}
{6, 9, 5, 2, 4}{9, 3, 11, 44}{55, 43, 27, 18}{94, 35, 32, 17}
{6, 9, 5}{2, 4}{9, 3}{11, 44}{55, 43}{27, 18}{94, 35}{32, 17}
{6, 9}{5}{2}{4}{9}{3}{11}{44}{55}{43}{27}{18}{94}{35}{32}{17}
{6}{9}{5}{2}{4}{9}{3}{11}{44}{55}{43}{27}{18}{94}{35}{32}{17}
  1. 将小数组逐个排序合并
{6, 9}{5}{2}{4}{9}{3}{11}{44}{55}{43}{27}{18}{94}{35}{32}{17}
{5, 6, 9}{2, 4}{3, 9}{11, 44}{43, 55}{18, 27}{35, 94}{17, 32}
{2, 4, 5, 6, 9}{3, 9, 11, 44}{18, 27, 43, 55}{17, 32, 35, 94}
{2, 3, 4, 5, 6, 9, 9, 11, 44}{17, 18, 27, 32, 35, 43, 55, 94}
{2, 3, 4, 5, 6, 9, 9, 11, 17, 18, 27, 32, 35, 43, 44, 55, 94}

java代码实现

public class MergeSort {

    public static void main(String[] args) {
        int[] array = {6, 9, 5, 2, 4, 9, 3, 11, 44, 55, 43, 27, 18, 94, 35, 32, 17};
        sort(array, 0, array.length -1);
        Utils.printf(array);
    }


    private static void sort(int[] array, int begin, int end) {
        if(begin >= end ){
            return;
        }

        int mid = (begin + end) / 2;
        
        sort(array, begin, mid);
        sort(array, mid + 1, end);
        merge(array, begin, mid, end);
    }

    private static void merge(int[] array, int begin, int mid, int end) {

        //合并两个数组结果的临时数组
        int[] temp = new int[end - begin + 1];
        //左边数组的遍历指针
        int lStart = begin;
        //右边数组的遍历指针
        int rStart = mid + 1;
        int k = 0;
        while (lStart <= mid && rStart <= end){
            //两个数组的指针对应的元素做比较
            if(array[lStart] < array[rStart]){
                //如果左边的元素小,把左边的元素放入临时数组,并将左边指针向前移动
                temp[k ++] = array[lStart ++];
            }else {
                //如果右边的元素小,把右边的元素放入临时数组,并将右边指针向前移动
                temp[k ++] = array[rStart ++];
            }
        }

        //如果两个数组中还有剩余的元素,就全部加到临时数组中(剩余的肯定比之前加入的元素值大,无需比较)
        while (lStart <= mid){
            temp[k ++] = array[lStart ++];
        }
        while (rStart <= end){
            temp[k ++] = array[rStart ++];
        }
        //把排序合并后的临时数组覆盖到原数组中
        for(int i =0; i < temp.length; i ++){
            array[begin + i] = temp[i];
        }

    }

}

大数据的分布式计算中“归并排序”的应用场景

在大数据场景中,数据量远大于计算机的内存,磁盘和网络的IO又是无法解决的瓶颈,就需要用到分治思想。
比如有1000G的中国2020年某电商平台用户消费总金额数据。数据格式是用户名:消费总金额。所有数据都是无序的。现在有1台磁盘5T/内存8G(程序可用内存约为5G多)的计算机。如何将数据按消费总金额从大到小排序输出?

1. 计算机上将1000G的数据每次加载5G到内存中做排序(这一步不一定要用归并排序,可以用快排等其他排序方式)输出一个5G的小文件,这样得到200份内部有序外部无序的5G文件。

image.png

2. 从每个5G文件中读取25M数据到内存中,内存中有200个有序的数组。这个就要用到“多路归并”,创建一个有序队列,从200个数组每个读取一个元素放入有序队列。

a. 从有序队列中取出第一个元素。
b. 将此元素数据写入输出的文件中,并从此元素所属的数组中的下一个元素放入有序队列。
c. 重复步骤a,b直到某个数组元素耗尽,从这个数组所属的5G文件中,继续加载25M数据到内存中。
d. 重复步骤a,b,c直到所有5G文件的数据全部加载到内存中,并写入到输出文件。


image.png
public class ExternalSort {

    private static int fileCount = 5;

    public static void main(String[] args) {

        List<String[]> temp25MDataList = new ArrayList<>(fileCount);
        //有序队列
        PriorityQueue<SortNode> queue = new PriorityQueue<>(fileCount);
        for (int i = 0; i < fileCount; i++) {
            String[] temp25MData = load25MData(i);
            if (temp25MData != null) {
                queue.offer(new SortNode(temp25MData[0], i, 0));
                temp25MDataList.add(temp25MData);
            }
        }

        while (!queue.isEmpty()) {
            SortNode top = queue.poll();
            outPut(top);
            String[] temp25MData = temp25MDataList.get(top.temp25MDataListIndex);
            if (temp25MData != null) {
                int nextIndex = top.arrayIndex + 1;
                if (nextIndex >= temp25MData.length) {
                    temp25MData = load25MData(top.temp25MDataListIndex);
                    temp25MDataList.set(top.temp25MDataListIndex, temp25MData);
                    if (temp25MData == null) {
                        //文件已经读到头 没有新数据
                        continue;
                    }
                    nextIndex = 0;
                }
                queue.offer(new SortNode(temp25MData[nextIndex], top.temp25MDataListIndex, nextIndex));
            }
        }

    }

    private static int[] fileFlag = new int[fileCount];
    private static String[][][] file = {
            {{"ZMd:78842", "xXj:77789", "OQY:76941", "sYg:75182", "HYn:74065", "LUP:73859", "zRT:72138", "oNp:71667"}, {"cLD:68121", "KlF:67784", "kIN:66996", "jQX:65355", "AeX:64774", "YPo:63734", "jTX:62459", "Mkr:61190"}, {"Ovb:58003", "mEn:57046", "IZA:56354", "JjK:55444", "fCy:54862", "TTb:53371", "cvO:52598", "bkS:51587"}, {"SsO:48568", "KAl:47930", "AfA:46038", "jIe:45192", "nrV:44816", "vsb:43376", "skh:42345", "dtH:41544"}, {"tqy:38536", "xuw:37038", "FSL:36757", "ToD:35277", "uSp:34142", "moQ:33083", "Ebt:32911", "fKy:31058"}, {"NTQ:28601", "cZr:27009", "kji:26874", "XTh:25851", "JyK:24195", "qAm:23031", "rgG:22364", "gwD:21211"}, {"HbE:18722", "IAX:17893", "hWQ:16460", "wRM:15013", "OMw:14013", "cfM:13873", "QAH:12757", "hQe:11005"}},
            {{"POE:78980", "QfG:77939", "Czx:76307", "HbK:75092", "hBP:74457", "OwK:73958", "wPi:72788", "ojI:71505"}, {"ela:68770", "Qby:67636", "fLZ:66952", "lbi:65236", "yyM:64618", "Zdg:63293", "ScE:62698", "yUh:61485"}, {"qDW:58225", "fUs:57631", "PeX:56454", "bXo:55411", "TAz:54106", "WpE:53043", "Pcm:52200", "tCq:51502"}, {"xtv:48353", "Wlg:47817", "SaB:46027", "VIv:45536", "JkS:44980", "SKj:43964", "YYW:42522", "pBA:41114"}, {"STu:38637", "sEo:37158", "vru:36676", "ePQ:35241", "hte:34096", "RWm:33327", "RtI:32647", "HhN:31650"}, {"NXU:28487", "PvG:27528", "FWJ:26530", "oPK:25824", "PUv:24096", "evU:23993", "Tca:22690", "NlC:21121"}, {"Fxb:18336", "zMb:17461", "TNU:16330", "Shp:15694", "keW:14798", "Ohi:13818", "sAs:12711", "PfH:11591"}},
            {{"onA:78675", "NKW:77198", "BiC:76132", "TCt:75323", "mVA:74176", "jtp:73571", "xhI:72545", "UCb:71028"}, {"zkC:68103", "SOo:67748", "pap:66597", "pSr:65427", "RKe:64639", "ncM:63850", "UiQ:62243", "FJS:61733"}, {"XSC:58999", "rKD:57964", "BLB:56886", "okS:55365", "EeY:54187", "ilt:53104", "uZP:52922", "FWY:51497"}, {"bZy:48417", "nsO:47122", "ePq:46682", "qVj:45565", "SqX:44784", "fLV:43697", "URc:42674", "ITa:41361"}, {"qOq:38218", "xzl:37426", "AsU:36587", "iyP:35741", "NcC:34431", "MNq:33121", "oaJ:32654", "eej:31807"}, {"gOG:28888", "DVF:27197", "XqX:26278", "wGs:25024", "OOx:24750", "lcW:23309", "IrX:22209", "hVv:21809"}, {"hpa:18321", "FaA:17425", "pdL:16297", "jtb:15103", "QRt:14261", "GMP:13625", "wZn:12043", "mIs:11716"}},
            {{"bJO:78884", "TeF:77135", "dxq:76073", "MAt:75633", "htE:74536", "NVD:73703", "hQh:72633", "rUB:71319"}, {"YxL:68522", "SAD:67944", "QCS:66813", "bLN:65281", "pya:64909", "HKz:63275", "sTr:62312", "Dzh:61143"}, {"Jvu:58147", "AkG:57181", "IyY:56025", "yQY:55140", "EDI:54258", "KWk:53397", "jDs:52056", "xPF:51308"}, {"pft:48248", "GFD:47917", "vZj:46552", "hEb:45708", "She:44098", "FrM:43914", "xxN:42975", "vPL:41051"}, {"qio:38400", "xoY:37931", "XPj:36668", "rrZ:35825", "sEX:34363", "aMQ:33799", "xKz:32545", "VDW:31023"}, {"LsY:28703", "FAG:27837", "jWv:26353", "azW:25015", "UtA:24988", "ibk:23155", "HvD:22423", "gsQ:21484"}, {"EUC:18337", "RbC:17712", "mzB:16440", "RWS:15565", "uco:14496", "NQM:13077", "MRf:12855", "EGJ:11477"}},
            {{"qbR:78968", "kNB:77422", "ACe:76306", "mts:75256", "pSC:74062", "VWz:73760", "Bss:72004", "EXI:71749"}, {"gqH:68273", "wtS:67736", "Mom:66496", "sbW:65494", "zQc:64862", "GEf:63678", "cdl:62428", "euN:61078"}, {"Wqz:58348", "Trl:57043", "GlW:56513", "DMM:55889", "NAE:54120", "vgP:53512", "kVw:52517", "rIn:51253"}, {"xgm:48739", "hCO:47707", "Tro:46023", "Kql:45625", "eTp:44525", "JXP:43079", "DAh:42748", "Gvw:41238"}, {"CtC:38134", "hIf:37260", "ULK:36578", "YQG:35379", "dvr:34404", "RFD:33182", "BsO:32542", "dth:31035"}, {"nGV:28973", "gZk:27364", "dOJ:26356", "saM:25370", "aaR:24316", "alL:23693", "yYr:22854", "fWK:21847"}, {"WXV:18336", "Fed:17718", "GYT:16476", "nom:15964", "Ybr:14829", "TCO:13902", "tDm:12314", "Hbd:11947"}}
    };

    private static String[] load25MData(int fileIndex) {
        //模拟读取文件
        if (fileFlag[fileIndex] >= file[fileIndex].length) {
            return null;
        }
        return file[fileIndex][fileFlag[fileIndex]++];
    }

    private static void outPut(SortNode sortNode) {
        //模拟输出
        System.out.println(sortNode.toString());
    }

    //多路归并的有序链表节点
    private static class SortNode implements Comparable<SortNode> {

        private String userName;

        private Double amount;

        /**
         * 元素所属的原数组标记
         */
        private int temp25MDataListIndex;
        /**
         * 元素在所属的原数组的位置
         */
        private int arrayIndex;

        public SortNode(String data, int temp25MDataListIndex, int arrayIndex) {
            //数据格式 userName:amount
            String[] split = data.split(":");
            this.userName = split[0];
            this.amount = Double.parseDouble(split[1]);
            this.temp25MDataListIndex = temp25MDataListIndex;
            this.arrayIndex = arrayIndex;
        }

        @Override
        public int compareTo(SortNode b) {
            //倒序
            return -this.amount.compareTo(b.amount);
        }

        @Override
        public String toString() {
            return this.userName + ":" + this.amount;
        }

    }

}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。