Netty源码分析——服务端启动

基于Netty源代码版本:netty-all-4.1.33.Final

前言

通过Netty的ServerBootstrap的实例入手对其进行一个简单的分析。
来先看看服务端的代码:

public class MyServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); 
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            //初始化一个服务端引导类
            ServerBootstrap serverBootstrap = new ServerBootstrap(); 
            serverBootstrap.group(bossGroup,workerGroup)//设置线程组
                    .channel(NioServerSocketChannel.class)//设置ServerSocketChannel的IO模型  分为epoll与Nio
                    .childHandler(new MyServerInitializer())//这个handler 将会在每个客户端连接的时候调用。供 SocketChannel 使用。
                    .handler(new SimpleServerHandler())//这个hanlder 只专属于 ServerSocketChannel 而不是 SocketChannel。
                    .option(ChannelOption.SO_BACKLOG, 128)//设置option参数,保存成一个LinkedHashMap<ChannelOption<?>, Object>()
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

本文将从源码的角度分析ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();的内部实现。这样就完成了Netty服务器端启动过程的源码分析。

我们首先要知道,netty服务的启动其实可以分为以下四步:

  • 创建服务端Channel
  • 初始化服务端Channel
  • 注册Selector
  • 端口绑定

一、创建服务端Channel

1、服务端Channel的创建,主要为以下流程

AbstractBootstrap.java

/**
 * 创建一个新的Channel并绑定端口
 */
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

我们接着看重载的bind

public ChannelFuture bind(SocketAddress localAddress) {
    validate(); //相关参数的检查
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);//主要实现方法
}

该函数主要看两点:validate()和doBind(localAddress)
validate()方法

//函数功能:检查相关参数是否设置了
public B validate() {
    if (group == null) {//这里的group指的是:serverBootstrap.group(bossGroup, workerGroup)代码中的bossGroup
        throw new IllegalStateException("group not set");
    }
    if (channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return self();
}

该方法主要检查了两个参数,一个是group,一个是channelFactory,在这里可以想一想这两个参数是在哪里以及何时被赋值的?答案是在如下代码块中被赋值的,其中是将bossGroup赋值给了group,将BootstrapChannelFactory赋值给了channelFactory.

 ServerBootstrap serverBootstrap = new ServerBootstrap(); 
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)

doBind(localAddress)方法

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();// 初始化并创建 NioServerSocketChannel
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

doBind这个函数是我们要分析的重点,这个函数的主要工作有如下几点:

  • 1、通过initAndRegister()方法得到一个ChannelFuture的实例regFuture。
  • 2、通过regFuture.cause()方法判断是否在执行initAndRegister方法时产生来异常。如果产生来异常,则直接返回,如果没有产生异常则进行第3步。
  • 3、通过regFuture.isDone()来判断initAndRegister方法是否执行完毕,如果执行完毕来返回true,然后调用doBind0进行socket绑定。如果没有执行完毕则返回false进行第4步。
  • 4、regFuture会添加一个ChannelFutureListener监听,当initAndRegister执行完成时,调用operationComplete方法并执行doBind0进行socket绑定。

第3、4点想干的事就是一个:调用doBind0方法进行socket绑定。

下面将分成4部分对每行代码具体做了哪些工作进行详细分析。
initAndRegister()
该方法的具体代码如下:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //结论:这里的channel为一个NioServerSocketChannel对象,具体分析见后面
        channel = channelFactory.newChannel();// 通过 反射工厂创建一个 NioServerSocketChannel
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

通过函数名以及内部调用的函数可以猜测该函数干了两件事情:
1、初始化一个Channel,要想初始化,肯定要先得到一个Channel。

Channel channel = channelFactory.newChannel();
init(channel);

2、将Channel进行注册。

ChannelFuture regFuture = config().group().register(channel);

