Java NIO实现原理之Buffer

nio是基于事件驱动模型的非阻塞io,这篇文章简要介绍了nio,本篇主要介绍Buffer的实现原理。

Buffer

是一块缓冲区,通常使用buffer读写数据为:

RandomAccessFile aFile = new RandomAccessFile("data/nio-data.txt", "rw");
FileChannel inChannel = aFile.getChannel();

//1.create buffer with capacity of 48 bytes
ByteBuffer buf = ByteBuffer.allocate(48);
//2.write into buffer
int bytesRead = inChannel.read(buf);
while (bytesRead != -1) {
//3.make buffer from write mode to read mode
  buf.flip();  

  while(buf.hasRemaining()){
    //4. read 1 byte from buffer
      System.out.print((char) buf.get()); 
  }
//5.调用clear()方法或者compact()方法,make buffer ready for writing
  buf.clear(); 
  bytesRead = inChannel.read(buf);
}
aFile.close();

Buffer的数据结构设计如下:

Buffer数据结构.png

其中:
capacity:buffer的固定大小值
position:在写模式下,表示当前写入数据的位置。在读模式下,表示当前已读到数据的位置
limit:在写模式下,表示最大可写的位置,为capacity ,在读模式下,表示最大可读位置。
此外,Buffer类中还有以下参数:
mark:初始值为-1,用于备份当前的position。
address:buffer对象持有的堆外内存(DirectByteBuffer)的内存地址,方便JNI 快速找到堆外内存地址。
Buffer相关的类结构如下:
Buffer类结构.png

Buffer或ByteBuffer的方法简介:
1.Buffer的分配:

//分配一份堆内内存
public static ByteBuffer allocate(int capacity) {
        if (capacity < 0)
            throw new IllegalArgumentException();
        return new HeapByteBuffer(capacity, capacity);
    }
//分配一份堆外内存
public static ByteBuffer allocateDirect(int capacity) {
        return new DirectByteBuffer(capacity);
    }

2.向buffer写入数据:

//各种put方法,也可从channel中读取,inChannel.read(buf)
put(....)

3.从buffer中读取数据

//也可以这样写入channel。 int bytesWritten = inChannel.write(buf);
get()

4.flip():将Buffer从写模式切换到读模式

public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }

5.rewind():将position设回0,所以你可以重读Buffer中的所有数据

public final Buffer rewind() {
        position = 0;
        mark = -1;
        return this;
 }

6.clear()与compact():一旦读完Buffer中的数据,需要让Buffer准备好再次被写入。可以通过clear()或compact()方法来完成。
compact()方法将所有未读的数据拷贝到Buffer起始处。然后将position设到最后一个未读元素正后面。limit属性依然像clear()方法一样,设置成capacity。现在Buffer准备好写数据了,但是不会覆盖未读的数据。

 public final Buffer clear() {
        position = 0;
        limit = capacity;
        mark = -1;
        return this;
    }

7.mark()与reset()
通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position。方便数据的重新读取,而流只能是单向读取。

ByteBuffer的两种实现:
HeapByteBuffer:Java中分配的非空对象都是由Java虚拟机的垃圾收集器管理的,也称为堆内内存(on-heap memory)。虚拟机会定期对垃圾内存进行回收,在某些特定的时间点,它会进行一次彻底的回收(full gc)。彻底回收时,垃圾收集器会对所有分配的堆内内存进行完整的扫描,这意味着一个重要的事实——这样一次垃圾收集对Java应用造成的影响,跟堆的大小是成正比的。过大的堆会影响Java应用的性能。JVM参数中 -Xmx的值是新生代和老生代的和的最大值,我们在jvm参数里通常还会加一个参数-XX:MaxPermSize来指定持久代的最大值,那么我们认识的Java堆的最大值其实是-Xmx和-XX:MaxPermSize的总和,在分代算法下,新生代,老生代和持久代是连续的虚拟地址,因为它们是一起分配的。
DirectByteBuffer:由该对象创建的在jvm之外的内存,对于生命期中等或较长的对象,正是堆外内存要解决的。堆外内存有以下特点:
对于大内存有良好的伸缩性
对垃圾回收停顿的改善可以明显感觉到
在进程间可以共享,减少虚拟机间的复制
接下来看一下DirectByteBuffer的实现:

 DirectByteBuffer(int cap) {                   // package-private

        super(-1, 0, cap, cap);
        boolean pa = VM.isDirectMemoryPageAligned();
        int ps = Bits.pageSize();
        long size = Math.max(1L, (long)cap + (pa ? ps : 0));
       //确认堆外内存是否够用
        Bits.reserveMemory(size, cap);

        long base = 0;
        try {
           //分配堆外内存,并返回堆外内存的基地址
            base = unsafe.allocateMemory(size);
        } catch (OutOfMemoryError x) {
            Bits.unreserveMemory(size, cap);
            throw x;
        }
        unsafe.setMemory(base, size, (byte) 0);
        if (pa && (base % ps != 0)) {
            // Round up to page boundary
            address = base + ps - (base & (ps - 1));
        } else {
            address = base;
        }
        cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
        att = null;



    }

