前两篇已经详细说明了零拷贝的来源,各种实现的原理以及linux层的实现。这一篇讲解Java的零拷贝在NIO中的实现。Java的IO有面向流的IO和NIO,两种IO的区别此处就不做说明了。
场景一:将一个文件通过网络发送出去
Java传统方法数据传输
java传统方法的调用如下
File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);
java传统方法的数据流
这个过程产生的系统消耗是:
- 4次数据copy
- 4次应用程序与内核之间上下文切换
Java层mmap
底层mmap的实现对应到Java层中FileChannel的map方法,但FileChannel实际上是一个抽象类,它的具体实现是FileChannelImpl(该类的源码,需要在openjdk中查看)
public MappedByteBuffer map(MapMode mode, long position, long size)
FileChannelImpl中map关键代码片段
int pagePosition = (int)(position % allocationGranularity);
long mapPosition = position - pagePosition;
long mapSize = size + pagePosition;
try {
// If no exception was thrown from map0, the address is valid
addr = map0(imode, mapPosition, mapSize);
} catch (OutOfMemoryError x) {
// An OutOfMemoryError may indicate that we've exhausted memory
// so force gc and re-attempt map
System.gc();
try {
Thread.sleep(100);
} catch (InterruptedException y) {
Thread.currentThread().interrupt();
}
try {
addr = map0(imode, mapPosition, mapSize);
} catch (OutOfMemoryError y) {
// After a second OOME, fail
throw new IOException("Map failed", y);
}
}
该方法中调用了map0方法,但是map0方法是一个native方法
private native long map0(int prot, long position, long length)throws IOException;
map0方法的源代码位于openjdk中FileChannelImpl.c文件中
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this,
jint prot, jlong off, jlong len)
{
void *mapAddress = 0;
jobject fdo = (*env)->GetObjectField(env, this, chan_fd);
jint fd = fdval(env, fdo);
int protections = 0;
int flags = 0;
if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
protections = PROT_READ;
flags = MAP_SHARED;
} else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
protections = PROT_WRITE | PROT_READ;
flags = MAP_SHARED;
} else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
protections = PROT_WRITE | PROT_READ;
flags = MAP_PRIVATE;
}
mapAddress = mmap64(
0, /* Let OS decide location */
len, /* Number of bytes to map */
protections, /* File permissions */
flags, /* Changes are shared */
fd, /* File descriptor of mapped file */
off); /* Offset into file */
if (mapAddress == MAP_FAILED) {
if (errno == ENOMEM) {
JNU_ThrowOutOfMemoryError(env, "Map failed");
return IOS_THROWN;
}
return handle(env, -1, "Map failed");
}
return ((jlong) (unsigned long) mapAddress);
}
我们可以发现map0最后调用了mmap64,这东东貌似与mmap很像呀,其实它是mmap的一个宏定义
#define mmap64 mmap
到此为止,我们就明白了FileChannel.map实际上是调用的linux中的mmap来实现文件映射的,不过到这里并未结束,我们回到FileChannelImple.map中调用map0的地方
addr = map0(imode, mapPosition, mapSize);
文件映射成功后返回映射的起始地址addr
Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
if ((!writable) || (imode == MAP_RO)) {
return Util.newMappedByteBufferR(isize,
addr + pagePosition,
mfd,
um);
} else {
return Util.newMappedByteBuffer(isize,
addr + pagePosition,
mfd,
um);
}
- 这里利用这四个参数addr、mapSize、isize、mfd创建了Unmapper类,Unmapper这个类的作用是用于unmap时进行内存释放,这里我们暂且不深入。
- 生成MappedByteBuffer,如果imode 为readonly那么创建newMappedByteBufferR,表示创建的这个区域只可读。
继续看newMappedByteBuffer的源码
static MappedByteBuffer newMappedByteBuffer(int size, long addr,
FileDescriptor fd,
Runnable unmapper)
{
MappedByteBuffer dbb;
if (directByteBufferConstructor == null)
initDBBConstructor();
try {
dbb = (MappedByteBuffer)directByteBufferConstructor.newInstance(
new Object[] { new Integer(size),
new Long(addr),
fd,
unmapper });
} catch (InstantiationException |
IllegalAccessException |
InvocationTargetException e) {
throw new InternalError(e);
}
return dbb;
}
利用directByteBufferConstructor生成了一个实例,看看directByteBufferConstructor是什么
private static void initDBBConstructor() {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
try {
Class<?> cl = Class.forName("java.nio.DirectByteBuffer");
Constructor<?> ctor = cl.getDeclaredConstructor(
new Class<?>[] { int.class,
long.class,
FileDescriptor.class,
Runnable.class });
ctor.setAccessible(true);
directByteBufferConstructor = ctor;
} catch (ClassNotFoundException |
NoSuchMethodException |
IllegalArgumentException |
ClassCastException x) {
throw new InternalError(x);
}
return null;
}});
}
原来directByteBufferConstructor是java.nio.DirectByteBuffer,所以最终创建的实例是DirectByteBuffer,但是返回的是MappedByteBuffer,MappedByteBuffer是DirectByteBuffer的父类。
这个过程产生的系统消耗是:
- 3次数据copy
- 4次应用程序与内核之间上下文切换
总结:FileChannel.map是java层的提供的文件映射方法,最终返回的是MappedByteBuffer类,MappedByteBuffer类是Java层提供给开发人员对文件映射内存访问和操作的统一视图,它封装了基地址addr、映射的数据size、文件描述符mfd、内存回收时的回调Unmapper
public MappedByteBuffer map(MapMode mode, long position, long size)
Java层sendfile
底层sendfile的实现对应到Java层中FileChannel的transferTo方法,但FileChannel实际上是一个抽象类,它的具体实现是FileChannelImpl(该类的源码,需要在openjdk中查看)
public long transferTo(long position, long count, WritableByteChannel target)
下面是transferTo的核心代码
public long transferTo(long position, long count,
WritableByteChannel target)
throws IOException
{
long n;
// Attempt a direct transfer, if the kernel supports it
if ((n = transferToDirectly(position, icount, target)) >= 0)
return n;
// Attempt a mapped transfer, but only to trusted channel types
if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
return n;
// Slow path for untrusted targets
return transferToArbitraryChannel(position, icount, target);
}
这段核心代码包含了我们所描述的三种数据传输方式:
- 如果系统支持sendfile的方式,那么transferToDirectly方法中会调用sendfile进行发送,这种方式效率是最高的;否则
- transferToTrustedChannel中使用map的方式进行数据传输;否则
- 先将文件数据读到一个临时的DirectBuffer中,然后再将数据从这个临时的DirectBuffer写入到目标target中;
为了让我们的主线更清晰,我们还是回归到transferToDirectly的调用
transferToDirectly中调用transferTo0,它是一个native方法,截取核心代码片段如下:
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
jint srcFD,
jlong position, jlong count,
jint dstFD)
{
result = sendfile(srcFD, dstFD, position, &numBytes, NULL, 0);
}
可以发现transferTo最终调用的是sendfile系统接口。
这个过程产生的系统消耗是:
- 1次数据copy
- 2次应用程序与内核之间上下文切换
总结
transferTo(带有DMA收集拷贝功能的sendfile)它与mmap的区别就是少了一次应用程序与内核之间上下文切换和2次数据copy,但是它们的使用场景是不一样的:
- transferTo:适用于应用程序无需对文件数据进行任何操作的场景;
- map:适用于应用程序需要操作文件数据的场景;
场景二:将应用程序中的内存中的数据通过网络发送出去(非磁盘上的)
场景一与场景二的主要区别就是文件数据和内存中的数据的区别,现在我们需要去阅读以下Nio中ChannelSocket的源码,
同样的ChannelSocket的实现是SocketChannelImpl(也是在OpenJDK中),它的write方法有两个:
public int write(ByteBuffer buf)
public long write(ByteBuffer[] srcs, int offset, int length)
我们主要关注public int write(ByteBuffer buf)方法
public int write(ByteBuffer buf) throws IOException {
if (buf == null)
throw new NullPointerException();
synchronized (writeLock) {
ensureWriteOpen();
int n = 0;
try {
begin();
synchronized (stateLock) {
if (!isOpen())
return 0;
writerThread = NativeThread.current();
}
for (;;) {
n = IOUtil.write(fd, buf, -1, nd);
if ((n == IOStatus.INTERRUPTED) && isOpen())
continue;
return IOStatus.normalize(n);
}
} finally {
writerCleanup();
end(n > 0 || (n == IOStatus.UNAVAILABLE));
synchronized (stateLock) {
if ((n <= 0) && (!isOutputOpen))
throw new AsynchronousCloseException();
}
assert IOStatus.check(n);
}
}
}
该方法的核心是IOUtil.write(fd, buf, -1, nd);
static int write(FileDescriptor fd, ByteBuffer src, long position,
NativeDispatcher nd)
throws IOException
{
if (src instanceof DirectBuffer)
return writeFromNativeBuffer(fd, src, position, nd);
// Substitute a native buffer
int pos = src.position();
int lim = src.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
try {
bb.put(src);
bb.flip();
// Do not update src until we see how many bytes were written
src.position(pos);
int n = writeFromNativeBuffer(fd, bb, position, nd);
if (n > 0) {
// now update src
src.position(pos + n);
}
return n;
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
IOUtil.write中的逻辑分为两个部分:
- 如果src为DirectBuffer,那么就直接调用writeFromNativeBuffer;
- 否则src为一个HeapBuffer,先通过getTemporaryDirectBuffer创建一个临时的DirectBuffer,然后将HeapBuffer中的数据拷贝到这个临时的DirectBuffer,最后再调用writeFromNativeBuffer发送数据;
private static int writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb,
long position, NativeDispatcher nd)
throws IOException
{
int pos = bb.position();
int lim = bb.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
int written = 0;
if (rem == 0)
return 0;
if (position != -1) {
written = nd.pwrite(fd,
((DirectBuffer)bb).address() + pos,
rem, position);
} else {
written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
}
if (written > 0)
bb.position(pos + written);
return written;
}
调用nd.write,这个nd其实是SocketDispatcher,SocketDispatcher的write方法
int write(FileDescriptor fd, long address, int len) throws IOException {
return FileDispatcherImpl.write0(fd, address, len);
}
最终调用了FileDescriptor的write0,write0是一个native方法
static native int write0(FileDescriptor fd, long address, int len)
throws IOException;
场景三:从网络读数据到Java应用程序
同样的ChannelSocket的实现是SocketChannelImpl(也是在OpenJDK中),它的read方法也有两个:
public int read(ByteBuffer buf)
public long read(ByteBuffer[] dsts, int offset, int length)
我们主要关注public int read(ByteBuffer buf)方法,该方法的核心是调用IOUtil.read方法:
n = IOUtil.read(fd, buf, -1, nd);
static int read(FileDescriptor fd, ByteBuffer dst, long position,
NativeDispatcher nd)
throws IOException
{
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
if (dst instanceof DirectBuffer)
return readIntoNativeBuffer(fd, dst, position, nd);
// Substitute a native buffer
ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
try {
int n = readIntoNativeBuffer(fd, bb, position, nd);
bb.flip();
if (n > 0)
dst.put(bb);
return n;
} finally {
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
我们发现它和write方法类似,中间都要经过DirectByteBuffer,不同的是它的方向是read(Linux->DirectByteBuffer->HeapByteBuffer)
同样的如果你去阅读FileChannelImpl的write和read方法,它们同样是调用IOUtil.write和IOUtil.read方法。
总结:
之所以要将场景二和场景三进行分析,是为了引出Java中的堆内内存HeapByteBuffer和堆外内存DirectByteBuffer。
通过上面的的分析,可以确定DirectByteBuffer比HeapByteBuffer的效率高,因为在SocketChannelImpl的分析中,它的拷贝少了一次。
那么还有如下疑问?
- DirectByteBuffer和HeapByteBuffer是什么?
- DirectByteBuffer、MappedByteBuffer是什么关系?
- DirectByteBuffer和map都能减少一次数据的拷贝,它们有什么区别呢?
这个我们将在下一篇文章中进行讲解。