Netty的高性能总结

1.主从Reactor主从多线程模型的运用
主从Reactor多线程.png
2.巧妙的通过在EventLoop持有Thread的引用,调用inEventLoop()来避免了多线程的竞争,实现了无锁化.
3.对jdk原生ByteBuffer的优化,前面总结过一遍,再来总结一遍

jdk原生ByteBuffer的缺点

(1)

public abstract class ByteBuffer
    extends Buffer
    implements Comparable<ByteBuffer>
{

    // These fields are declared here rather than in Heap-X-Buffer in order to
    // reduce the number of virtual method invocations needed to access these
    // values, which is especially costly when coding small buffers.
    //
    final byte[] hb;                  // Non-null only for heap buffers
    final int offset;
    boolean isReadOnly;                 // Valid only for heap buffers

final byte hb;这是JDKde ByteBuffer对象中用于存储数据的对象声明;可以看到,其字节数组是被声明为final的,也就是长度是固定不变的,一旦分配好后不能动态扩容与收缩;而且当待存储的数据字节很大时就很有可能出现IndexOutOfBoundsException,如果需要预防这个异常,就需要在存储之前完全确定好待存储的字节大小。

(2)

ByteBuffer只使用一个position指针来标识位置信息,在进行读写切换时就需要调用flip方法或是rewind方法,使用起来很不方便。

0 <= mark <= position <= limit <= capacity

JDK中ByteBuffer中的方法
flip()方法
1.将limit值设为当前的position
2.将position设为0.

clear()方法
1.将limit值设为capacity.
2.将position值设为0.

compact()方法
1.将所有未读的数据复制到buffer起始位置处.
2.将position设为最后一个未读元素的后面.
3.将limit设为capacity.
4.现在buffer就准备好了,但不会覆盖未读的数据

Netty针对此做了优化

1.Netty的ByteBuf采用了读写索引分离的策略(readerIndex与writerIndex),一个初始化(里面尚未有任何数据)的ByteBuf的readerIndex与writerIndex值都是0.
2.当读索引与写索引处于同一个位置时,如果我们继续读取,那么就会抛出IndexOutOfBoundException.
3.对于ByteBuf的任何读写分离操作都会分别单独维护读索引与写索引.maxCapacity最大容默认的限制就是Integer.Max_VALUE.

netty本身提供3种缓冲区类型
1.heap buffer
2.direct buffer
3.composite buffer

Heap buffer(堆缓存区)

这是最常用的类型,ByteBuf将数据存储到JVM的堆缓冲区中,并且将实际的数据存放到byte array中来实现

优点:由于数据是存储在JVM堆中,因此可以快速的创建与快速的释放,并且它提供了直接访问内部字节数据的方法
缺点:每次读写数据时,都需要先将数据复制到直接缓存区中再进行网络传输

Direct Buffer(直接缓冲区)

在堆外直接分配内存空间,直接缓冲区并不会占用堆的容量空间,因为它是由操作系统在本地内存进行的数据分配。

优点:在使用Socket进行数据传输时,性能非常好,因为数据直接位于操作系统的本地内存中,所以不需要从JVM将数据复制到直接缓冲区中。
缺点:因为Direct Buffer是直接在操作系统内存中的,所以内存空间的分配与释放要比堆空间更加复杂,而且速度要慢一些。

Netty通过提供内存池来解决这个问题,直接缓冲区并不支持通过字节数组来分配内存。


4.优化了jdk原生的future

(1)原生的future的使用方式,future.get().还是需要我们调用该方法阻塞在那里,而不是运行完了自动通知我们.
原生Future提供的方法如下

public interface Future<V> {
    //取消等待结果
    boolean cancel(boolean mayInterruptIfRunning);
    //是否是已取消状态
    boolean isCancelled();
    //是否是已完成状态(包含了已成功或者已失败,或者是已经被取消,netty把这几个状态拆分了出来)
    boolean isDone();
    //阻塞获取结果
    V get() throws InterruptedException, ExecutionException;
    //加了超时时间
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Netty针对此做出的优化

netty的future源码如下

public interface Future<V> extends java.util.concurrent.Future<V> {

    boolean isSuccess();

    boolean isCancellable();

    Throwable cause();

    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

    Future<V> sync() throws InterruptedException;

    Future<V> syncUninterruptibly();

    Future<V> await() throws InterruptedException;

    Future<V> awaitUninterruptibly();

    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    boolean await(long timeoutMillis) throws InterruptedException;
    
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
    
    boolean awaitUninterruptibly(long timeoutMillis);

    V getNow();

    @Override
    boolean cancel(boolean mayInterruptIfRunning);
}

我们可以看到它继承了jdk原生的future并且新增了一些方法,比较重要的是可以添加回调即监听者了,和isSuccess()方法等。

不过这里需要注意的是

/**
 * Listens to the result of a {@link ChannelFuture}.  The result of the
 * asynchronous {@link Channel} I/O operation is notified once this listener
 * is added by calling {@link ChannelFuture#addListener(GenericFutureListener)}.
 *
 * <h3>Return the control to the caller quickly</h3>
 *
 * {@link #operationComplete(Future)} is directly called by an I/O
 * thread.  Therefore, performing a time consuming task or a blocking operation
 * in the handler method can cause an unexpected pause during I/O.  If you need
 * to perform a blocking operation on I/O completion, try to execute the
 * operation in a different thread using a thread pool.
 */
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture>


从接口注释我们可以看到
void operationComplete(F future)
是由I/O线程去调用的,所以不要在这个方法内执行耗时的操作,如果非要做,可以用线程池


对JDK的nio的空轮询的优化

关键代码如下:

private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
                 ....
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

也就是根据time+currentTimeNanos是否小于超时时间,来判断你是否真的阻塞了这么久,有没有真正干活.没有的话我就计数
超过SELECTOR_AUTO_REBUILD_THRESHOLD默认为512,我就把你开掉,直接换人rebuildSelector(),简单而且高效;


6.对原生ThreadLocal的优化

FastThreadLocal的构造方法如下

private final int index;

    public FastThreadLocal() {
        index = InternalThreadLocalMap.nextVariableIndex();
    }

这个index其实就等于AtomicInteger.getAndIncrement().也就是说明,在JVM里面,每一个FastThreadLocal都有唯一的一个标识.

它的get()方法获取对象的三个步骤

1.获取到一个ThreadLocalMap对象
2.直接通过索引取出对象(ThreadLocalMap内部维护了一个数组)
3.没有的话就创建

FastThreadLocal的get()方法如下

public final V get() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }

        V value = initialize(threadLocalMap);
        registerCleaner(threadLocalMap);
        return value;
    }