Bits.reserveMemory(size, cap) 方法,该方法用于在系统中保存总分配内存(按页分配)的大小和实际内存的大小。

 // These methods should be called whenever direct memory is allocated or
    // freed.  They allow the user to control the amount of direct memory
    // which a process may access.  All sizes are specified in bytes.
    static void reserveMemory(long size, int cap) {

        if (!memoryLimitSet && VM.isBooted()) {
            maxMemory = VM.maxDirectMemory();
            memoryLimitSet = true;
        }

        // optimist!内存够用!
        if (tryReserveMemory(size, cap)) {
            return;
        }
  
        final JavaLangRefAccess jlra = SharedSecrets.getJavaLangRefAccess();

        // retry while helping enqueue pending Reference objects
        // which includes executing pending Cleaner(s) which includes
        // Cleaner(s) that free direct buffer memory
        while (jlra.tryHandlePendingReference()) {
            if (tryReserveMemory(size, cap)) {
                return;
            }
        }

        // trigger VM's Reference processing
        System.gc();

        // a retry loop with exponential back-off delays
        // (this gives VM some time to do it's job)
        boolean interrupted = false;
        try {
            long sleepTime = 1;
            int sleeps = 0;
            while (true) {
                if (tryReserveMemory(size, cap)) {
                    return;
                }
                if (sleeps >= MAX_SLEEPS) {
                    break;
                }
                if (!jlra.tryHandlePendingReference()) {
                    try {
                        Thread.sleep(sleepTime);
                        sleepTime <<= 1;
                        sleeps++;
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }

            // no luck
            throw new OutOfMemoryError("Direct buffer memory");

        } finally {
            if (interrupted) {
                // don't swallow interrupts
                Thread.currentThread().interrupt();
            }
        }
    }

如果堆内存不够分配的话,jlra.tryHandlePendingReference()将触发一次非阻塞的Reference#tryHandlePending(false),该方法会将已经被JVM垃圾回收的DirectBuffer对象的堆外内存释放。
如果还是无法释放足够的内存,将会触发System.gc(),该方法会触发一个full gc,如果JVM参数没有设置-XX:+DisableExplicitGC。但是调用System.gc()并不能够保证full gc马上就能被执行。所以在后面打代码中,会进行最多9次尝试,看是否有足够的可用堆外内存来分配堆外内存。并且每次尝试之前,都对延迟等待时间,已给JVM足够的时间去完成full gc操作。如果9次尝试后依旧没有足够的可用堆外内存来分配本次堆外内存,则抛出OutOfMemoryError("Direct buffer memory”)异常。
之所以用使用full gc的很重要的一个原因是:System.gc()会对新生代和老生代都进行内存回收,这样会比较彻底地回收DirectByteBuffer对象以及他们关联的堆外内存。
有两个问题:堆外内存是多大?
代码中maxMemory = VM.maxDirectMemory();

private static long directMemory = 64 * 1024 * 1024; //64MB
public static long maxDirectMemory() {
        return directMemory;
    }

实际上在 JVM启动时,会对System做初始化,实际上堆外内存的大小设置逻辑为:
如果通过-Dsun.nio.MaxDirectMemorySize指定了这个属性,只要它不等于-1,那效果和加了-XX:MaxDirectMemorySize一样的,如果两个参数都没指定,那么最大堆外内存的值来自于directMemory = Runtime.getRuntime().maxMemory()。
其中在我们使用CMS GC的情况下的实现如下,其实是新生代的最大值-一个survivor的大小+老生代的最大值,也就是我们设置的-Xmx的值里除去一个survivor的大小就是默认的堆外内存的大小。

堆外内存的回收机制是什么?
Cleaner是PhantomReference的子类,并通过自身的next和prev字段维护的一个双向链表PhantomReference的作用在于跟踪垃圾回收过程,并不会对对象的垃圾回收过程造成任何的影响。
DirectByteBuffer对象在创建的时候关联了一个Cleaner,(cleaner = Cleaner.create(this, new Deallocator(base, size, cap));)说到PhantomReference它其实主要是用来跟踪对象何时被回收的,它不能影响gc决策,但是gc过程中如果发现某个对象除了只有PhantomReference引用它之外,并没有其他的地方引用它了,那将会把这个引用放到java.lang.ref.Reference.pending队列里,在gc完毕的时候通知ReferenceHandler这个守护线程去执行一些后置处理,而DirectByteBuffer关联的cleaner是PhantomReference的一个子类,在最终的处理里会通过Unsafe的free接口来释放DirectByteBuffer对应的堆外内存块.

private static class ReferenceHandler extends Thread {

        ReferenceHandler(ThreadGroup g, String name) {
            super(g, name);
        }

        public void run() {
            for (;;) {

                Reference r;
                synchronized (lock) {
                    if (pending != null) {
                        r = pending;
                        Reference rn = r.next;
                        pending = (rn == r) ? null : rn;
                        r.next = r;
                    } else {
                        try {
                            lock.wait();
                        } catch (InterruptedException x) { }
                        continue;
                    }
                }

                // Fast path for cleaners
                if (r instanceof Cleaner) {
                    //
                    ((Cleaner)r).clean();
                    continue;
                }

                ReferenceQueue q = r.queue;
                if (q != ReferenceQueue.NULL) q.enqueue(r);
            }
        }
    }

//如果System.gc();被禁止,也会触发堆外内存的回收
Reference#tryHandlePending(false)

static boolean tryHandlePending(boolean var0) {
        Reference var1;
        Cleaner var2;
        try {
            Reference.Lock var3 = lock;
            synchronized(lock) {
                if (pending == null) {
                    if (var0) {
                        lock.wait();
                    }

                    return var0;
                }

                var1 = pending;
              //cleaner对象
                var2 = var1 instanceof Cleaner ? (Cleaner)var1 : null;
                pending = var1.discovered;
                var1.discovered = null;
            }
        } catch (OutOfMemoryError var6) {
            Thread.yield();
            return true;
        } catch (InterruptedException var7) {
            return true;
        }

        if (var2 != null) {
          //clean方法回收
            var2.clean();
            return true;
        } else {
            ReferenceQueue var8 = var1.queue;
            if (var8 != ReferenceQueue.NULL) {
                var8.enqueue(var1);
            }

            return true;
        }
    }

public void clean() {
//将当前Cleaner从Cleaner链表中移除,这样当clean()执行完后,Cleaner就是一个无引用指向的对象了,也就是可被GC回收的对象
        if (remove(this)) {
            try {
               //thunk 在directByteBuffer 是Deallocator 对象
                this.thunk.run();
            } catch (final Throwable var2) {
                AccessController.doPrivileged(new PrivilegedAction<Void>() {
                    public Void run() {
                        if (System.err != null) {
                            (new Error("Cleaner terminated abnormally", var2)).printStackTrace();
                        }

                        System.exit(1);
                        return null;
                    }
                });
            }

        }
    }
        private Deallocator(long address, long size, int capacity) {
            assert (address != 0);
            this.address = address;
            this.size = size;
            this.capacity = capacity;
        }

        public void run() {
            if (address == 0) {
                // Paranoia
                return;
            }
            //回收掉堆外内存
            unsafe.freeMemory(address);
            address = 0;
           //修改堆外内存的剩余容量大小
            Bits.unreserveMemory(size, capacity);
        }

所以如果一直触发不了cms gc或者full gc,老年代的DirectByteBuffer对象不能被回收,那么堆外内存就一直不能被回收,可能导致内存泄漏。

参考资料:
http://ifeve.com/buffers/
http://lovestblog.cn/blog/2015/05/12/direct-buffer/
https://www.jianshu.com/p/007052ee3773

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354

推荐阅读更多精彩内容