前言
笔者使用http上传大文件,采用分片上传的方式,下把大文件分割成等分的小文件上传到服务器目录再合并到写入大文件中,起初应对2G以内的文件都没有问题,随着文件增大出现问题了,文件上传的状态一直merging,即小文件都传上来了,却没有合并。
分析问题
考虑到合并文件是个耗时的操作,http会响应超时,就用启动一个线程异步上传,且合并是互斥的,也就是说如果已经在合并中就不能重复提交。为了简化模型,启用了一个单线程的线程池执行合并任务,合并开始前存入merging状态,后续相同的请求进来就会拒绝,不同的请求进来就会加入到线程池的阻塞队列。
分析内存
从本机开发环境开始调试,加入jmx监控,使用jconsole和VisualVM获取内存变化情况。
-server
-Xrs
-Xmx6G
-XX:NewSize=2G
-Dcom.sun.management.jmxremote.ssl=false
-Dcom.sun.management.jmxremote.authenticate=false
-Djava.rmi.server.hostname=127.0.0.1
-Dcom.sun.management.jmxremote.port=10028
如图红线圈中的部分是文件分片上传时期,黑线圈中的部分是服务器文件合并时期,明显看出分片上传时期新生代内存消耗大,频繁触发YGC;文件合并时期,内存曲线斜率变化不大与没有上传文件前相差不大,说明文件合并不会消耗多大的内存。
分析IO
由于jconsole和VisualVM无法监控磁盘IO,用的win7系统自带的磁盘监控工具。发现磁盘队列长度在合并期间长期维持在5左右,说明磁盘IO到达瓶颈。
分析线程dump
以上两项分析,说明合并文件只跟系统磁盘的IO有关系,但是去服务器上看IO情况,发现IO读写很低。说明文件合并请求很可能并没有执行。下面分析服务器jvm的线程dump。
> jstack -l pid > log.log
> vim log.log
"pool-2-thread-2" #64 prio=5 os_prio=0 tid=0x00007f931400b000 nid=0x7af5 waiting on condition [0x00007f93a8046000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000006268ccbc0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- None
几次合并请求发现都没有执行,原来是这样的,单线程线程池,内部有个无界队列,线程忙时,就把请求放到阻塞队列中,实际上后续的合并请求并没有执行。因此看不到报错,也看不到任何反应。
创建线程执行
明白是线程池捣乱后,采用new Thread()的方式创建线程执行。结果发起合并请求日志中就暴露问题了。
Exception in thread "merge-thread-83" java.lang.StackOverflowError
at java.io.SequenceInputStream.close(SequenceInputStream.java:232)
at java.io.SequenceInputStream.nextStream(SequenceInputStream.java:106)
at java.io.SequenceInputStream.close(SequenceInputStream.java:232)
at java.io.SequenceInputStream.nextStream(SequenceInputStream.java:106)
at java.io.SequenceInputStream.close(SequenceInputStream.java:232)
at java.io.SequenceInputStream.nextStream(SequenceInputStream.java:106)
at java.io.SequenceInputStream.close(SequenceInputStream.java:232)
at java.io.SequenceInputStream.nextStream(SequenceInputStream.java:106)
at java.io.SequenceInputStream.close(SequenceInputStream.java:232)
at java.io.SequenceInputStream.nextStream(SequenceInputStream.java:106)
at java.io.SequenceInputStream.close(SequenceInputStream.java:232)
at java.io.SequenceInputStream.nextStream(SequenceInputStream.java:106)
at java.io.SequenceInputStream.close(SequenceInputStream.java:232)
at java.io.SequenceInputStream.nextStream(SequenceInputStream.java:106)
at java.io.SequenceInputStream.close(SequenceInputStream.java:232)
......
jvm虚拟机栈溢出了。开始真正的分析代码。
// 按照小文件的名称的数字顺序把小文件转成字节流,并放到SequenceInputStream内部实现的集合中,集合实际上是个Vector保存文件流的顺序。
SequenceInputStream s ;
InputStream s1 = new FileInputStream(mergePath + 0 + ext);
InputStream s2 = new FileInputStream(mergePath + 1 + ext);
s = new SequenceInputStream(s1, s2);
for (int i = 2; i < chunksNumber; i++) {
InputStream s3 = new FileInputStream(mergePath + i + ext);
s = new SequenceInputStream(s, s3);
}
//通过输出流向文件写入数据
String writeToPath = uploadFolderPath;
if (uploadFolderPath.endsWith(ConfigConstant.File_Separator)) {
writeToPath = uploadFolderPath.substring(0, uploadFolderPath.length()-1);
}
// 开始向写入大文件
StreamUtil.saveStreamToFile(s, writeToPath + ext);
StreamUtil.saveStreamToFile方法
public static void saveStreamToFile(@NotNull final InputStream inputStream,
@NotNull final String filePath)
throws Exception {
/*创建输出流,写入数据,合并分块*/
logger.info("{}: 文件‘{}’大文件流写入开始", Thread.currentThread().getName(), filePath);
OutputStream outputStream = new FileOutputStream(filePath);
byte[] buffer = new byte[1024];
int len = 0;
try {
while ((len = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, len);
outputStream.flush();
}
logger.info("{}: 文件‘{}’大文件流写入成功", Thread.currentThread().getName(), filePath);
} catch (Exception e) {
e.printStackTrace();
throw e;
} finally {
outputStream.close();
inputStream.close();
}
}
文件输入流时SequenceInputStream,其类结构如图:
SequenceInputStream调用了子类的read(b [])方法,子类调用了父类的read(byte b[], int off, int len)方法,如下图所示:
public int read(byte b[], int off, int len) throws IOException {
if (in == null) {
return -1;
} else if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
do {
int n = in.read(b, off, len);
if (n > 0) {
return n;
}
nextStream();
} while (in != null);
return -1;
}
读取一个小文件流末尾时,调用nextStream()方法获取下一个文件输入流。
final void nextStream() throws IOException {
if (in != null) {
in.close();
}
if (e.hasMoreElements()) {
in = (InputStream) e.nextElement();
if (in == null)
throw new NullPointerException();
}
else in = null;
}
nextStream()的第一步是关闭已经读完的文件流,但是这个SequenceInputStream的close()方法有问题。
public void close() throws IOException {
do {
nextStream();
} while (in != null);
}
这里是循环中带有递归方法,关闭输入流前,遍历非空的输入流,有点搞不懂作者的用意何在,这里的就是刚才栈溢出时压栈的方法。这就是问题的根源