Java零拷贝三步曲——Java层的实现

前两篇已经详细说明了零拷贝的来源,各种实现的原理以及linux层的实现。这一篇讲解Java的零拷贝在NIO中的实现。Java的IO有面向流的IO和NIO,两种IO的区别此处就不做说明了。

场景一:将一个文件通过网络发送出去

Java传统方法数据传输

java传统方法的调用如下

File.read(fileDesc, buf, len);
Socket.send(socket, buf, len);

java传统方法的数据流

image.png

这个过程产生的系统消耗是:

  • 4次数据copy
  • 4次应用程序与内核之间上下文切换

Java层mmap

底层mmap的实现对应到Java层中FileChannel的map方法,但FileChannel实际上是一个抽象类,它的具体实现是FileChannelImpl(该类的源码,需要在openjdk中查看)

public MappedByteBuffer map(MapMode mode, long position, long size)

FileChannelImpl中map关键代码片段

int pagePosition = (int)(position % allocationGranularity);
long mapPosition = position - pagePosition;
long mapSize = size + pagePosition;
try {
    // If no exception was thrown from map0, the address is valid
    addr = map0(imode, mapPosition, mapSize);
} catch (OutOfMemoryError x) {
    // An OutOfMemoryError may indicate that we've exhausted memory
    // so force gc and re-attempt map
    System.gc();
    try {
        Thread.sleep(100);
    } catch (InterruptedException y) {
        Thread.currentThread().interrupt();
    }
    try {
        addr = map0(imode, mapPosition, mapSize);
    } catch (OutOfMemoryError y) {
        // After a second OOME, fail
        throw new IOException("Map failed", y);
    }
}

该方法中调用了map0方法,但是map0方法是一个native方法

private native long map0(int prot, long position, long length)throws IOException;

map0方法的源代码位于openjdk中FileChannelImpl.c文件中

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this,
                                     jint prot, jlong off, jlong len)
{
    void *mapAddress = 0;
    jobject fdo = (*env)->GetObjectField(env, this, chan_fd);
    jint fd = fdval(env, fdo);
    int protections = 0;
    int flags = 0;

    if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
        protections = PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
        protections = PROT_WRITE | PROT_READ;
        flags = MAP_SHARED;
    } else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
        protections =  PROT_WRITE | PROT_READ;
        flags = MAP_PRIVATE;
    }

    mapAddress = mmap64(
        0,                    /* Let OS decide location */
        len,                  /* Number of bytes to map */
        protections,          /* File permissions */
        flags,                /* Changes are shared */
        fd,                   /* File descriptor of mapped file */
        off);                 /* Offset into file */

    if (mapAddress == MAP_FAILED) {
        if (errno == ENOMEM) {
            JNU_ThrowOutOfMemoryError(env, "Map failed");
            return IOS_THROWN;
        }
        return handle(env, -1, "Map failed");
    }

    return ((jlong) (unsigned long) mapAddress);
}

我们可以发现map0最后调用了mmap64,这东东貌似与mmap很像呀,其实它是mmap的一个宏定义

#define mmap64 mmap

到此为止,我们就明白了FileChannel.map实际上是调用的linux中的mmap来实现文件映射的,不过到这里并未结束,我们回到FileChannelImple.map中调用map0的地方

addr = map0(imode, mapPosition, mapSize);

文件映射成功后返回映射的起始地址addr

Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
if ((!writable) || (imode == MAP_RO)) {
    return Util.newMappedByteBufferR(isize,
                                     addr + pagePosition,
                                     mfd,
                                     um);
} else {
    return Util.newMappedByteBuffer(isize,
                                    addr + pagePosition,
                                    mfd,
                                    um);
}
  1. 这里利用这四个参数addr、mapSize、isize、mfd创建了Unmapper类,Unmapper这个类的作用是用于unmap时进行内存释放,这里我们暂且不深入。
  2. 生成MappedByteBuffer,如果imode 为readonly那么创建newMappedByteBufferR,表示创建的这个区域只可读。
    继续看newMappedByteBuffer的源码
static MappedByteBuffer newMappedByteBuffer(int size, long addr,
                                                FileDescriptor fd,
                                                Runnable unmapper)
{
    MappedByteBuffer dbb;
    if (directByteBufferConstructor == null)
        initDBBConstructor();
    try {
        dbb = (MappedByteBuffer)directByteBufferConstructor.newInstance(
          new Object[] { new Integer(size),
                         new Long(addr),
                         fd,
                         unmapper });
    } catch (InstantiationException |
             IllegalAccessException |
             InvocationTargetException e) {
        throw new InternalError(e);
    }
    return dbb;
}