下面我们将分析这几行代码内部干来些什么。
Channel channel = channelFactory.newChannel();
在上一篇文章中Netty源码分析--ServerBootstrap分析中,我们知道serverBootstrap.channel(NioServerSocketChannel.class)的功能为:设置父类属性channelFactory 为: ReflectiveChannelFactory类的对象。其中这里ReflectiveChannelFactory对象中包括一个clazz属性为:NioServerSocketChannel.class

因此,final Channel channel = channelFactory().newChannel();就是调用的ReflectiveChannelFactory类中的newChannel()方法,该方法的具体内容为:

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ObjectUtil.checkNotNull(clazz, "clazz");
        try {
            this.constructor = clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                    " does not have a public non-arg constructor", e);
        }
    }

    @Override
    public T newChannel() {
        try {
            return constructor.newInstance();//反射创建
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(ReflectiveChannelFactory.class) +
                '(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)";
    }
}

看到这个类,我们可以得到的结论:final Channel channel = channelFactory().newChannel();这行代码的作用为通过反射产生来一个NioServerSocketChannel类的实例。

NioServerSocketChannel构造器

下面将看下NioServerSocketChannel类的构造函数做了哪些工作。
NioServerSocketChannel类的继承体系结构如下:


其无参构造函数如下:

 public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));//传入默认的SelectorProvider
}

无参构造函数中private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

函数newSocket的功能为:利用SelectorProvider产生一个ServerSocketChannel对象。

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        /**
         *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
         *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
         *
         *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
         */
        return provider.openServerSocketChannel();//可以看到创建的是jdk底层的ServerSocketChannel 
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

public ServerSocketChannel openServerSocketChannel() throws IOException {
    return new ServerSocketChannelImpl(this);
}

无参构造函数通过newSocket函数产生了一个ServerSocketChannel对象
然后调用了如下构造函数,我们继续看

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);//调用父类构造函数,传入创建的channel
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

//父类AbstractNioMessageChannel的构造函数
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}

//父类 AbstractNioChannel的构造函数
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;//这个ch就是传入的通过jdk创建的Channel
    this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT
    try {
        ch.configureBlocking(false);//设置当前的ServerSocketChannel为非阻塞的
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

//父类AbstractChannel的构造函数
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();//创建Channel唯一标识 
    unsafe = newUnsafe();//netty封装的TCP 相关操作类
    pipeline = newChannelPipeline();//逻辑链
}

ReflectiveChannelFactory类中的newChannel()方法反射产生一个NioServerSocketChannel实例对象时,调用上面这么多构造函数主要干了两件事情:

  • 1、产生来一个ServerSocketChannel类的实例,设置到ch属性中,并设置为非阻塞的。
  • 2、调用AbstractChannel这个抽象类的构造函数设置Channel的id(每个Channel都有一个id,唯一标识),unsafe(tcp相关底层操作),pipeline(逻辑链)等,而不管是服务的Channel还是客户端的Channel都继承自这个抽象类,他们也都会有上述相应的属性
this.ch = ch;//ch为一个ServerSocketChannel
ch.configureBlocking(false);
  • 2、设置了config属性
ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket());
  • 3、设置SelectionKey.OP_ACCEPT事件
this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT
  • 4、设置unsafe属性
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioByteUnsafe();
}

主要作用为:用来负责底层的connect、register、read和write等操作。

  • 5、设置pipeline属性
pipeline = newChannelPipeline();

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

每个Channel都有自己的pipeline,当有请求事件发生时,pipeline负责调用相应的hander进行处理。
这些属性在后面都会用到,至于NioServerSocketChannel 对象中的unsafe、pipeline属性的具体实现后面进行分析。

结论:
Channel channel = channelFactory().newChannel();这行代码的作用为通过反射产生来一个NioServerSocketChannel类的实例,其中这个NioServerSocketChannel类对象有这样几个属性:SocketChannel、NioServerSocketChannelConfig 、SelectionKey.OP_ACCEPT事件、NioMessageUnsafe、DefaultChannelPipeline