InternalThreadLocalMap的get()如下

public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        } else {
            return slowGet();
        }
    }

fastGet()

private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }

slowGet()

private static InternalThreadLocalMap slowGet() {
        ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
        InternalThreadLocalMap ret = slowThreadLocalMap.get();
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }

也就是slowGet()其实是多了这一步的

 ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;

FastThreadLocalThread 它自己维护了一个InternalThreadLocalMap

public class FastThreadLocalThread extends Thread {
    // This will be set to true if we have a chance to wrap the Runnable.
    private final boolean cleanupFastThreadLocals;

    private InternalThreadLocalMap threadLocalMap;

然后就是通过索引来获取对象

public Object indexedVariable(int index) {
        Object[] lookup = indexedVariables;
        return index < lookup.length? lookup[index] : UNSET;
    }

简单总结下就是
原生的ThreadLocal获取对象流程如下

ThreadLocal-->ThreadLocalMap-->Entry-->Value

Netty的FastThreadLocal

继承了Thread,内部的局部变量是数组,而原生的ThreadLocalMap是个map,而且哈希冲突的方法是开放地址法,而Netty由于是数组,每个FastThreadLocal都有唯一的索引,所以它是不会产生冲突的,所以光从这一点,FastThreadLocal就赢了,当然FastThreadLocal还解决了原生ThreadLocal的内存泄漏问题,要写的话可以单独写一篇文章来分析,这里就不多赘述了


7.自适应缓冲区的分配
NioServerSocketChannel#NioServerSocketChannel()
 ->NioServerSocketChannel.NioServerSocketChannelConfig
  ->DefaultChannelConfig#DefaultChannelConfig(Channel,RecvByteBufAllocator)
 

DefaultChannelConfig的构造方法如下

this(channel, new AdaptiveRecvByteBufAllocator());

核心代码如下

/**
     * Creates a new predictor with the specified parameters.
     *
     * @param minimum  the inclusive lower bound of the expected buffer size
       估计的一个最小缓存值
     * @param initial  the initial buffer size when no feed back was received
       没有回调被接收时候的缓存值
     * @param maximum  the inclusive upper bound of the expected buffer size
      估计的最大缓存值
     */
    public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
        if (minimum <= 0) {
            throw new IllegalArgumentException("minimum: " + minimum);
        }
        if (initial < minimum) {
            throw new IllegalArgumentException("initial: " + initial);
        }
        if (maximum < initial) {
            throw new IllegalArgumentException("maximum: " + maximum);
        }

        int minIndex = getSizeTableIndex(minimum);
        if (SIZE_TABLE[minIndex] < minimum) {
            this.minIndex = minIndex + 1;
        } else {
            this.minIndex = minIndex;
        }