利用directByteBufferConstructor生成了一个实例,看看directByteBufferConstructor是什么

private static void initDBBConstructor() {
        AccessController.doPrivileged(new PrivilegedAction<Void>() {
                public Void run() {
                    try {
                        Class<?> cl = Class.forName("java.nio.DirectByteBuffer");
                        Constructor<?> ctor = cl.getDeclaredConstructor(
                            new Class<?>[] { int.class,
                                             long.class,
                                             FileDescriptor.class,
                                             Runnable.class });
                        ctor.setAccessible(true);
                        directByteBufferConstructor = ctor;
                    } catch (ClassNotFoundException   |
                             NoSuchMethodException    |
                             IllegalArgumentException |
                             ClassCastException x) {
                        throw new InternalError(x);
                    }
                    return null;
                }});
        }

原来directByteBufferConstructor是java.nio.DirectByteBuffer,所以最终创建的实例是DirectByteBuffer,但是返回的是MappedByteBuffer,MappedByteBuffer是DirectByteBuffer的父类。

image.png

这个过程产生的系统消耗是:

  • 3次数据copy
  • 4次应用程序与内核之间上下文切换
总结:FileChannel.map是java层的提供的文件映射方法,最终返回的是MappedByteBuffer类,MappedByteBuffer类是Java层提供给开发人员对文件映射内存访问和操作的统一视图,它封装了基地址addr、映射的数据size、文件描述符mfd、内存回收时的回调Unmapper
public MappedByteBuffer map(MapMode mode, long position, long size)

Java层sendfile

底层sendfile的实现对应到Java层中FileChannel的transferTo方法,但FileChannel实际上是一个抽象类,它的具体实现是FileChannelImpl(该类的源码,需要在openjdk中查看)

public long transferTo(long position, long count, WritableByteChannel target)

下面是transferTo的核心代码

public long transferTo(long position, long count,
                           WritableByteChannel target)
        throws IOException
{
    long n;

    // Attempt a direct transfer, if the kernel supports it
    if ((n = transferToDirectly(position, icount, target)) >= 0)
        return n;

    // Attempt a mapped transfer, but only to trusted channel types
    if ((n = transferToTrustedChannel(position, icount, target)) >= 0)
        return n;

    // Slow path for untrusted targets
    return transferToArbitraryChannel(position, icount, target);
}

这段核心代码包含了我们所描述的三种数据传输方式:

  1. 如果系统支持sendfile的方式,那么transferToDirectly方法中会调用sendfile进行发送,这种方式效率是最高的;否则
  2. transferToTrustedChannel中使用map的方式进行数据传输;否则
  3. 先将文件数据读到一个临时的DirectBuffer中,然后再将数据从这个临时的DirectBuffer写入到目标target中;

为了让我们的主线更清晰,我们还是回归到transferToDirectly的调用
transferToDirectly中调用transferTo0,它是一个native方法,截取核心代码片段如下:

JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_transferTo0(JNIEnv *env, jobject this,
                                            jint srcFD,
                                            jlong position, jlong count,
                                            jint dstFD)
{
    result = sendfile(srcFD, dstFD, position, &numBytes, NULL, 0);                                          
}

可以发现transferTo最终调用的是sendfile系统接口。


image.png

这个过程产生的系统消耗是:

  • 1次数据copy
  • 2次应用程序与内核之间上下文切换

总结

transferTo(带有DMA收集拷贝功能的sendfile)它与mmap的区别就是少了一次应用程序与内核之间上下文切换和2次数据copy,但是它们的使用场景是不一样的:

  • transferTo:适用于应用程序无需对文件数据进行任何操作的场景;
  • map:适用于应用程序需要操作文件数据的场景;

场景二:将应用程序中的内存中的数据通过网络发送出去(非磁盘上的)

场景一与场景二的主要区别就是文件数据和内存中的数据的区别,现在我们需要去阅读以下Nio中ChannelSocket的源码,
同样的ChannelSocket的实现是SocketChannelImpl(也是在OpenJDK中),它的write方法有两个:

public int write(ByteBuffer buf)
public long write(ByteBuffer[] srcs, int offset, int length)    

我们主要关注public int write(ByteBuffer buf)方法

