最近在研究大数据的分治思想,很多场景计算得出的中间结果都是“内部有序,外部无序”,这时候就要用到“归并排序”算法将多个这样的结果集合归并为一个有序的集合。于是就复习了一下“归并排序”。
归并排序的大概流程
- 用递归将数组不断对半拆分,直到每个子数组只剩一个元素(单个元素的数组天然有序)
{6, 9, 5, 2, 4, 9, 3, 11, 44, 55, 43, 27, 18, 94, 35, 32, 17}
{6, 9, 5, 2, 4, 9, 3, 11, 44}{55, 43, 27, 18, 94, 35, 32, 17}
{6, 9, 5, 2, 4}{9, 3, 11, 44}{55, 43, 27, 18}{94, 35, 32, 17}
{6, 9, 5}{2, 4}{9, 3}{11, 44}{55, 43}{27, 18}{94, 35}{32, 17}
{6, 9}{5}{2}{4}{9}{3}{11}{44}{55}{43}{27}{18}{94}{35}{32}{17}
{6}{9}{5}{2}{4}{9}{3}{11}{44}{55}{43}{27}{18}{94}{35}{32}{17}
- 将小数组逐个排序合并
{6, 9}{5}{2}{4}{9}{3}{11}{44}{55}{43}{27}{18}{94}{35}{32}{17}
{5, 6, 9}{2, 4}{3, 9}{11, 44}{43, 55}{18, 27}{35, 94}{17, 32}
{2, 4, 5, 6, 9}{3, 9, 11, 44}{18, 27, 43, 55}{17, 32, 35, 94}
{2, 3, 4, 5, 6, 9, 9, 11, 44}{17, 18, 27, 32, 35, 43, 55, 94}
{2, 3, 4, 5, 6, 9, 9, 11, 17, 18, 27, 32, 35, 43, 44, 55, 94}
java代码实现
/**
 * Top-down (recursive) merge sort over an {@code int[]}, ascending order.
 *
 * <p>The array is split in half recursively until each range holds at most one
 * element, then adjacent sorted ranges are merged back together in place
 * (using a temporary buffer per merge).
 */
public class MergeSort {

    public static void main(String[] args) {
        int[] array = {6, 9, 5, 2, 4, 9, 3, 11, 44, 55, 43, 27, 18, 94, 35, 32, 17};
        sort(array, 0, array.length - 1);
        // NOTE(review): Utils is not defined in this snippet; presumably it prints the array.
        Utils.printf(array);
    }

    /**
     * Sorts {@code array[begin..end]} (both bounds inclusive) in ascending order.
     *
     * @param array the array to sort in place
     * @param begin index of the first element of the range
     * @param end   index of the last element of the range
     */
    private static void sort(int[] array, int begin, int end) {
        // A range with zero or one element is already sorted.
        if (begin >= end) {
            return;
        }
        // Overflow-safe midpoint: (begin + end) / 2 can wrap negative for very large indices.
        int mid = begin + (end - begin) / 2;
        sort(array, begin, mid);
        sort(array, mid + 1, end);
        merge(array, begin, mid, end);
    }

