Netty对象池实现分析

什么是对象池技术?对象池应用在哪些地方?

对象池其实就是缓存一些对象从而避免大量创建同一个类型的对象,类似线程池的概念。对象池缓存了一些已经创建好的对象,避免需要时才创建对象,同时限制了实例的个数。池化技术最终要的就是重复的使用池内已经创建的对象。从上面的内容就可以看出对象池适用于以下几个场景:

  1. 创建对象的开销大
  2. 会创建大量的实例
  3. 限制一些资源的使用

如果创建一个对象的开销特别大,那么提前创建一些可以使用的并且缓存起来(池化技术就是重复使用对象,提前创建并缓存起来重复使用就是池化)可以降低创建对象时的开销。

会大量创建实例的场景,重复的使用对象可减少创建的对象数量,降低GC的压力(如果这些对象的生命周期都很短暂,那么可以降低YoungGC的频率;如果生命周期很长,那么可以避免掉这些对象被FullGC——生命周期长,且大量创建,这里就要结合系统的TPS等考虑池的大小了)。

对于限制资源的使用更多的是一种保护策略,比如数据库链接池。除去这些对象本身的开销外,他们对外部系统也会造成压力,比如大量创建链接对DB也是有压力的。那么池化除了优化资源以外,本身限制了资源数,对外部系统也起到了一层保护作用。

如何实现对象池?

开源实现:Apache Commons Pool
自己实现:Netty轻量级对象池实现

Apache Commons Pool开源软件库提供了一个对象池API和一系列对象池的实现,支持各种配置,比如活跃对象数或者闲置对象个数等。DBCP数据库连接池基于Apache Commons Pool实现。

Netty自己实现了一套轻量级的对象池。在Netty中,通常会有多个IO线程独立工作,基于NioEventLoop的实现,每个IO线程轮询单独的Selector实例来检索IO事件,并在IO来临时开始处理。最常见的IO操作就是读写,具体到NIO就是从内核缓冲区拷贝数据到用户缓冲区或者从用户缓冲区拷贝数据到内核缓冲区。这里会涉及到大量的创建和回收Buffer,Netty对Buffer进行了池化从而降低系统开销。

<h3>Netty对象池实现分析</h3>

上面提到了IO操作中会涉及到大量的缓冲区操作,NIO提供了两种Buffer最为缓冲区:DirectByteBuffer和HeapByteBuffer。Netty在两种缓冲区的基础上进行了池化进而提升性能。

DirectByteBuffer
DirectByteBuffer顾名思义是直接内存(Direct Memory)上的Byte缓存区,直接内存不是JVM Runtime数据区域的一部分,也不是Java虚拟机规范中定义的内存区域。简单的说这部分就是机器内存,分配的大小等都和虚拟机限制无关。JDK1.4中开始我们可以使用native方法在直接内存上来分配内存,并在JVM堆内存上维持一个引用来进行访问,当JVM堆内存上的引用被回收后,这块内存被操作系统回收。

HeapByteBuffer
HeapByteBuffer是在JVM堆内存上分配的Byte缓冲区,可以简单的理解为byte[]数组的一种封装。基于HeapByteBuffer的写流程通常要先在直接内存上分配一个临时的缓冲区,将数据从Heap拷贝到直接内存,然后再将直接内存的数据发送到IO设备的缓冲区,之后回收直接内存。读流程也类似。使用DirectByteBuffer避免了不必要的拷贝工作,所以在性能上会有提升。

DirectByteBuffer的缺点在于分配和回收的的代价相对较大,因此DirectByteBuffer适用于缓冲区可以重复使用的场景。

Netty的池化实现

以Buffer为例,对应直接内存和堆内存,Netty的池化分别为PooledDirectByteBuffer和PolledHeapByteBuffer。

ByteBuffer继承关系

通过PooledDirectByteBuffer的API定义可以看到,它的构造方法是私有的,而创建一个实例的入口是:

    static PooledDirectByteBuf newInstance(int maxCapacity) {
        PooledDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity);
        return buf;
    }

可见RECYCLER是池化的核心,创建对象时都通过RECYCLER.get来获得一个实例(Recycler就是Netty实轻量级池化技术的核心)。

Recycler实现分析(源码分析)

/**
 * Light-weight object pool based on a thread-local stack.
 *
 * @param <T> the type of the pooled object
 */
public abstract class Recycler<T>

从注释可以看出Netty基于thread-local实现了轻量级的对象池。

Recycler成员

Recycler的API非常简单:

  • get():获取一个实例
  • recycle(T, Handle<T>):回收一个实例
  • newObject(Handle<T>):创建一个实例