二、初始化服务端创建的Channel

init(channel);// 初始化这个 NioServerSocketChannel

我们首先列举下init(channel)中具体都做了哪了些功能:

  • 设置ChannelOptions、ChannelAttrs ,配置服务端Channel的相关属性
  • 设置ChildOptions、ChildAttrs,配置每个新连接的Channel的相关属性
  • Config handler,配置服务端pipeline
  • add ServerBootstrapAcceptor,添加连接器,对accpet接受到的新连接进行处理,添加一个nio线程
    init方法的具体代码如下:
@Override
void init(Channel channel) throws Exception {
    //1、设置新接入channel的option
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);//设置NioServerSocketChannel相应的TCP参数,其实这一步就是把options设置到channel的config中
    }
    //2、设置新接入channel的attr
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
    //3、设置handler到pipeline上
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;

         //可以看到两个都是局部变量,会在下面设置pipeline时用到
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
    }

    //p.addLast()向ChannelPipeline添加一个ChannelInitializer并重写initChannel()方法
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            //这里的handler()返回的就是serverBootstrap.handler(new SimpleServerHandler())所设置的SimpleServerHandler
            ChannelHandler handler = config.handler();
            if (handler != null) {
                //如果serverBootstrap设置了handler()那么将handler()方法的值设置到pipeline的最后
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                     //在这里会把我们自定义的ChildGroup、ChildHandler、ChildOptions、ChildAttrs相关配置传入到ServerBootstrapAcceptor接收器的构造函数中,并绑定到新的连接上
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

该函数的功能为:

  • 1、设置channel的options
    如果没有设置,则options为空,该属性在ServerBootstrap类中的定义如下
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();

options可能如下:DefaultChannelConfig中的setOption()

public <T> boolean setOption(ChannelOption<T> option, T value) {
    validate(option, value);

    if (option == CONNECT_TIMEOUT_MILLIS) {
        setConnectTimeoutMillis((Integer) value);
    } else if (option == MAX_MESSAGES_PER_READ) {
        setMaxMessagesPerRead((Integer) value);
    } else if (option == WRITE_SPIN_COUNT) {
        setWriteSpinCount((Integer) value);
    } else if (option == ALLOCATOR) {
        setAllocator((ByteBufAllocator) value);
    } else if (option == RCVBUF_ALLOCATOR) {
        setRecvByteBufAllocator((RecvByteBufAllocator) value);
    } else if (option == AUTO_READ) {
        setAutoRead((Boolean) value);
    } else if (option == AUTO_CLOSE) {
        setAutoClose((Boolean) value);
    } else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
        setWriteBufferHighWaterMark((Integer) value);
    } else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
        setWriteBufferLowWaterMark((Integer) value);
    } else if (option == WRITE_BUFFER_WATER_MARK) {
        setWriteBufferWaterMark((WriteBufferWaterMark) value);
    } else if (option == MESSAGE_SIZE_ESTIMATOR) {
        setMessageSizeEstimator((MessageSizeEstimator) value);
    } else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
        setPinEventExecutorPerGroup((Boolean) value);
    } else {
        return false;
    }

    return true;
}
  • 2、设置channel的attrs
    如果没有设置,则attrs为空,该属性在ServerBootstrap类中的定义如下
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
  • 3、如果serverBootstrap.handler()有值,那么将handler的参数设置到channel的pipeline上。
  • 4、在pipeline上添加来一个ChannelInitializer对象,其中重写来initChannel方法。该方法通过p.addLast()向serverChannel的流水线处理器中加入了一个 ServerBootstrapAcceptor,
    从名字上就可以看出来,这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器
    看到这里,我们发现其实init只是初始化了一些基本的配置和属性,以及在pipeline上加入了一个接入器,用来专门接受新连接,并没有启动服务。

三、注册Selector