    /**
     * Merges the two adjacent sorted ranges {@code array[begin..mid]} and
     * {@code array[mid+1..end]} into a single sorted range {@code array[begin..end]}.
     *
     * @param array the array holding both sorted ranges
     * @param begin first index of the left range
     * @param mid   last index of the left range
     * @param end   last index of the right range
     */
    private static void merge(int[] array, int begin, int mid, int end) {
        // Temporary buffer that receives the merged result.
        int[] temp = new int[end - begin + 1];
        // Read cursor into the left range.
        int left = begin;
        // Read cursor into the right range.
        int right = mid + 1;
        // Write cursor into the temporary buffer.
        int k = 0;

        while (left <= mid && right <= end) {
            // "<=" takes the left element on ties, which keeps the merge stable.
            if (array[left] <= array[right]) {
                temp[k++] = array[left++];
            } else {
                temp[k++] = array[right++];
            }
        }
        // At most one of the two ranges still has elements; they are all >= everything
        // already written, so they can be appended without further comparison.
        while (left <= mid) {
            temp[k++] = array[left++];
        }
        while (right <= end) {
            temp[k++] = array[right++];
        }
        // Copy the merged result back over the original range.
        System.arraycopy(temp, 0, array, begin, temp.length);
    }
}
大数据的分布式计算中“归并排序”的应用场景
在大数据场景中,数据量远大于计算机的内存,磁盘和网络的IO又是无法解决的瓶颈,就需要用到分治思想。
比如有1000G的中国2020年某电商平台用户消费总金额数据。数据格式是用户名:消费总金额。所有数据都是无序的。现在有1台磁盘5T/内存8G(程序可用内存约为5G多)的计算机。如何将数据按消费总金额从大到小排序输出?
1. 计算机上将1000G的数据每次加载5G到内存中做排序(这一步不一定要用归并排序,可以用快排等其他排序方式)输出一个5G的小文件,这样得到200份内部有序外部无序的5G文件。
2. 从每个5G文件中读取25M数据到内存中,此时内存中有200个有序的数组。这时就要用到“多路归并”:创建一个有序队列(优先队列,即堆),从200个数组中各取第一个元素放入有序队列。
a. 从有序队列中取出第一个元素。
b. 将此元素数据写入输出的文件中,并从此元素所属的数组中取出下一个元素放入有序队列。
c. 重复步骤a,b直到某个数组元素耗尽,从这个数组所属的5G文件中,继续加载25M数据到内存中。
d. 重复步骤a,b,c直到所有5G文件的数据全部加载到内存中,并写入到输出文件。
/**
 * Demonstrates the k-way merge phase of an external sort.
 *
 * <p>Each simulated "file" is a sequence of chunks whose records are already sorted by
 * amount in descending order. One chunk per file is held in memory at a time; a
 * priority queue holds the current head record of every file. The largest record is
 * repeatedly polled, written out, and replaced by the next record from the same file,
 * loading that file's next chunk whenever the current one is exhausted.
 */
public class ExternalSort {

    /** Number of simulated sorted files taking part in the merge. */
    private static final int FILE_COUNT = 5;

    public static void main(String[] args) {
        // Position i always holds the in-memory chunk of file i (null once file i is exhausted).
        List<String[]> temp25MDataList = new ArrayList<>(FILE_COUNT);
        // Priority queue ordered by amount descending (see SortNode#compareTo).
        PriorityQueue<SortNode> queue = new PriorityQueue<>(Math.max(1, FILE_COUNT));

        for (int i = 0; i < FILE_COUNT; i++) {
            String[] chunk = loadNextNonEmptyChunk(i);
            // Always add, even when null: the file index is later used as the list index,
            // so skipping an empty file would shift every following file's slot.
            temp25MDataList.add(chunk);
            if (chunk != null) {
                queue.offer(new SortNode(chunk[0], i, 0));
            }
        }

        while (!queue.isEmpty()) {
            SortNode top = queue.poll();
            outPut(top);

            int fileIndex = top.temp25MDataListIndex;
            // Non-null here: a slot is only nulled when its file is exhausted, and from
            // then on no node of that file is ever queued again.
            String[] chunk = temp25MDataList.get(fileIndex);
            int nextIndex = top.arrayIndex + 1;
            if (nextIndex >= chunk.length) {
                // Current chunk is used up: pull the next chunk of the same file.
                chunk = loadNextNonEmptyChunk(fileIndex);
                temp25MDataList.set(fileIndex, chunk);
                if (chunk == null) {
                    // File fully consumed; it contributes no further records.
                    continue;
                }
                nextIndex = 0;
            }
            queue.offer(new SortNode(chunk[nextIndex], fileIndex, nextIndex));
        }
    }

    /** Per-file cursor: index of the next chunk that load25MData will return. */
    private static final int[] fileFlag = new int[FILE_COUNT];

