问题来源:
针对一个大文件,如何对里面的元素进行排序
问题描述:
24GTxt文件,每行1个大整数,1-100位不等
纯JDK排序。
解决方案:
程序流程
- 源文件采用单线程NIO按行读
- 读到的每一行入到队列A
- 开启16个线程(根据CPU核数),去消费这个队列
- 消费之后,把数据写入相关的文件待排序
- 开启8个线程并发排序每个待排序文件(读进来,排序,写)
- 按文件名做合并
经验总结
- 文件的读取先要看清楚是按行还是按字节。 如果按行读,不能用多线程,方法是读1个BUFFERED,判断结束是否是换行,如果不是,就按字节读,一直读到是换行为止,或者按BUFFERED读,然后按换行截取,剩下的就拼在下一个BUFFERED的头部。如果按字节读,可以用多线程(RandomAccessFile
- 读和写,最好设置缓存大小。16M刚好
- Eclipse运行的java程序是独立的JVM,如果内存不够,可以加参数-Xms3072m -Xmx6072m
- 遇到高并发自增,可以采用AtomicInteger
- ByteBuffer.array() 返回的 array 长度为 ByteBuffer allocate的长度,并不是里面所含的内容的长度
//这样会导致,最后读取的肯定不是allocate的长度,但是array返回的带有上一次的冗余数据
//解决办法如下,重新按照剩余容量来制作一个新的byte
byte[] data;
if(buffer.remaining() != buffer.capacity()){
data = new byte[buffer.remaining()];
buffer.get(data, 0, data.length);
}else{
data = buffer.array();
}
String content = new String(data);
6.如果中断线程池里面的线程
可以使用Pool.shutdown. 但是前提是线程里面有阻断方法。如Sleep或者阻塞队列等等。
7.对于阻塞队列,入队和出队所占用的时间比较长,做实时性的性能差,因为阻塞涉及到加锁
8.线程池不能设置setDaemon。如果线程池里面的线程读守候,那线程就无法回收了。矛盾
9.同1时刻,1个CPU运行1个或者多个线程,如8核两线程,那就是一共16个线程
测试报告
-
运行结果
- SSD 10分钟跑完24G
- 机械硬盘 80分钟跑完24G
程序启动使用内存
32位JDK启动程序使用内存 | 64位JDK启动程序使用 | |
---|---|---|
-Xms1g | 11M | 5M |
-Xms1.1g | 12M | |
-Xms1.2g | 报错 | |
-Xms2g | 报错 | 10M |
-Xms3g | 报错 | 15M |
-Xms5g | 报错 | 25M |
-Xms6g | 报错 | 30M |
- BufferedWriter占用内存数(基于64位JDK,-Xms5g)
BufferedWriter bw = new BufferedWriter
(new FileWriter(new File("D:\\temp\\bigdata\\des3g\\"+i+".txt")),内存大小);
- BufferedWriter 缓存 5M 每个对象大概占用10M
创建对象数量 占用内存 2 25M 3 35M 4 45M 500 1265M(GC)
- BufferedWriter 缓存 3M 每个对象大概占用6M
创建对象数量 占用内存 4 25M 5 31M 6 37M 500 507M 1265M (GC)
- BufferedWriter 缓存 1M 每个对象大概占用2M
创建对象数量 占用内存 12 25M 13 27M 14 29M 500 1006M(GC)
程序代码
- 排序代码
package com.bingo4;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class BigSort {
/**************************************** 可配置项 ***********************************/
// 是否开启内存监控,每2秒打印内存情况
public boolean isRamMonitor = false;
// 待排序文件
public String SRC_DATA = "d://temp//bigdata/src/100m.txt";
// 排序完毕生成的文件地址
public String DES_DATA_PATH = "d://temp//bigdata//des//";
// 默认开启,每1位数,就分发成10个待排序文件,如果待排序文件里面最大是60位数,就分发成600个待排序文件.源文件如果超过8G左右,必须开启,否则后面单个文件做排序会导致内存溢出
// 如果关闭,每1位数,就分发成1个待排序文件,这个对于源文件不大的情况下,速度极快。
public boolean isDeliverTen = true;
// 读入待排序文件缓存
final static int BSIZE = 1024 * 1024 * 1; // 3M
// 写入数据区间文件缓存
final static int WRITE_SORT_BSIZE = 1024 * 1024 * 3; // 3M
// 排序读写缓存
final static int SORT_READER_BSIZE = 1024 * 1024 * 1; // 5M
final static int SORT_WRITE_BSIZE = 1024 * 1024 * 1; // 5M
// 合并读写缓存
final static int MERGE_BSIZE = 1024 * 1024 * 2; // 5M
// 分发数据线程大小
public static int DELIVER_DATA_QUEUE_SIZE = 16;
// 每个数据区间监听队列的线程数, 这里设置为1,效率最高
public static int RANG_QUEUE_SIZE = 1;
// 并发排序线程数
public static int SORT_THREAD_SIZE = 8;
/**************************************** 可配置项 ***********************************/
public String DES_SORT_DATA_PATH = DES_DATA_PATH + "sort//";
public String MERGE_FILE = DES_DATA_PATH + "merge//merge.txt";
public String MERGE_FILE_PATH = DES_DATA_PATH + "merge//";
int cpuNums = Runtime.getRuntime().availableProcessors();
// 分发数据队列
public ConcurrentLinkedQueue<String> deliverDataQueue = new ConcurrentLinkedQueue<String>();
// 分发数据线程的执行线程池
public ExecutorService deliverDataThreadES = Executors.newFixedThreadPool(DELIVER_DATA_QUEUE_SIZE);
// 数据分布范围集合
public Map<Integer, ConcurrentLinkedQueue<String>> dataRangMap = new HashMap<Integer, ConcurrentLinkedQueue<String>>();
// 数据分布写入对象
public Map<Integer, BufferedWriter> dataWriteMap = new ConcurrentHashMap<Integer, BufferedWriter>();
// 数据区间线程池
public ExecutorService dataRangeThreadES = Executors.newFixedThreadPool(1);
// CAS:将这个变量更新为新值,但是如果从我上次看到这个变量之后其他线程修改了它的值,那么更新就失败”
// 已经读取完毕的数据行数
public AtomicInteger hasReaderDataLine = new AtomicInteger(0);
// 通过多线程,已经按数据区间处理好的数据行数
public AtomicInteger hasDataRangeWriteLine = new AtomicInteger(0);
// 已排序的总行数
public AtomicInteger hasSortedDataLine = new AtomicInteger(0);
// 已经读到内存等待排序的总行数
public AtomicInteger hasWaitSortedDataLine = new AtomicInteger(0);
// 已排序的文件数
public AtomicInteger hasSortedFile = new AtomicInteger(0);
// 已合并好的文件数
public AtomicInteger hasCombineFile = new AtomicInteger(0);
// 程序启动时间
public long startTime = 0l;
// 读取文件完成时间
public long finishReadFileTime = 0l;
// 等待分发完毕时间
public long finishDeliverFileTime = 0l;
// 排序完成时间
public long finishSortFileTime = 0l;
// 合并完成时间
public long finishCombineFileTime = 0l;
// 内存监控线程
public Thread ramMonitorT = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
Memory.print();
Thread.sleep(2000);
}
} catch (Exception e) {
}
}
});
public static void main(String[] args) throws Exception {
BigSort sort = new BigSort();
// 待排序文件
if ((args.length > 0) && !args[0].equals("")) {
sort.SRC_DATA = args[0];
}
// 目的文件
if ((args.length > 1) && !args[1].equals("")) {
if (!args[1].endsWith("\\")) {
sort.DES_DATA_PATH = args[1] + "\\";
} else {
sort.DES_DATA_PATH = args[1];
}
sort.DES_SORT_DATA_PATH = sort.DES_DATA_PATH + "sort//";
sort.MERGE_FILE = sort.DES_DATA_PATH + "merge//merge.txt";
sort.MERGE_FILE_PATH = sort.DES_DATA_PATH + "merge//";
}
sort.start();
}
/**
* 程序启动入口
*
* @throws Exception
*
*/
public void start() throws Exception {
System.out.println(String.format("CPU核心数[%s] 最大可用内存:[%sM] 初始化内存:[%sM]", cpuNums,
Memory.getMaxHeapMemory() / 1024 / 1024, Memory.getInitHeapMemory() / 1024 / 1024));
Memory.print();
// 是否开启内存监控
if (isRamMonitor) {
ramMonitorT.setDaemon(true);
ramMonitorT.start();
}
// 1.准备阶段
if (!prepare()) {
return;
}
// 2.对源文件进行读取入队处理
readFile(new File(SRC_DATA));
// 3.等待分发数据线程把数据分发完毕,然后把线程池里面的线程全部终止
waitForFinishWriteDataRange();
System.gc();
// 4.对每个文件单独排序
sort();
// 5.合并
combine();
System.out.println(String.format("[程序已全部完成][一共用时:%s秒][读:%s秒,割:%s秒,排:%s秒,合:%s秒]",
((System.currentTimeMillis() - startTime) / 1000), finishReadFileTime, finishDeliverFileTime,
finishSortFileTime, finishCombineFileTime));
System.out.println(String.format("[已排序完的文件在:%s]", MERGE_FILE));
}
// 1.准备阶段,文件准备
public boolean prepare() {
try {
System.out.println("[文件及目录检查][开始]");
File srcFile = new File(SRC_DATA);
if (!srcFile.exists()) {
System.out.println("[文件及目录检查][失败][待排序文件不存在,程序结束]" + SRC_DATA);
return false;
}
// 删掉已存在的临时文件
File desDataPath = new File(DES_DATA_PATH);
// if(desDataPath.exists()){
// if(deleteDir(desDataPath));
// }
// 创建目录
if (!desDataPath.exists()) {
desDataPath.mkdir();
}
// 创建目录
File desSortDataPath = new File(DES_SORT_DATA_PATH);
if (!desSortDataPath.exists()) {
desSortDataPath.mkdir();
}
// 创建目录
File mergeFilePath = new File(MERGE_FILE_PATH);
if (!mergeFilePath.exists()) {
mergeFilePath.mkdir();
}
File mergeFile = new File(MERGE_FILE);
if (mergeFile.exists()) {
mergeFile.delete();
}
System.out.println(String.format("[文件及目录检查][待排序文件路径:%s]", SRC_DATA));
System.out.println(String.format("[文件及目录检查][排序完毕生成的文件地址:%s]", DES_DATA_PATH));
System.out.println("[文件及目录检查][完毕]");
} catch (Exception e) {
System.out.println("[文件及目录检查][失败,程序结束][原因]" + e.getMessage());
return false;
}
System.out.println("[启动分发数据监听线程][开始]");
startTime = System.currentTimeMillis();
for (int i = 0; i < DELIVER_DATA_QUEUE_SIZE; i++) {
DeliverDataThread ddt = new DeliverDataThread(deliverDataQueue);
deliverDataThreadES.execute(ddt);
}
System.out.println(String.format("[启动分发数据监听线程][完毕][共启动:%s个监听线程]", DELIVER_DATA_QUEUE_SIZE));
return true;
}
// 2.对源文件进行读取入队处理
public void readFile(File file) throws Exception {
System.out.println(String.format("[读取待排序文件][开始][大小:%sM]", file.length() / 1000 / 1000));
// 读监控线程
Thread monitor = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
System.out.println(String.format("[读取待排序文件][已读:%s行]", hasReaderDataLine.get()));
Thread.sleep(5000);
}
} catch (Exception e) {
}
}
});
monitor.start();
long startTime = System.currentTimeMillis();
FileUtil util = new FileUtil(new FileUtilImpl() {
// 每读到一行,应该怎么处理
public void handlerLin(String line) {
hasReaderDataLine.incrementAndGet();
// 获取到每一行的数据然后入队!
deliverDataQueue.offer(line.trim()); // 这里必须得去换行
}
});
util.nioReadFile(file, BSIZE);
monitor.interrupt();
finishReadFileTime = (System.currentTimeMillis() - startTime) / 1000;
System.out.println(String.format("[读取待排序文件][完毕][一共读取:%S行][用时:%s秒]", hasReaderDataLine.get(), finishReadFileTime,
hasReaderDataLine.get()));
}
// 3.等待分发数据线程把数据分发完毕,然后把线程池里面的线程全部终止
public void waitForFinishWriteDataRange() throws IOException {
System.out.println("[数据分发][正在处理中]");
long cleanDeliverDataThreadStartTime = System.currentTimeMillis();
while (true) {
if (hasReaderDataLine.get() == hasDataRangeWriteLine.get()) {
// 对BW做结束,把内存中残余的数据写到文件
for (Map.Entry<Integer, BufferedWriter> entry : dataWriteMap.entrySet()) {
BufferedWriter bw = entry.getValue();
bw.close();
}
break;
}
}
deliverDataThreadES.shutdownNow();
dataRangeThreadES.shutdownNow();
finishDeliverFileTime = (System.currentTimeMillis() - cleanDeliverDataThreadStartTime) / 1000;
System.out.println(
String.format("[数据分发][完毕][已切割成:%s个待排序文件][用时:%s秒]", dataWriteMap.size(), finishDeliverFileTime));
}
// 4.排序
public void sort() throws IOException {
System.out.println(String.format("[排序][开始][待排序文件数量:%s个][并发排序数量:%s个]", dataWriteMap.size(), SORT_THREAD_SIZE));
long startTime = System.currentTimeMillis();
ExecutorService sortEs = Executors.newFixedThreadPool(SORT_THREAD_SIZE);// 排序线程池
for (Map.Entry<Integer, BufferedWriter> entry : dataWriteMap.entrySet()) {
int dataRange = entry.getKey();
SortThread st = new SortThread(dataRange);
sortEs.execute(st);
}
// 监听排序情况
Thread monitor = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
System.out.println(String.format("[排序][已排好文件:%s个]", hasSortedFile.get()));
System.out.println(String.format("总共:[%s] 已读[%s] 已排:[%s]", hasReaderDataLine.get(),hasWaitSortedDataLine.get(),hasSortedDataLine.get()));
Thread.sleep(5000);
}
} catch (Exception e) {
}
}
});
monitor.start();
sortEs.shutdown();
while (true) {
if (sortEs.isTerminated()) {
finishSortFileTime = (System.currentTimeMillis() - startTime) / 1000;
System.out.println(String.format("[排序][完毕][已排好文件:%s个][已排好:%s行][用时:%s秒]", hasSortedFile.get(),
hasSortedDataLine.get(), finishSortFileTime));
break;
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
// while(true){
// if(hasReaderDataLine.get() == hasSortedDataLine.get()){
// finishSortFileTime = (System.currentTimeMillis() - startTime) / 1000;
// System.out.println(String.format("[排序][完毕][已排好文件:%s个][已排好:%s行][用时:%s秒]",hasSortedFile.get(),hasSortedDataLine.get(),finishSortFileTime));
// break;
// }
// try {
// Thread.sleep(500);
// } catch (InterruptedException e)
// {
// e.printStackTrace();
// }
// }
// sortEs.shutdown();
monitor.interrupt();
}
// 5.合并
public void combine() throws IOException, InterruptedException {
System.out.println(String.format("[合并文件][开始][待合并文件数量:%s个]", dataWriteMap.size()));
// 监听合并情况
Thread monitor = new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
System.out.println(String.format("[合并文件][已合并文件:%s个]", hasCombineFile.get()));
Thread.sleep(5000);
}
} catch (Exception e) {
}
}
});
monitor.start();
File f = new File(DES_SORT_DATA_PATH);
String[] files = f.list();
// 对文件名称列表做排序,按顺序合并
List<Integer> fileList = new ArrayList<Integer>();
for (String s : files) {
fileList.add(Integer.valueOf(s.replaceAll(".txt", "")));
}
Collections.sort(fileList);
String[] mergeFiles = new String[fileList.size()];
for (int i = 0; i < fileList.size(); i++) {
mergeFiles[i] = DES_SORT_DATA_PATH + String.valueOf(fileList.get(i)) + ".txt";
// mergeFiles[i] = String.valueOf(fileList.get(i))+".txt";
}
long mergeStartTime = System.currentTimeMillis();
// 用java读写合并文件
combineFile(MERGE_FILE, mergeFiles);
// 用系统命令合并文件
// combineFileUseSysCom(MERGE_FILE,mergeFiles);
monitor.interrupt();
finishCombineFileTime = (System.currentTimeMillis() - mergeStartTime) / 1000;
System.out.println(String.format("[合并文件][完毕][待排序文件大小:%s][合并完成文件大小:%s][用时:%s秒]", new File(SRC_DATA).length(),
new File(MERGE_FILE).length(), finishCombineFileTime));
}
// 分配队列区间
public int getDataRange(String data) {
int dataRange = data.length();
if (isDeliverTen) {
if (dataRange != 1) {
String dr = data.substring(0, 1);
dataRange = Integer.valueOf(dataRange + "" + dr);
}
}
return dataRange;
}
/**
*
*
* 分发数据线程 1. 从分发数据队列中取数据 2. 获取该数据的位数 3. 根据位数,把该数据放到相应的数据区间队列中等待处理
*
*/
final static Object lock = new Object();
class DeliverDataThread extends Thread {
ConcurrentLinkedQueue<String> deliverDataQueue;
public DeliverDataThread(ConcurrentLinkedQueue<String> deliverDataQueue) {
this.deliverDataQueue = deliverDataQueue;
}
@Override
public void run() {
try {
while (true) {
String data = deliverDataQueue.poll();
if (data == null || data.equals("")) {
// 如果不休眠,当前线程会不停的循环,CPU都在当前线程上面,无法调度另外的线程.
Thread.sleep(0);
continue;
}
// 按照长度范围,把数据放入相关的区间队列
final int dataRange = getDataRange(data);
// 数据区间队列
// 对于2位数,分成10个队列
// 10-19 为1个队列,队列名称是21,20-29为1个队列,队列名称是22
// 对于3位数,分钟10个队列
// 100-199 为1个队列,队列名称是31,200-299为1个队列,队列名称是32以此类推
BufferedWriter bw = dataWriteMap.get(dataRange);
if (bw == null) {
synchronized (lock) {
bw = dataWriteMap.get(dataRange);
if (bw == null) {
// 产生相应的写入对象
bw = new BufferedWriter(new FileWriter(new File(DES_DATA_PATH + dataRange + ".txt")),
WRITE_SORT_BSIZE);
dataWriteMap.put(dataRange, bw);
}
}
}
synchronized (bw) {
bw.write(data);
bw.newLine();
// 增加已经处理的行数
hasDataRangeWriteLine.incrementAndGet();
}
}
} catch (InterruptedException e1) {
// System.out.println("结束分发线程:"+Thread.currentThread().getName()
// + "用时" + (System.currentTimeMillis() - startTime)/1000 +
// "S");
} catch (Exception e) {
e.printStackTrace();
}
}
}
class DeliverDataThread_bak extends Thread {
ConcurrentLinkedQueue<String> deliverDataQueue;
public DeliverDataThread_bak(ConcurrentLinkedQueue<String> deliverDataQueue) {
this.deliverDataQueue = deliverDataQueue;
}
@Override
public void run() {
long startTime = System.currentTimeMillis();
try {
while (true) {
String data = deliverDataQueue.poll();
if (data == null || data.equals("")) {
// 如果不休眠,当前线程会不停的循环,CPU都在当前线程上面,无法调度另外的线程.
Thread.sleep(0);
continue;
}
// 按照长度范围,把数据放入相关的区间队列
int dataRange = getDataRange(data);
// 数据区间队列
// 对于2位数,分成10个队列
// 10-19 为1个队列,队列名称是21,20-29为1个队列,队列名称是22
// 对于3位数,分钟10个队列
// 100-199 为1个队列,队列名称是31,200-299为1个队列,队列名称是32以此类推
ConcurrentLinkedQueue<String> dataRangQueue = dataRangMap.get(dataRange);
if (dataRangQueue == null) {
// 创建队列
dataRangQueue = new ConcurrentLinkedQueue<String>();
// 把当前队列放到MAP中,就可以根据数据位数直接拿到队列
dataRangMap.put(dataRange, dataRangQueue);
// 产生相应的写入对象
BufferedWriter bw = new BufferedWriter(
new FileWriter(new File(DES_DATA_PATH + dataRange + ".txt")), WRITE_SORT_BSIZE);
dataWriteMap.put(dataRange, bw);
// 启动数据区间队列的监听线程
DataRangeThread rq = new DataRangeThread(dataRange, dataRangQueue);
for (int j = 0; j < RANG_QUEUE_SIZE; j++) {
dataRangeThreadES.execute(rq);
}
}
// 按数据位数,把数据放到相应的队列中去
dataRangQueue.offer(data);
}
} catch (InterruptedException e1) {
// System.out.println("结束分发线程:"+Thread.currentThread().getName()
// + "用时" + (System.currentTimeMillis() - startTime)/1000 +
// "S");
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 数据区间写入线程
*
*
* 1. 从队列中获取相应的数据 2. 把该数据写入到相应的数据区间文件中去
*
*/
class DataRangeThread extends Thread {
ConcurrentLinkedQueue<String> dataRangQueue;
int rang;
public DataRangeThread(int rang, ConcurrentLinkedQueue<String> dataRangQueue) {
this.dataRangQueue = dataRangQueue;
this.rang = rang;
}
@Override
public void run() {
long startTime = System.currentTimeMillis();
try {
while (true) {
String data = dataRangQueue.poll();
if (data == null || data.equals("")) {
// 如果不休眠,当前线程会不停的循环,CPU都耗在当前线程上面,无法调度另外的线程.
Thread.sleep(0);
continue;
}
// 按照长度范围,把数据放入相关的区间队列
BufferedWriter bw = dataWriteMap.get(rang);
bw.write(data);
bw.newLine();
// 增加已经处理的行数
hasDataRangeWriteLine.incrementAndGet();
}
} catch (InterruptedException e1) {
// System.out.println("结束数据区间线程:"+rang+" " + "用时" +
// (System.currentTimeMillis() - startTime)/1000 + "S");
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 排序线程
*
*
*/
class SortThread extends Thread {
int dataRange;
public SortThread(int dataRange) {
this.dataRange = dataRange;
}
public void run() {
StringBuilder newlinesBui = null;
String lastLine = null;
try {
int lineCount = 0;
long startTime = System.currentTimeMillis();
long startTime2 = System.currentTimeMillis();
final List<BigInteger> data = new ArrayList<BigInteger>();
File dataFile = new File(DES_DATA_PATH + dataRange + ".txt");
if (!dataFile.exists()) {
return;
}
// 读入文件
FileUtil util = new FileUtil(new FileUtilImpl() {
// 每读到一行,应该怎么处理
public void handlerLin(String line) {
hasWaitSortedDataLine.incrementAndGet();
// 获取到每一行的数据放入集合等待排序
data.add(new BigInteger(line));
}
});
util.nioReadFile(dataFile, SORT_READER_BSIZE);
// util.randomReadFile(dataFile, SORT_READER_BSIZE);
String readEndTime = (System.currentTimeMillis() - startTime) / 1000 + "S";
// 排序
startTime = System.currentTimeMillis();
Collections.sort(data);
String sortTime = (System.currentTimeMillis() - startTime) / 1000 + "S";
// 写到文件
startTime = System.currentTimeMillis();
BufferedWriter bw = new BufferedWriter(
new FileWriter(new File(DES_SORT_DATA_PATH + dataRange + ".txt")), SORT_WRITE_BSIZE);
int i = 0;
for (BigInteger b : data) {
i = i++;
bw.write(b.toString());
bw.newLine();
hasSortedDataLine.incrementAndGet();
// lineCount++;
}
bw.close();
String writeTime = (System.currentTimeMillis() - startTime) / 1000 + "S";
hasSortedFile.incrementAndGet();
// System.out.println(String.format("数据区间[%s] [文件大小:%sM] 排序[%s]行
// 完成时间[%s] 读[%s] 排[%s] 写[%s]",
// dataRange,
// dataFile.length()/1000/1000,
// lineCount,
// (System.currentTimeMillis() - startTime2) / 1000 +"S"
// ,readEndTime,
// sortTime,
// writeTime));
} catch (Exception e) {
e.printStackTrace();
}
}
}
// Windos系统COPY合并程序
public void combineFileUseSysCom(String outFile, String[] files) throws IOException, InterruptedException {
StringBuffer sb = new StringBuffer();
for (String f : files) {
sb.append(f + "+");
}
String cmd = sb.substring(0, sb.length() - 1);
System.out.println(cmd);
String[] cmds = { "cmd", "/C", "copy", "/Y", cmd, MERGE_FILE.replaceAll("//", "\\\\") };
Process p = Runtime.getRuntime().exec(cmds, null, new File(DES_SORT_DATA_PATH.replaceAll("//", "\\\\")));
BufferedReader reader = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line = reader.readLine();
while (line != null) {
line = reader.readLine();
System.out.println(line);
hasCombineFile.incrementAndGet();
}
p.waitFor();
}
// JAVA合并程序
public void combineFile(String outFile, String[] files) {
FileChannel outChannel = null;
try {
outChannel = new FileOutputStream(outFile).getChannel();
for (String f : files) {
FileChannel fc = new FileInputStream(f).getChannel();
ByteBuffer bb = ByteBuffer.allocate(MERGE_BSIZE);
while (fc.read(bb) != -1) {
bb.flip(); // 回绕缓冲区,索引重置为开头
outChannel.write(bb);
bb.clear();
}
fc.close();
hasCombineFile.incrementAndGet();
}
} catch (IOException ioe) {
ioe.printStackTrace();
} finally {
try {
if (outChannel != null) {
outChannel.close();
}
} catch (IOException ignore) {
}
}
}
/**
* 递归删除目录下的所有文件及子目录下所有文件
*
* @param dir
* 将要删除的文件目录
* @return boolean Returns "true" if all deletions were successful. If a
* deletion fails, the method stops attempting to delete and returns
* "false".
*/
private static boolean deleteDir(File dir) {
if (dir.isDirectory()) {
String[] children = dir.list();
for (int i = 0; i < children.length; i++) {
boolean success = deleteDir(new File(dir, children[i]));
if (!success) {
return false;
}
}
}
// 目录此时为空,可以删除
return dir.delete();
}
/************************************************* 读文件工具类 ***************************************/
interface FileUtilImpl {
public void handlerLin(String line);
}
class FileUtil implements FileUtilImpl {
FileUtilImpl impl;
public FileUtil(FileUtilImpl impl) {
this.impl = impl;
}
// 读到的行应该怎么处理
public void handlerLin(String line) {
impl.handlerLin(line);
}
// nio读文件
public void nioReadFile(File file, int SIZE) throws IOException {
String enterStr = "\n";
FileChannel inChannel = new FileInputStream(file).getChannel();
ByteBuffer buffer = ByteBuffer.allocate(SIZE);
StringBuilder newlinesBui = new StringBuilder();
while (inChannel.read(buffer) != -1) {
buffer.flip();
// ByteBuffer.array() 返回的 array 长度为 ByteBuffer
// allocate的长度,并不是里面所含的内容的长度
// 这样会导致,最后读取的肯定不是allocate的长度,但是array返回的带有上一次的冗余数据
// 解决办法如下,重新按照剩余容量来制作一个新的byte
byte[] contentBytes;
if (buffer.remaining() != buffer.capacity()) {
contentBytes = new byte[buffer.remaining()];
buffer.get(contentBytes, 0, contentBytes.length);
} else {
contentBytes = buffer.array();
}
String content = new String(contentBytes);
newlinesBui.append(content);
int fromIndex = 0;
int endIndex = -1;
// 循环找到 \n
String line;
while ((endIndex = newlinesBui.indexOf(enterStr, fromIndex)) > -1) {
// 得到一行
line = newlinesBui.substring(fromIndex, endIndex).trim();
if (line != null && !line.trim().equals("")) {
impl.handlerLin(line);
}
fromIndex = endIndex + 1;
}
newlinesBui.delete(0, fromIndex);
buffer.clear();
}
// 最后一行
String lastLine = newlinesBui.substring(0, newlinesBui.length()).trim();
if (lastLine != null && !lastLine.equals("")) {
impl.handlerLin(lastLine);
}
inChannel.close();
}
}
/************************************************* 读文件工具类 ***************************************/
/************************************************* 内存监控工具类 ***************************************/
static class Memory {
public static long getMaxHeapMemory() {
MemoryMXBean mmb = ManagementFactory.getMemoryMXBean();
return mmb.getHeapMemoryUsage().getMax();
}
public static long getInitHeapMemory() {
MemoryMXBean mmb = ManagementFactory.getMemoryMXBean();
return mmb.getHeapMemoryUsage().getInit();
}
public static long getUsedHeapMemory() {
MemoryMXBean mmb = ManagementFactory.getMemoryMXBean();
return mmb.getHeapMemoryUsage().getUsed();
}
public static void print() {
System.out.println(String.format("已经使用内存:[%sM] 剩余可用内存:[%sM]", Memory.getUsedHeapMemory() / 1024 / 1024,
((Memory.getMaxHeapMemory() / 1024 / 1024) - (Memory.getUsedHeapMemory() / 1024 / 1024))));
}
}
/************************************************* 内存监控工具类 ***************************************/
}
- 测试代码
package com.bingo;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Random;
public class CreateFile {
public static Thread ramMonitorT = new Thread(new Runnable() {
@Override
public void run() {
try {
while(true){
System.out.println(String.format("已生成文件大小:[%sM]", line/30000));
Thread.sleep(2000);
}
} catch (Exception e) {
}
}
});
public static String SRC_DATA = "d://temp//bigdata/src/100m.txt";
public static int line = 0;
public static void main(String[] args) throws InterruptedException, IOException {
ramMonitorT.setDaemon(true);
ramMonitorT.start();
//待排序文件
if( (args.length > 0) && !args[0].equals("")){
SRC_DATA = args[0];
}
System.out.println("生成文件路径:"+SRC_DATA);
//文件大小
int m = 0;
if( (args.length > 1) && !args[1].equals("")){
m = Integer.valueOf(args[1]);
}
System.out.println("生成文件大小:"+m+"M");
BufferedWriter bw = new BufferedWriter(new FileWriter(SRC_DATA));
//文件大小,1M=30000行,100M = 300W行,1G=3000W行,24G=3000W*24
int fileSize = 30000*m;
for(int j = 0;j < fileSize;j++){
int rang = (int)(Math.random()*60)+1;
StringBuffer num = new StringBuffer();
for(int i = 0; i< rang ; i++){
if(i != 0){
num.append((int)(Math.random()*10));
}else{
num.append((int)(Math.random()*9)+1);
}
}
bw.write(num.toString());
bw.newLine();
line ++;
if(j % 10000 == 0){
bw.flush();
}
}
bw.close();
System.out.println("完!");
}
}