一个服务端的Channel创建完毕后,下一步就是要把它注册到一个事件轮询器Selector上,在initAndRegister()中我们把上面初始化的Channel进行注册

ChannelFuture regFuture = config().group().register(channel);//注册我们已经初始化过的Channel

回到 initAndRegister 方法中,继续看 config().group().register(channel) 这行代码,config 方法返回了 ServerBootstrapConfig,这个 ServerBootstrapConfig 调用了 group 方法,实际上就是 bossGroup。bossGroup 调用了 register 方法。

前面的分析我们知道group为:NioEvenLoopGroup,其继承MultithreadEventLoopGroup,该类中的register方法如下:

@Override
public ChannelFuture register(Channel channel) {
    //调用了NioEvenLoop对象中的register方法,NioEventLoop extends SingleThreadEventLoop
    return next().register(channel);
}

next()方法的代码如下,其功能为选择下一个NioEventLoop对象。

@Override
public EventLoop next() {
    return (EventLoop) super.next();
}

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    ......
    @Override
    public EventExecutor next() {
        return chooser.next();
    }
    ......
}

根据线程个数nThreads是否为2的幂次方来选择chooser,其中这两个chooser为: PowerOfTwoEventExecutorChooser、GenericEventExecutorChooser,这两个chooser功能都是一样,只是求余的方式不一样。

next()方法返回的是一个NioEvenLoop对象

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

结论:由于NioEventLoopGroup中维护着多个NioEventLoop,next方法回调用chooser策略找到下一个NioEventLoop,并执行该对象的register方法进行注册。

由于NioEventLoop extends SingleThreadEventLoop,NioEventLoop没有重写该方法,因此看 SingleThreadEventLoop类中的register方法

@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

@Deprecated
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    if (promise == null) {
        throw new NullPointerException("promise");
    }

    channel.unsafe().register(this, promise);
    return promise;
}

在本文NioServerSocketChannel实例化中设置来unsafe属性,具体是调用如下的方法来设置的,因此这里的channel.unsafe()就是NioMessageUnsafe实例。

public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    ......
    @Override
    protected AbstractNioUnsafe newUnsafe() {
        return new NioMessageUnsafe();
    }
    ......
}

channel.unsafe().register(this, promise)这行代码调用的是AbstractUnsafe类中的register方法,具体代码如下:

 /**
  * 1、先是一系列的判断。
  * 2、判断当前线程是否是给定的 eventLoop 线程。注意:这点很重要,Netty 线程模型的高性能取决于对于当前执行的Thread 的身份的确定。如果不在当前线程,那么就需要很多同步措施(比如加锁),上下文切换等耗费性能的操作。
  * 3、异步(因为我们这里直到现在还是 main 线程在执行,不属于当前线程)的执行 register0 方法。
  */
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    //判断该channel是否已经被注册到EventLoop中
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    //1 将eventLoop设置在NioServerSocketChannel上
    AbstractChannel.this.eventLoop = eventLoop;

     //判断当前线程是否为该EventLoop中拥有的线程,如果是,则直接注册,如果不是,则添加一个任务到该线程中
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {//重点
                @Override
                public void run() {
                    register0(promise);//实际的注册过程
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

上面的重点是register0(promise)方法。基本逻辑为:

  • 1、通过调用eventLoop.inEventLoop()方法判断当前线程是否为该EventLoop中拥有的线程,如果是,则直接注册,如果不是,说明该EventLoop在等待并没有执行权,则进行第二步。

对整个注册的流程做一个梳理


AbstractEventExecutor

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }
}

SingleThreadEventExecutor

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
}
  • 2、既然该EventLoop中的线程此时没有执行权,但是我们可以提交一个任务到该线程中,等该EventLoop的线程有执行权的时候就自然而然的会执行此任务,而该任务负责调用register0方法,这样也就达到了调用register0方法的目的。