    /** Simulated files: file -> chunks -> records in "userName:amount" format. */
    private static final String[][][] file = {
{{"ZMd:78842", "xXj:77789", "OQY:76941", "sYg:75182", "HYn:74065", "LUP:73859", "zRT:72138", "oNp:71667"}, {"cLD:68121", "KlF:67784", "kIN:66996", "jQX:65355", "AeX:64774", "YPo:63734", "jTX:62459", "Mkr:61190"}, {"Ovb:58003", "mEn:57046", "IZA:56354", "JjK:55444", "fCy:54862", "TTb:53371", "cvO:52598", "bkS:51587"}, {"SsO:48568", "KAl:47930", "AfA:46038", "jIe:45192", "nrV:44816", "vsb:43376", "skh:42345", "dtH:41544"}, {"tqy:38536", "xuw:37038", "FSL:36757", "ToD:35277", "uSp:34142", "moQ:33083", "Ebt:32911", "fKy:31058"}, {"NTQ:28601", "cZr:27009", "kji:26874", "XTh:25851", "JyK:24195", "qAm:23031", "rgG:22364", "gwD:21211"}, {"HbE:18722", "IAX:17893", "hWQ:16460", "wRM:15013", "OMw:14013", "cfM:13873", "QAH:12757", "hQe:11005"}},
{{"POE:78980", "QfG:77939", "Czx:76307", "HbK:75092", "hBP:74457", "OwK:73958", "wPi:72788", "ojI:71505"}, {"ela:68770", "Qby:67636", "fLZ:66952", "lbi:65236", "yyM:64618", "Zdg:63293", "ScE:62698", "yUh:61485"}, {"qDW:58225", "fUs:57631", "PeX:56454", "bXo:55411", "TAz:54106", "WpE:53043", "Pcm:52200", "tCq:51502"}, {"xtv:48353", "Wlg:47817", "SaB:46027", "VIv:45536", "JkS:44980", "SKj:43964", "YYW:42522", "pBA:41114"}, {"STu:38637", "sEo:37158", "vru:36676", "ePQ:35241", "hte:34096", "RWm:33327", "RtI:32647", "HhN:31650"}, {"NXU:28487", "PvG:27528", "FWJ:26530", "oPK:25824", "PUv:24096", "evU:23993", "Tca:22690", "NlC:21121"}, {"Fxb:18336", "zMb:17461", "TNU:16330", "Shp:15694", "keW:14798", "Ohi:13818", "sAs:12711", "PfH:11591"}},
{{"onA:78675", "NKW:77198", "BiC:76132", "TCt:75323", "mVA:74176", "jtp:73571", "xhI:72545", "UCb:71028"}, {"zkC:68103", "SOo:67748", "pap:66597", "pSr:65427", "RKe:64639", "ncM:63850", "UiQ:62243", "FJS:61733"}, {"XSC:58999", "rKD:57964", "BLB:56886", "okS:55365", "EeY:54187", "ilt:53104", "uZP:52922", "FWY:51497"}, {"bZy:48417", "nsO:47122", "ePq:46682", "qVj:45565", "SqX:44784", "fLV:43697", "URc:42674", "ITa:41361"}, {"qOq:38218", "xzl:37426", "AsU:36587", "iyP:35741", "NcC:34431", "MNq:33121", "oaJ:32654", "eej:31807"}, {"gOG:28888", "DVF:27197", "XqX:26278", "wGs:25024", "OOx:24750", "lcW:23309", "IrX:22209", "hVv:21809"}, {"hpa:18321", "FaA:17425", "pdL:16297", "jtb:15103", "QRt:14261", "GMP:13625", "wZn:12043", "mIs:11716"}},
{{"bJO:78884", "TeF:77135", "dxq:76073", "MAt:75633", "htE:74536", "NVD:73703", "hQh:72633", "rUB:71319"}, {"YxL:68522", "SAD:67944", "QCS:66813", "bLN:65281", "pya:64909", "HKz:63275", "sTr:62312", "Dzh:61143"}, {"Jvu:58147", "AkG:57181", "IyY:56025", "yQY:55140", "EDI:54258", "KWk:53397", "jDs:52056", "xPF:51308"}, {"pft:48248", "GFD:47917", "vZj:46552", "hEb:45708", "She:44098", "FrM:43914", "xxN:42975", "vPL:41051"}, {"qio:38400", "xoY:37931", "XPj:36668", "rrZ:35825", "sEX:34363", "aMQ:33799", "xKz:32545", "VDW:31023"}, {"LsY:28703", "FAG:27837", "jWv:26353", "azW:25015", "UtA:24988", "ibk:23155", "HvD:22423", "gsQ:21484"}, {"EUC:18337", "RbC:17712", "mzB:16440", "RWS:15565", "uco:14496", "NQM:13077", "MRf:12855", "EGJ:11477"}},
{{"qbR:78968", "kNB:77422", "ACe:76306", "mts:75256", "pSC:74062", "VWz:73760", "Bss:72004", "EXI:71749"}, {"gqH:68273", "wtS:67736", "Mom:66496", "sbW:65494", "zQc:64862", "GEf:63678", "cdl:62428", "euN:61078"}, {"Wqz:58348", "Trl:57043", "GlW:56513", "DMM:55889", "NAE:54120", "vgP:53512", "kVw:52517", "rIn:51253"}, {"xgm:48739", "hCO:47707", "Tro:46023", "Kql:45625", "eTp:44525", "JXP:43079", "DAh:42748", "Gvw:41238"}, {"CtC:38134", "hIf:37260", "ULK:36578", "YQG:35379", "dvr:34404", "RFD:33182", "BsO:32542", "dth:31035"}, {"nGV:28973", "gZk:27364", "dOJ:26356", "saM:25370", "aaR:24316", "alL:23693", "yYr:22854", "fWK:21847"}, {"WXV:18336", "Fed:17718", "GYT:16476", "nom:15964", "Ybr:14829", "TCO:13902", "tDm:12314", "Hbd:11947"}}
    };

