1.主从Reactor主从多线程模型的运用
2.巧妙的通过在EventLoop持有Thread的引用,调用inEventLoop()来避免了多线程的竞争,实现了无锁化.
3.对jdk原生ByteBuffer的优化,前面总结过一遍,再来总结一遍
jdk原生ByteBuffer的缺点
(1)
public abstract class ByteBuffer
extends Buffer
implements Comparable<ByteBuffer>
{
// These fields are declared here rather than in Heap-X-Buffer in order to
// reduce the number of virtual method invocations needed to access these
// values, which is especially costly when coding small buffers.
//
final byte[] hb; // Non-null only for heap buffers
final int offset;
boolean isReadOnly; // Valid only for heap buffers
final byte hb;这是JDKde ByteBuffer对象中用于存储数据的对象声明;可以看到,其字节数组是被声明为final的,也就是长度是固定不变的,一旦分配好后不能动态扩容与收缩;而且当待存储的数据字节很大时就很有可能出现IndexOutOfBoundsException,如果需要预防这个异常,就需要在存储之前完全确定好待存储的字节大小。
(2)
ByteBuffer只使用一个position指针来标识位置信息,在进行读写切换时就需要调用flip方法或是rewind方法,使用起来很不方便。
0 <= mark <= position <= limit <= capacity
JDK中ByteBuffer中的方法
flip()方法
1.将limit值设为当前的position
2.将position设为0.
clear()方法
1.将limit值设为capacity.
2.将position值设为0.
compact()方法
1.将所有未读的数据复制到buffer起始位置处.
2.将position设为最后一个未读元素的后面.
3.将limit设为capacity.
4.现在buffer就准备好了,但不会覆盖未读的数据
Netty针对此做了优化
1.Netty的ByteBuf采用了读写索引分离的策略(readerIndex与writerIndex),一个初始化(里面尚未有任何数据)的ByteBuf的readerIndex与writerIndex值都是0.
2.当读索引与写索引处于同一个位置时,如果我们继续读取,那么就会抛出IndexOutOfBoundException.
3.对于ByteBuf的任何读写分离操作都会分别单独维护读索引与写索引.maxCapacity最大容默认的限制就是Integer.Max_VALUE.
netty本身提供3种缓冲区类型
1.heap buffer
2.direct buffer
3.composite buffer
Heap buffer(堆缓存区)
这是最常用的类型,ByteBuf将数据存储到JVM的堆缓冲区中,并且将实际的数据存放到byte array中来实现
优点:由于数据是存储在JVM堆中,因此可以快速的创建与快速的释放,并且它提供了直接访问内部字节数据的方法
缺点:每次读写数据时,都需要先将数据复制到直接缓存区中再进行网络传输
Direct Buffer(直接缓冲区)
在堆外直接分配内存空间,直接缓冲区并不会占用堆的容量空间,因为它是由操作系统在本地内存进行的数据分配。
优点:在使用Socket进行数据传输时,性能非常好,因为数据直接位于操作系统的本地内存中,所以不需要从JVM将数据复制到直接缓冲区中。
缺点:因为Direct Buffer是直接在操作系统内存中的,所以内存空间的分配与释放要比堆空间更加复杂,而且速度要慢一些。
Netty通过提供内存池来解决这个问题,直接缓冲区并不支持通过字节数组来分配内存。
4.优化了jdk原生的future
(1)原生的future的使用方式,future.get().还是需要我们调用该方法阻塞在那里,而不是运行完了自动通知我们.
原生Future提供的方法如下
public interface Future<V> {
//取消等待结果
boolean cancel(boolean mayInterruptIfRunning);
//是否是已取消状态
boolean isCancelled();
//是否是已完成状态(包含了已成功或者已失败,或者是已经被取消,netty把这几个状态拆分了出来)
boolean isDone();
//阻塞获取结果
V get() throws InterruptedException, ExecutionException;
//加了超时时间
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Netty针对此做出的优化
netty的future源码如下
public interface Future<V> extends java.util.concurrent.Future<V> {
boolean isSuccess();
boolean isCancellable();
Throwable cause();
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
Future<V> sync() throws InterruptedException;
Future<V> syncUninterruptibly();
Future<V> await() throws InterruptedException;
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
V getNow();
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
我们可以看到它继承了jdk原生的future并且新增了一些方法,比较重要的是可以添加回调即监听者了,和isSuccess()方法等。
不过这里需要注意的是
/**
* Listens to the result of a {@link ChannelFuture}. The result of the
* asynchronous {@link Channel} I/O operation is notified once this listener
* is added by calling {@link ChannelFuture#addListener(GenericFutureListener)}.
*
* <h3>Return the control to the caller quickly</h3>
*
* {@link #operationComplete(Future)} is directly called by an I/O
* thread. Therefore, performing a time consuming task or a blocking operation
* in the handler method can cause an unexpected pause during I/O. If you need
* to perform a blocking operation on I/O completion, try to execute the
* operation in a different thread using a thread pool.
*/
public interface ChannelFutureListener extends GenericFutureListener<ChannelFuture>
从接口注释我们可以看到
void operationComplete(F future)
是由I/O线程去调用的,所以不要在这个方法内执行耗时的操作,如果非要做,可以用线程池
对JDK的nio的空轮询的优化
关键代码如下:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
....
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
也就是根据time+currentTimeNanos是否小于超时时间,来判断你是否真的阻塞了这么久,有没有真正干活.没有的话我就计数
超过SELECTOR_AUTO_REBUILD_THRESHOLD默认为512,我就把你开掉,直接换人rebuildSelector(),简单而且高效;
6.对原生ThreadLocal的优化
FastThreadLocal的构造方法如下
private final int index;
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
这个index其实就等于AtomicInteger.getAndIncrement().也就是说明,在JVM里面,每一个FastThreadLocal都有唯一的一个标识.
它的get()方法获取对象的三个步骤
1.获取到一个ThreadLocalMap对象
2.直接通过索引取出对象(ThreadLocalMap内部维护了一个数组)
3.没有的话就创建
FastThreadLocal的get()方法如下
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
V value = initialize(threadLocalMap);
registerCleaner(threadLocalMap);
return value;
}
InternalThreadLocalMap的get()如下
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
fastGet()
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
slowGet()
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
也就是slowGet()其实是多了这一步的
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
FastThreadLocalThread 它自己维护了一个InternalThreadLocalMap
public class FastThreadLocalThread extends Thread {
// This will be set to true if we have a chance to wrap the Runnable.
private final boolean cleanupFastThreadLocals;
private InternalThreadLocalMap threadLocalMap;
然后就是通过索引来获取对象
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length? lookup[index] : UNSET;
}
简单总结下就是
原生的ThreadLocal获取对象流程如下
ThreadLocal-->ThreadLocalMap-->Entry-->Value
Netty的FastThreadLocal
继承了Thread,内部的局部变量是数组,而原生的ThreadLocalMap是个map,而且哈希冲突的方法是开放地址法,而Netty由于是数组,每个FastThreadLocal都有唯一的索引,所以它是不会产生冲突的,所以光从这一点,FastThreadLocal就赢了,当然FastThreadLocal还解决了原生ThreadLocal的内存泄漏问题,要写的话可以单独写一篇文章来分析,这里就不多赘述了
7.自适应缓冲区的分配
NioServerSocketChannel#NioServerSocketChannel()
->NioServerSocketChannel.NioServerSocketChannelConfig
->DefaultChannelConfig#DefaultChannelConfig(Channel,RecvByteBufAllocator)
DefaultChannelConfig的构造方法如下
this(channel, new AdaptiveRecvByteBufAllocator());
核心代码如下
/**
* Creates a new predictor with the specified parameters.
*
* @param minimum the inclusive lower bound of the expected buffer size
估计的一个最小缓存值
* @param initial the initial buffer size when no feed back was received
没有回调被接收时候的缓存值
* @param maximum the inclusive upper bound of the expected buffer size
估计的最大缓存值
*/
public AdaptiveRecvByteBufAllocator(int minimum, int initial, int maximum) {
if (minimum <= 0) {
throw new IllegalArgumentException("minimum: " + minimum);
}
if (initial < minimum) {
throw new IllegalArgumentException("initial: " + initial);
}
if (maximum < initial) {
throw new IllegalArgumentException("maximum: " + maximum);
}
int minIndex = getSizeTableIndex(minimum);
if (SIZE_TABLE[minIndex] < minimum) {
this.minIndex = minIndex + 1;
} else {
this.minIndex = minIndex;
}
int maxIndex = getSizeTableIndex(maximum);
if (SIZE_TABLE[maxIndex] > maximum) {
this.maxIndex = maxIndex - 1;
} else {
this.maxIndex = maxIndex;
}
this.initial = initial;
}
默认值如下
static final int DEFAULT_MINIMUM = 64;
static final int DEFAULT_INITIAL = 1024;
static final int DEFAULT_MAXIMUM = 65536;
8.MpscLinkedQueue多生产者单消费者下的队列实现,以及缓存行填充;
首先先解释下缓存行的填充,一个缓冲行通常是64个字节,一个long类型是8个字节,所以一般会左边7个long对象,右边7个long对象,这样就保证它会独占一个缓存行,避免频繁的更新,Disruptor,jdk底层的atomic包也用到了这个机制,就不详细讲了.
还是着重分析一下它的高性能的队列
其实select线程可以看作是消费者,客户端连接看作是生产者
netty里因为有inEventLoop方法,假如不是当前线程,其实它是放到队列里面去的
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
解释说明
LinkedBlockingQueue也是一个高效的线程安全的队列,它使用了takeLock和putLock两个锁分别作用与消费线程和生产线程,避免了消费者和生产者直接的竞争。然而在消费者之间、生产者之间依然需要竞争各自端的锁。
对于NioEventLoop来说,taskQueue只有一个消费者,即运行NioEventLoop.run()的那个线程
此外,一般无锁,基本都是采用自旋+CAS
MpscLinkedQueue的
多个生产者生产数据元素的方法如下
public boolean add(E e) {
if (offer(e)) {
return true;
}
throw new IllegalStateException("queue full");
}
public boolean offer(E value) {
if (value == null) {
throw new NullPointerException("value");
}
// 如果传入的是node则直接使用,否则实例化一个newTail
final MpscLinkedQueueNode<E> newTail;
if (value instanceof MpscLinkedQueueNode) {
newTail = (MpscLinkedQueueNode<E>) value;
newTail.setNext(null);
} else {
newTail = new DefaultNode<E>(value);
}
MpscLinkedQueueNode<E> oldTail = replaceTail(newTail);
oldTail.setNext(newTail);
return true;
}
private MpscLinkedQueueNode<E> replaceTail(MpscLinkedQueueNode<E> node) {
return getAndSet(node);
}
//采用原子更新的方式来添加节点
public final V getAndSet(V newValue) {
return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}
单个消费者消费数据的代码如下
public E poll() {
//获取链表中的第一个元素
final MpscLinkedQueueNode<E> next = peekNode();
if (next == null) {
return null;
}
// 下一个节点变成新的头结点
MpscLinkedQueueNode<E> oldHead = headRef.get();
// 直接将此次获取到的数据修改成头结点
headRef.lazySet(next);
// 将原头结点的next置为null,去除oldHead与新头结点之间的关联
oldHead.setNext(null);
// 获取节点中的数据,并将value置为null,去除节点与数据直接的关联
return next.clearMaybe();
}
//获取链表中的第一个元素
private MpscLinkedQueueNode<E> peekNode() {
for (;;) {
final MpscLinkedQueueNode<E> head = headRef.get();
final MpscLinkedQueueNode<E> next = head.next();
// 当头结点与尾节点不同时,说明肯定已经有数据插入了;
if (next != null) {
return next;
}
// 头结点与尾节点相同,说明还在初始化状态,直接返回null
if (head == getTail()) {
return null;
}
}
}
小结如下
MpscLinkedQueue通过使用链表存储数据以及巧妙的CAS操作,实现单消费者多生产者队列,代码简洁高效,适合netty中的无锁化串行设计。
9.以及对Unsafe对象是使用,堆内存的分配,以及直接内存的访问,实现了零拷贝;
使用到了Java NIO的Scatter与Gather
Scatter(分散):分散读取,从管道Channel中读取的数据分散到一个或者多个缓冲区Buffer中,分散的时候会依次的按缓冲区的顺序一个一个的进行。
Gather(聚集):聚集写入,把缓冲区position到limit之间的数据依次的写入到管道中。
这样,一个大文件就可以分割成许多小的文件进行传输。
这里的socket buffer不再完全拷贝kernel buffer中的完整内容,而只是记录了kernel buffer的内存地址与长度.
而protocaol engine读的时候,就从两个地方开始读,一个是kernel buffer,一个是socket buffer.(这就是Gather操作)
对应的代码操作为
File file = new File("data.zip");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
FileChannel fileChannel = raf.getChannel();
MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
对应的代码为
File file = new File("test.zip");
RandomAccessFile raf = new RandomAccessFile(file, "rw");
FileChannel fileChannel = raf.getChannel();
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("", 1234));
// 直接使用了transferTo()进行通道间的数据传输
fileChannel.transferTo(0, fileChannel.size(), socketChannel);
10.内存池的使用
结合零拷贝,效率会很高
11.使用引用计数来回收对象
但对象的引用不是每个对象维护一个AtomicLong,而是使用了AtomicReferenceFieldUpdater节省了内存空间
首先看下使用到的地方
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
private volatile int refCnt;//注意这里的volatile 保证可见性和有序性
再来看下它的定义
/**
* A reflection-based utility that enables atomic updates to
* designated {@code volatile int} fields of designated classes.
* This class is designed for use in atomic data structures in which
* several fields of the same node are independently subject to atomic
* updates.
使用反射原子性的去更新被 volatile 修饰的Int型变量。
其中同一节点的多个字段独立地使用CAS
*
* <p>Note that the guarantees of the {@code compareAndSet}
* method in this class are weaker than in other atomic classes.
* Because this class cannot ensure that all uses of the field
* are appropriate for purposes of atomic access, it can
* guarantee atomicity only with respect to other invocations of
* {@code compareAndSet} and {@code set} on the same updater.
*
这段的大概意思,就是你要保证原子性,就得都使用CAS来赋值,不能一个使用CAS,另一个就直接赋值,这样是没办法保证的。
* @since 1.5
* @author Doug Lea
* @param <T> The type of the object holding the updatable field
*/
public abstract class AtomicIntegerFieldUpdater<T>
最后再来看一下,常用的retain()和release()
private ByteBuf retain0(final int increment) {
int oldRef = refCntUpdater.getAndAdd(this, increment);
if (oldRef <= 0 || oldRef + increment < oldRef) {
// Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
//引用计数为0就不可以retain继续持有了
refCntUpdater.getAndAdd(this, -increment);
throw new IllegalReferenceCountException(oldRef, increment);
}
return this;
}
release()
private boolean release0(int decrement) {
int oldRef = refCntUpdater.getAndAdd(this, -decrement);
if (oldRef == decrement) {
deallocate();
return true;
} else if (oldRef < decrement || oldRef - decrement > oldRef) {
// Ensure we don't over-release, and avoid underflow.
refCntUpdater.getAndAdd(this, decrement);
throw new IllegalReferenceCountException(oldRef, -decrement);
}
return false;
}
12.Set<SelectionKey>重写;
在NioEventLoop的构造函数中openSelector();
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
openSelector()代码如下;
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
//这里假如不需要优化 直接返回
if (DISABLE_KEYSET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
可以看下
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
源码如下
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
SelectionKey[] keys;
int size;
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
keys[size++] = o;
if (size == keys.length) {
increaseCapacity();
}
return true;
}
@Override
public int size() {
return size;
}
@Override
public boolean remove(Object o) {
return false;
}
@Override
public boolean contains(Object o) {
return false;
}
@Override
public Iterator<SelectionKey> iterator() {
throw new UnsupportedOperationException();
}
void reset() {
reset(0);
}
void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}
private void increaseCapacity() {
SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
System.arraycopy(keys, 0, newKeys, 0, size);
keys = newKeys;
}
}
我们可以看到内部其实就是一个数组
13.线程池与任务之间的分离
我们在 NioEventLoopGroup boss = new NioEventLoopGroup(); 的时候其实就把线程池创建好了
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
->chooser = chooserFactory.newChooser(children)
->
这里就是工厂模式
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {//是否是2的幂次方
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
PowerOfTwoEventExecutorChooser(executors)如下
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
new GenericEventExecutorChooser(executors)如下
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
小结,如下
如果分配的总数量是2的幂次方,那么就用 index &(lengtn -1 )
否则index %length
至于为什么与操作会更快?因为cpu层面与就是一个操作指令,而%要先用除指令,再取余.
至于为什么index&(length-1)和index%length相等?
X % 2^n = X & (2^n - 1)
假设n为3,则2^3 = 8,表示成2进制就是1000。2^3 -1 = 7 ,即0111。
此时X & (2^3 - 1) 就相当于取X的2进制的最后三位数。
从2进制角度来看,X / 8相当于 X >> 3,即把X右移3位,此时得到了X / 8的商,而被移掉的部分(后三位),则是X % 8,也就是余数。
如:
6 % 8 = 6 ,6 & 7 = 6
10 % 8 = 2 ,10 & 7 = 2
暂时就先总结这么多,此外内存分配算法感觉也是很精髓的一部分,单独写吧