下面看register0这个方法,具体代码如下:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        //当寄存器调用在事件循环之外时,检查通道是否仍然打开,因为它可以在同一时间关闭
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();//jdk channel的底层注册
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
                // 触发绑定的handler事件
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        //执行完,控制台输出:channelRegistered,最终会调用到ChannelInboundHandlerAdapter实现类的channelRegistered()
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

在上面的代码中,是通过调用AbstractNioChannel中doRegister()的具体实现就是把jdk底层的channel绑定到eventLoop的selecor上

public abstract class AbstractNioChannel extends AbstractChannel {
    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                //把channel注册到eventLoop上的selector上
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

    protected SelectableChannel javaChannel() {
        return ch;
    }
}

在本文的NioServerSocketChannel的实例化分析中,我们知道这里的javaChannel()方法返回的ch为实例化NioServerSocketChannel时产生的一个SocketChannelImpl类的实例,并设置为非阻塞的。
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);就完成了ServerSocketChannel注册到Selector中。

ServerSocketChannel注册完之后,接着执行pipeline.fireChannelRegistered方法。

@Override
public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

看next.invokeChannelRegistered();

private void invokeChannelRegistered() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRegistered();
    }
}

接着看看this.handler(),实际上就是head的handler()

public class DefaultChannelPipeline implements ChannelPipeline {
    @Override
    public ChannelHandler handler() {
        return this;
    }
}

返回的是this,那接着看((ChannelInboundHandler) handler()).channelRegistered(this);

public class DefaultChannelPipeline implements ChannelPipeline {
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        invokeHandlerAddedIfNeeded();
        ctx.fireChannelRegistered();
    }
}

继续看ctx.fireChannelRegistered();

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
    @Override
    public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound());
        return this;
    }
}

我们看看findContextInbound()

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

我们看到 ctx = ctx.next; 实际上是从head开始找,找到第一个 inbound 的hander

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
}

最后执行next.invokeChannelRegistered();

pipeline中维护了handler链表,还记得之前.handler(new SimpleServerHandler())初始化的handler中介绍了此handler被添加到此pipeline中了,通过遍历链表,执行InBound类型handler的channelRegistered方法

因此执行到这里,我们的控制台就回输出:channelRegistered,这行信息。
到这里,我们就将doBind方法final ChannelFuture regFuture = initAndRegister();给分析完了,得到的结论如下:

  • 1、通过反射产生了一个NioServerSocketChannle对象。
  • 2、完成了初始化
  • 3、将NioServerSocketChannel进行了注册。

四、端口绑定

首先我们梳理下netty中服务端口绑定的流程


接下来我们分析doBind方法的剩余部分代码主要做了什么,

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

doBind0(regFuture, channel, localAddress, promise);

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

该函数主要是提交了一个Runnable任务到NioEventLoop线程中来进行处理。,这里先看一下NioEventLoop类的execute方法

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    //判断当前线程是否为该NioEventLoop所关联的线程,如果是,则添加任务到任务队列中,
    //如果不是,则先启动线程,然后添加任务到任务队列中去
    boolean inEventLoop = inEventLoop();
    addTask(task);
    if (!inEventLoop) {
        startThread();
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

当提交的任务被线程执行后,则会执行channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);这行代码,这行代码完成的功能为:实现channel与端口的绑定。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }
}

在该方法中直接调用了pipeline的bind方法,这里的pipeline时DefaultChannelPipeline的实例。

tail = new TailContext(this);

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}

在上面方法中直接调用了TailContext实例tail的bind方法,tail在下一篇博文中有详细的介绍。继续看tail实例的bind方法

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

此上面bind函数中的这行代码:final AbstractChannelHandlerContext next = findContextOutbound();所完成的任务就是在pipeline所持有的以AbstractChannelHandlerContext为节点的双向链表中从尾节点tail开始向前寻找第一个outbound=true的handler节点。

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