get流程

    @SuppressWarnings("unchecked")
    public final T get() {
        if (maxCapacity == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();
        if (handle == null) {
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }

get的简化流程(这里先不深究细节):

  1. 拿到当前线程对应的stack
  2. 从stack中pop出一个元素
  3. 如果不为空则返回,否则创建一个新的实例

可以大概明白Stack是对象池化背后存储实例的数据结构:如果能从stack中拿到可用的实例就不再创建新的实例。

recycle流程

一个“池子”最核心的就是做两件事情,第一个是上面的Get,即从池子中拿出一个可用的实例。另一个就是在用完后将数据放回到池子中(线程池、连接池都是这样)。

    public final boolean recycle(T o, Handle<T> handle) {
        if (handle == NOOP_HANDLE) {
            return false;
        }

        DefaultHandle<T> h = (DefaultHandle<T>) handle;
        if (h.stack.parent != this) {
            return false;
        }

        h.recycle(o);
        return true;
    }


    public void recycle(Object object) {
        if (object != value) {
            throw new IllegalArgumentException("object does not belong to handle");
        }
        Thread thread = Thread.currentThread();
        if (thread == stack.thread) {
            stack.push(this);
            return;
        }
        // we don't want to have a ref to the queue as the value in our weak map
        // so we null it out; to ensure there are no races with restoring it later
        // we impose a memory ordering here (no-op on x86)
        Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
        WeakOrderQueue queue = delayedRecycled.get(stack);
        if (queue == null) {
            delayedRecycled.put(stack, queue = new WeakOrderQueue(stack, thread));
        }
        queue.add(this);
    }

回收一个实例核心的步骤由以上两个方法组成:Recycler的recycle方法和DefaultHandle的recycle方法。
Recycler的recycle方法主要做了一些参数验证。
DefaultHandle的recycle方法流程如下:

  1. 如果当前线程是当前stack对象的线程,那么将实例放入stack中,否则:
  2. 获取当前线程对应的Map<Stack, WeakOrderQueue>,并将实例加入到Stack对应的Queue中。

从获取实例和回收实例的代码可以看出,整个对象池的核心实现由ThreadLocal和Stack及WrakOrderQueue构成,接着来看Stack和WrakOrderQueue的具体实现,最后概括整体实现。

Stack实体

Stack<T>
    parent:Recycler               // 关联对应的Recycler
    thread:Thread                 // 对应的Thread
    elements:DefaultHandle<?>[]   // 存储DefaultHandle的数组
    head:WeakOrderQueue           // 指向WeakOrderQueue元素组成的链表的头部“指针”
    cursor,prev:WrakOrderQueue    // 当前游标和前一元素的“指针”

pop实现

    DefaultHandle<T> pop() {
        int size = this.size;
        if (size == 0) {
            if (!scavenge()) {
                return null;
            }
            size = this.size;
        }
        size --;
        DefaultHandle ret = elements[size];
        if (ret.lastRecycledId != ret.recycleId) {
            throw new IllegalStateException("recycled multiple times");
        }
        ret.recycleId = 0;
        ret.lastRecycledId = 0;
        this.size = size;
        return ret;
    }

  1. 如果size为0(这里的size表示stack中可用的元素),尝试进行scavenge。
  2. 返回elements中的最后一个元素。
    boolean scavenge() {
        // continue an existing scavenge, if any
        if (scavengeSome()) {
            return true;
        }

        // reset our scavenge cursor
        prev = null;
        cursor = head;
        return false;
    }

    boolean scavengeSome() {
        WeakOrderQueue cursor = this.cursor;
        if (cursor == null) {
            cursor = head;
            if (cursor == null) {
                return false;
            }
        }

        boolean success = false;
        WeakOrderQueue prev = this.prev;
        do {
            if (cursor.transfer(this)) {
                success = true;
                break;
            }

            WeakOrderQueue next = cursor.next;
            if (cursor.owner.get() == null) {
                // If the thread associated with the queue is gone, unlink it, after
                // performing a volatile read to confirm there is no data left to collect.
                // We never unlink the first queue, as we don't want to synchronize on updating the head.
                if (cursor.hasFinalData()) {
                    for (;;) {
                        if (cursor.transfer(this)) {
                            success = true;
                        } else {
                            break;
                        }
                    }
                }
                if (prev != null) {
                    prev.next = next;
                }
            } else {
                prev = cursor;
            }

            cursor = next;

        } while (cursor != null && !success);

        this.prev = prev;
        this.cursor = cursor;
        return success;
    }

简要概括上面的流程就是Stack从“背后”的Queue中获取可用的实例,如果Queue中没有可用实例就遍历到下一个Queue(Queue组成了一个链表)。

push实现

    void push(DefaultHandle<?> item) {
        if ((item.recycleId | item.lastRecycledId) != 0) {
            throw new IllegalStateException("recycled already");
        }
        item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
        
        int size = this.size;
        if (size >= maxCapacity) {
            // Hit the maximum capacity - drop the possibly youngest object.
            return;
        }
        if (size == elements.length) {
            elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity));
        }

        elements[size] = item;
        this.size = size + 1;
    }