        int maxIndex = getSizeTableIndex(maximum);
        if (SIZE_TABLE[maxIndex] > maximum) {
            this.maxIndex = maxIndex - 1;
        } else {
            this.maxIndex = maxIndex;
        }

        this.initial = initial;
    }

默认值如下

static final int DEFAULT_MINIMUM = 64;
    static final int DEFAULT_INITIAL = 1024;
    static final int DEFAULT_MAXIMUM = 65536;

8.MpscLinkedQueue多生产者单消费者下的队列实现,以及缓存行填充;

首先先解释下缓存行的填充,一个缓冲行通常是64个字节,一个long类型是8个字节,所以一般会左边7个long对象,右边7个long对象,这样就保证它会独占一个缓存行,避免频繁的更新,Disruptor,jdk底层的atomic包也用到了这个机制,就不详细讲了.

还是着重分析一下它的高性能的队列

其实select线程可以看作是消费者,客户端连接看作是生产者

netty里因为有inEventLoop方法,假如不是当前线程,其实它是放到队列里面去的

public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

解释说明

LinkedBlockingQueue也是一个高效的线程安全的队列,它使用了takeLock和putLock两个锁分别作用与消费线程和生产线程,避免了消费者和生产者直接的竞争。然而在消费者之间、生产者之间依然需要竞争各自端的锁。

对于NioEventLoop来说,taskQueue只有一个消费者,即运行NioEventLoop.run()的那个线程

此外,一般无锁,基本都是采用自旋+CAS

MpscLinkedQueue的
多个生产者生产数据元素的方法如下

public boolean add(E e) {
        if (offer(e)) {
            return true;
        }
        throw new IllegalStateException("queue full");
        }
        public boolean offer(E value) {
        if (value == null) {
            throw new NullPointerException("value");
        }
        
        // 如果传入的是node则直接使用,否则实例化一个newTail
        final MpscLinkedQueueNode<E> newTail;
        if (value instanceof MpscLinkedQueueNode) {
            newTail = (MpscLinkedQueueNode<E>) value;
            newTail.setNext(null);
        } else {
            newTail = new DefaultNode<E>(value);
        }

        MpscLinkedQueueNode<E> oldTail = replaceTail(newTail);
        oldTail.setNext(newTail);
        return true;
    }

private MpscLinkedQueueNode<E> replaceTail(MpscLinkedQueueNode<E> node) {
        return getAndSet(node);
    }
 
 //采用原子更新的方式来添加节点
public final V getAndSet(V newValue) {
        return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
    }

单个消费者消费数据的代码如下

public E poll() {
       //获取链表中的第一个元素
        final MpscLinkedQueueNode<E> next = peekNode();
        if (next == null) {
            return null;
        }

        // 下一个节点变成新的头结点
        MpscLinkedQueueNode<E> oldHead = headRef.get();
        // 直接将此次获取到的数据修改成头结点
        headRef.lazySet(next);

        // 将原头结点的next置为null,去除oldHead与新头结点之间的关联
        oldHead.setNext(null);
        
        // 获取节点中的数据,并将value置为null,去除节点与数据直接的关联
        return next.clearMaybe();
    }

  //获取链表中的第一个元素
  private MpscLinkedQueueNode<E> peekNode() {
        for (;;) {
            final MpscLinkedQueueNode<E> head = headRef.get();
            final MpscLinkedQueueNode<E> next = head.next();
           
            // 当头结点与尾节点不同时,说明肯定已经有数据插入了;
            if (next != null) {
                return next;
            }
           
            // 头结点与尾节点相同,说明还在初始化状态,直接返回null
            if (head == getTail()) {
                return null;
            }
        }
    }

小结如下

MpscLinkedQueue通过使用链表存储数据以及巧妙的CAS操作,实现单消费者多生产者队列,代码简洁高效,适合netty中的无锁化串行设计。


9.以及对Unsafe对象是使用,堆内存的分配,以及直接内存的访问,实现了零拷贝;

NIO零拷贝.png

使用到了Java NIO的Scatter与Gather

Scatter(分散):分散读取,从管道Channel中读取的数据分散到一个或者多个缓冲区Buffer中,分散的时候会依次的按缓冲区的顺序一个一个的进行。
Gather(聚集):聚集写入,把缓冲区position到limit之间的数据依次的写入到管道中。
这样,一个大文件就可以分割成许多小的文件进行传输。

这里的socket buffer不再完全拷贝kernel buffer中的完整内容,而只是记录了kernel buffer的内存地址与长度.

而protocaol engine读的时候,就从两个地方开始读,一个是kernel buffer,一个是socket buffer.(这就是Gather操作)

内存映射.png

对应的代码操作为

        File file = new File("data.zip");
        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        FileChannel fileChannel = raf.getChannel();
        MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