在 DefaultChannelPipeline 的构造器中, 会实例化两个对象: head 和 tail, 并形成了双向链表的头和尾。 head 是 HeadContext 的实例, 它实现了 ChannelOutboundHandler 接口和ChannelInboundHandler 接口, 并且它的 outbound 字段为 true.而tail 是 TailContext 的实例,它实现了ChannelInboundHandler 接口,并且其outbound 字段为 false,inbound 字段为true。 基于此在如上的bind函数中调用了 findContextOutbound方法 找到的 AbstractChannelHandlerContext 对象其实就是 head.

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

继续看,在pipelie的双向链表中找到第一个outbound=true的AbstractChannelHandlerContext节点head后,然后调用此节点的invokeConnect方法,该方法的代码如下:

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}

HeadContext类中的handler()方法代码如下:

@Override
public ChannelHandler handler() {
    return this;
}

该方法返回的是其本身,这是因为HeadContext由于其继承AbstractChannelHandlerContext以及实现了ChannelHandler接口使其具有Context和Handler双重特性。

继续看,看HeadContext类中的bind方法,代码如下:

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

unsafe这个字段是在HeadContext构造函数中被初始化的,如下:

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, true, true);
    unsafe = pipeline.channel().unsafe();
    setAddComplete();
}

而此构造函数中的pipeline.channel().unsafe()这行代码返回的就是在本博文前面研究NioServerSocketChannel这个类的构造函数中所初始化的一个实例,如下:

unsafe = newUnsafe();//newUnsafe()方法返回的是NioMessageUnsafe对象。  

接下来看NioMessageUnsafe类中的bind方法(准确来说:该方法在AbstractUnsafe中),该类bind具体方法代码如下:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();//判断绑定是否完成
    try {
        doBind(localAddress);//底层jdk绑定端口
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();//触发ChannelActive事件
            }
        });
    }

    safeSetSuccess(promise);
}

上面的核心代码就是:doBind(localAddress);需要注意的是,此doBind方法是在NioServerSocketChannel类中的doBind方法,不是其他类中的。
在doBind(localAddress, config.getBacklog())中netty实现了jdk底层端口的绑定

@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

上面方法中javaChannel()方法返回的是NioServerSocketChannel实例初始化时所产生的Java NIO ServerSocketChannel实例(更具体点为ServerSocketChannelImple实例)。 等价于serverSocketChannel.socket().bind(localAddress)完成了指定端口的绑定,这样就开始监听此端口。绑定端口成功后,是这里调用了我们自定义handler的channelActive方法,在绑定之前,isActive()方法返回false,绑定之后返回true。

在 pipeline.fireChannelActive()中会触发pipeline中的channelActive()方法

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}

在channelActive中首先会把ChannelActive事件往下传播,然后调用readIfIsAutoRead()方法出触发channel的read事件,而它最终调用AbstractNioChannel中的doBeginRead()方法

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }

    readPending = true;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);//readInterestOp为  SelectionKey.OP_ACCEPT
    }
}

在doBeginRead()方法,netty会把accept事件注册到Selector上。

到此我们对netty服务端的启动流程有了一个大致的了解,整体可以概括为下面四步:

  • 1、channelFactory.newChannel(),其实就是创建jdk底层channel,并初始化id、piepline等属性。
  • 2、init(channel),初始化NioServerSocketChannel,设置option、attr等属性,并添加ServerBootstrapAcceptor连接器,并触发addHandler事件。
  • 3、config().group().register(channel),通过 ServerBootstrap 的 bossGroup 根据group长度取模得到NioEventLoop ,将 NioServerSocketChannel 注册到 NioEventLoop 中的 selector 上,然后触发 channelRegistered事件。
  • 4、doBind0(regFuture, channel, localAddress, promise),完成服务端端口的监听,并把accept事件注册到selector上。

参考:
https://www.cnblogs.com/java-chen-hao/p/11460395.html

https://www.cnblogs.com/dafanjoy/p/9810189.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容