push相对pop流程要更加简单,直接将回收的元素放到队尾(实际是一个数组)。

WeakOrderQueue实体

WeakOrderQueue
    head,tail:Link          // 内部元素的指针(WeakOrderQueue内部存储的是一个Link的链表)
    next:WeakOrderQueue     // 指向下一个WeakOrderQueue的指针
    owner:Thread            // 对应的线程

WeakOrderQueue核心包含两个方法,add方法将元素添加到自身的“队列”中,transfer方法将自己拥有的元素“传输”到Stack中。

Linke结构如下

    private static final class Link extends AtomicInteger {
        private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];

        private int readIndex;
        private Link next;
    }

Link内部包含了一个数组用于存放实例,同时标记了读取位置的索引和下一个Link元素的指针。
结合Link的结构,Weak的结构如下:


Link结构

add方法

    void add(DefaultHandle<?> handle) {
        handle.lastRecycledId = id;

        Link tail = this.tail;
        int writeIndex;
        if ((writeIndex = tail.get()) == LINK_CAPACITY) {
            this.tail = tail = tail.next = new Link();
            writeIndex = tail.get();
        }
        tail.elements[writeIndex] = handle;
        handle.stack = null;
        // we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
        // this also means we guarantee visibility of an element in the queue if we see the index updated
        tail.lazySet(writeIndex + 1);
    }

add操作将元素添加到tail指向的Link对象中,如果Link已满则创建一个新的Link实例。

transfer方法

boolean transfer(Stack<?> dst) {

    Link head = this.head;
    if (head == null) {
        return false;
    }

    if (head.readIndex == LINK_CAPACITY) {
        if (head.next == null) {
            return false;
        }
        this.head = head = head.next;
    }

    final int srcStart = head.readIndex;
    int srcEnd = head.get();
    final int srcSize = srcEnd - srcStart;
    if (srcSize == 0) {
        return false;
    }

    final int dstSize = dst.size;
    final int expectedCapacity = dstSize + srcSize;

    if (expectedCapacity > dst.elements.length) {
        final int actualCapacity = dst.increaseCapacity(expectedCapacity);
        srcEnd = Math.min(srcStart + actualCapacity - dstSize, srcEnd);
    }

    if (srcStart != srcEnd) {
        final DefaultHandle[] srcElems = head.elements;
        final DefaultHandle[] dstElems = dst.elements;
        int newDstSize = dstSize;
        for (int i = srcStart; i < srcEnd; i++) {
            DefaultHandle element = srcElems[i];
            if (element.recycleId == 0) {
                element.recycleId = element.lastRecycledId;
            } else if (element.recycleId != element.lastRecycledId) {
                throw new IllegalStateException("recycled already");
            }
            element.stack = dst;
            dstElems[newDstSize ++] = element;
            srcElems[i] = null;
        }
        dst.size = newDstSize;

        if (srcEnd == LINK_CAPACITY && head.next != null) {
            this.head = head.next;
        }

        head.readIndex = srcEnd;
        return true;
    } else {
        // The destination stack is full already.
        return false;
    }
}

transfer方法收件根据stack的容量和自身拥有的实例数,计算出最终需要转移的实例数。之后就是数组的拷贝和指标的调整。
基本上所有的流程有个大致的了解,下面从整体的角度回顾一下Netty对象池的实现。

整体实现
结构

整体结构

整个设计上核心的几点:
1. Stack相当于是一级缓存,同一个线程内的使用和回收都将使用一个Stack
2. 每个线程都会有一个自己对应的Stack,如果回收的线程不是Stack的线程,将元素放入到Queue中
3. 所有的Queue组合成一个链表,Stack可以从这些链表中回收元素(实现了多线程之间共享回收的实例)

整体结构
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335

推荐阅读更多精彩内容

  • 从三月份找实习到现在,面了一些公司,挂了不少,但最终还是拿到小米、百度、阿里、京东、新浪、CVTE、乐视家的研发岗...
    时芥蓝阅读 42,137评论 11 349
  • 1、Netty基础入门 Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应...
    我是嘻哈大哥阅读 4,679评论 0 31
  • Java SE 基础: 封装、继承、多态 封装: 概念:就是把对象的属性和操作(或服务)结合为一个独立的整体,并尽...
    Jayden_Cao阅读 2,076评论 0 8
  • 一 我终于下决心要去那个城市了。 那个城市在哪其实我并不知道,只是我觉得非去不可了。一无所知,毫无记忆,却在某一天...
    西江无锋阅读 351评论 5 2