public int write(ByteBuffer buf) throws IOException {
        if (buf == null)
            throw new NullPointerException();
        synchronized (writeLock) {
            ensureWriteOpen();
            int n = 0;
            try {
                begin();
                synchronized (stateLock) {
                    if (!isOpen())
                        return 0;
                    writerThread = NativeThread.current();
                }
                for (;;) {
                    n = IOUtil.write(fd, buf, -1, nd);
                    if ((n == IOStatus.INTERRUPTED) && isOpen())
                        continue;
                    return IOStatus.normalize(n);
                }
            } finally {
                writerCleanup();
                end(n > 0 || (n == IOStatus.UNAVAILABLE));
                synchronized (stateLock) {
                    if ((n <= 0) && (!isOutputOpen))
                        throw new AsynchronousCloseException();
                }
                assert IOStatus.check(n);
            }
        }
    }

该方法的核心是IOUtil.write(fd, buf, -1, nd);

static int write(FileDescriptor fd, ByteBuffer src, long position,
                     NativeDispatcher nd)
        throws IOException
{
    if (src instanceof DirectBuffer)
        return writeFromNativeBuffer(fd, src, position, nd);

    // Substitute a native buffer
    int pos = src.position();
    int lim = src.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);
    ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
    try {
        bb.put(src);
        bb.flip();
        // Do not update src until we see how many bytes were written
        src.position(pos);

        int n = writeFromNativeBuffer(fd, bb, position, nd);
        if (n > 0) {
            // now update src
            src.position(pos + n);
        }
        return n;
    } finally {
        Util.offerFirstTemporaryDirectBuffer(bb);
    }
}

IOUtil.write中的逻辑分为两个部分:

  1. 如果src为DirectBuffer,那么就直接调用writeFromNativeBuffer;
  2. 否则src为一个HeapBuffer,先通过getTemporaryDirectBuffer创建一个临时的DirectBuffer,然后将HeapBuffer中的数据拷贝到这个临时的DirectBuffer,最后再调用writeFromNativeBuffer发送数据;
private static int writeFromNativeBuffer(FileDescriptor fd, ByteBuffer bb,
                                             long position, NativeDispatcher nd)
        throws IOException
{
    int pos = bb.position();
    int lim = bb.limit();
    assert (pos <= lim);
    int rem = (pos <= lim ? lim - pos : 0);

    int written = 0;
    if (rem == 0)
        return 0;
    if (position != -1) {
        written = nd.pwrite(fd,
                            ((DirectBuffer)bb).address() + pos,
                            rem, position);
    } else {
        written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem);
    }
    if (written > 0)
        bb.position(pos + written);
    return written;
}

调用nd.write,这个nd其实是SocketDispatcher,SocketDispatcher的write方法

int write(FileDescriptor fd, long address, int len) throws IOException {
        return FileDispatcherImpl.write0(fd, address, len);
}

最终调用了FileDescriptor的write0,write0是一个native方法

static native int write0(FileDescriptor fd, long address, int len)
        throws IOException;

场景三:从网络读数据到Java应用程序

同样的ChannelSocket的实现是SocketChannelImpl(也是在OpenJDK中),它的read方法也有两个:

public int read(ByteBuffer buf)
public long read(ByteBuffer[] dsts, int offset, int length)   

我们主要关注public int read(ByteBuffer buf)方法,该方法的核心是调用IOUtil.read方法:

n = IOUtil.read(fd, buf, -1, nd);
static int read(FileDescriptor fd, ByteBuffer dst, long position,
                    NativeDispatcher nd)
        throws IOException
    {
        if (dst.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");
        if (dst instanceof DirectBuffer)
            return readIntoNativeBuffer(fd, dst, position, nd);

        // Substitute a native buffer
        ByteBuffer bb = Util.getTemporaryDirectBuffer(dst.remaining());
        try {
            int n = readIntoNativeBuffer(fd, bb, position, nd);
            bb.flip();
            if (n > 0)
                dst.put(bb);
            return n;
        } finally {
            Util.offerFirstTemporaryDirectBuffer(bb);
        }
    }

我们发现它和write方法类似,中间都要经过DirectByteBuffer,不同的是它的方向是read(Linux->DirectByteBuffer->HeapByteBuffer)

同样的如果你去阅读FileChannelImpl的write和read方法,它们同样是调用IOUtil.write和IOUtil.read方法。

总结:

之所以要将场景二和场景三进行分析,是为了引出Java中的堆内内存HeapByteBuffer和堆外内存DirectByteBuffer。
通过上面的的分析,可以确定DirectByteBuffer比HeapByteBuffer的效率高,因为在SocketChannelImpl的分析中,它的拷贝少了一次。

那么还有如下疑问?

  1. DirectByteBuffer和HeapByteBuffer是什么?
  2. DirectByteBuffer、MappedByteBuffer是什么关系?
  3. DirectByteBuffer和map都能减少一次数据的拷贝,它们有什么区别呢?

这个我们将在下一篇文章中进行讲解。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354

推荐阅读更多精彩内容