    /**
     * Simulates reading the next chunk of the given file and advances that file's cursor.
     *
     * @param fileIndex index of the simulated file
     * @return the next chunk, or {@code null} when the file has no chunks left
     */
    private static String[] load25MData(int fileIndex) {
        if (fileFlag[fileIndex] >= file[fileIndex].length) {
            return null;
        }
        return file[fileIndex][fileFlag[fileIndex]++];
    }

    /**
     * Loads chunks of the given file until one with at least one record is found.
     *
     * @param fileIndex index of the simulated file
     * @return the next non-empty chunk, or {@code null} when the file is exhausted
     */
    private static String[] loadNextNonEmptyChunk(int fileIndex) {
        String[] chunk = load25MData(fileIndex);
        // Skip zero-length chunks so callers can safely read chunk[0].
        while (chunk != null && chunk.length == 0) {
            chunk = load25MData(fileIndex);
        }
        return chunk;
    }

    /** Simulates writing one record to the output by printing it to standard output. */
    private static void outPut(SortNode sortNode) {
        System.out.println(sortNode.toString());
    }

    /**
     * One record in the merge queue, remembering which file's chunk it came from and
     * its position inside that chunk. Natural ordering is by amount, descending.
     */
    private static class SortNode implements Comparable<SortNode> {
        private final String userName;
        private final double amount;
        /** Index of the source file; also the slot of its chunk in the chunk list. */
        private final int temp25MDataListIndex;
        /** Position of this record inside its source chunk. */
        private final int arrayIndex;

        /**
         * @param data                 record in {@code userName:amount} format
         * @param temp25MDataListIndex index of the file the record came from
         * @param arrayIndex           position of the record within the current chunk
         * @throws IllegalArgumentException if {@code data} has no {@code ':'} separator
         * @throws NumberFormatException    if the amount part is not a valid number
         */
        public SortNode(String data, int temp25MDataListIndex, int arrayIndex) {
            String[] split = data.split(":");
            if (split.length < 2) {
                throw new IllegalArgumentException("Malformed record, expected userName:amount but got: " + data);
            }
            this.userName = split[0];
            this.amount = Double.parseDouble(split[1]);
            this.temp25MDataListIndex = temp25MDataListIndex;
            this.arrayIndex = arrayIndex;
        }

        @Override
        public int compareTo(SortNode b) {
            // Arguments swapped to get descending order without negating a compare result.
            return Double.compare(b.amount, this.amount);
        }

        @Override
        public String toString() {
            return this.userName + ":" + this.amount;
        }
    }
}