真正的零拷贝.png

对应的代码为

       File file = new File("test.zip");
        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        FileChannel fileChannel = raf.getChannel();
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("", 1234));
        // 直接使用了transferTo()进行通道间的数据传输
        fileChannel.transferTo(0, fileChannel.size(), socketChannel);


10.内存池的使用

netty内存池.png

结合零拷贝,效率会很高


11.使用引用计数来回收对象

但对象的引用不是每个对象维护一个AtomicLong,而是使用了AtomicReferenceFieldUpdater节省了内存空间

首先看下使用到的地方

public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {

    private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
            AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");

    private volatile int refCnt;//注意这里的volatile  保证可见性和有序性

再来看下它的定义

/**
 * A reflection-based utility that enables atomic updates to
 * designated {@code volatile int} fields of designated classes.
 * This class is designed for use in atomic data structures in which
 * several fields of the same node are independently subject to atomic
 * updates.
使用反射原子性的去更新被 volatile 修饰的Int型变量。
其中同一节点的多个字段独立地使用CAS

 *
 * <p>Note that the guarantees of the {@code compareAndSet}
 * method in this class are weaker than in other atomic classes.
 * Because this class cannot ensure that all uses of the field
 * are appropriate for purposes of atomic access, it can
 * guarantee atomicity only with respect to other invocations of
 * {@code compareAndSet} and {@code set} on the same updater.
 *
   这段的大概意思,就是你要保证原子性,就得都使用CAS来赋值,不能一个使用CAS,另一个就直接赋值,这样是没办法保证的。

 * @since 1.5
 * @author Doug Lea
 * @param <T> The type of the object holding the updatable field
 */
public abstract class AtomicIntegerFieldUpdater<T> 

最后再来看一下,常用的retain()和release()

private ByteBuf retain0(final int increment) {
        int oldRef = refCntUpdater.getAndAdd(this, increment);
        if (oldRef <= 0 || oldRef + increment < oldRef) {
            // Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
           //引用计数为0就不可以retain继续持有了
            refCntUpdater.getAndAdd(this, -increment);
            throw new IllegalReferenceCountException(oldRef, increment);
        }
        return this;
    }

release()

private boolean release0(int decrement) {
        int oldRef = refCntUpdater.getAndAdd(this, -decrement);
        if (oldRef == decrement) {
            deallocate();
            return true;
        } else if (oldRef < decrement || oldRef - decrement > oldRef) {
            // Ensure we don't over-release, and avoid underflow.
            refCntUpdater.getAndAdd(this, decrement);
            throw new IllegalReferenceCountException(oldRef, -decrement);
        }
        return false;
    }

12.Set<SelectionKey>重写;

在NioEventLoop的构造函数中openSelector();

        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

openSelector()代码如下;

private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
       
        //这里假如不需要优化  直接返回
        if (DISABLE_KEYSET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
                // ensure the current selector implementation is what we can instrument.
                !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }

可以看下

 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

源码如下

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public boolean remove(Object o) {
        return false;
    }

    @Override
    public boolean contains(Object o) {
        return false;
    }

    @Override
    public Iterator<SelectionKey> iterator() {
        throw new UnsupportedOperationException();
    }

    void reset() {
        reset(0);
    }

    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    private void increaseCapacity() {
        SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
        System.arraycopy(keys, 0, newKeys, 0, size);
        keys = newKeys;
    }
}

我们可以看到内部其实就是一个数组


13.线程池与任务之间的分离

我们在 NioEventLoopGroup boss = new NioEventLoopGroup(); 的时候其实就把线程池创建好了

executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  ->chooser = chooserFactory.newChooser(children)
     ->

这里就是工厂模式

public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {//是否是2的幂次方
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

PowerOfTwoEventExecutorChooser(executors)如下

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

new GenericEventExecutorChooser(executors)如下

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

小结,如下

如果分配的总数量是2的幂次方,那么就用 index &(lengtn -1 )
否则index %length

至于为什么与操作会更快?因为cpu层面与就是一个操作指令,而%要先用除指令,再取余.

至于为什么index&(length-1)和index%length相等?
X % 2^n = X & (2^n - 1)
假设n为3,则2^3 = 8,表示成2进制就是1000。2^3 -1 = 7 ,即0111。
此时X & (2^3 - 1) 就相当于取X的2进制的最后三位数。
从2进制角度来看,X / 8相当于 X >> 3,即把X右移3位,此时得到了X / 8的商,而被移掉的部分(后三位),则是X % 8,也就是余数。

如:

6 % 8 = 6 ,6 & 7 = 6
10 % 8 = 2 ,10 & 7 = 2

暂时就先总结这么多,此外内存分配算法感觉也是很精髓的一部分,单独写吧

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容