Netty源码-服务端启动过程

1 概述

本文主要介绍Netty的服务端启动过程。

2 服务端的典型编码

我们首先看下如果使用Netty作为通信框架,服务端源码一般长啥样,下面先列一下简单的代码示例:

public class TimeServer {
    public void bind(int port) throws Exception {
        //定义两个EventLoopGroup,一个负责用户接收并
        //建立客户端连接,另一个负责处理已经建立连接的客户端
        //事件处理,当然也可以只用一个group
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                //因为这里是服务端,所以使用的channel类型
                //为NioServerSocketChannel
                .channel(NioServerSocketChannel.class)
                //设置一些客户端channel的选项,后续接受客户端
                //连接时会用到
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                //设置一些客户端channel属性,后续接受客户端
                //连接时会用到
                .childAttr(AttributeKey.valueOf("childChannelAttr"), "attrValue")
                //设置服务端channel选项,服务端channel实例化后会用到
                .option(ChannelOption.AUTO_READ, true)
                //设置服务端channel属性,服务端channel实例化后会用到
                .attr(AttributeKey.valueOf("serverChannelAttr"), "attrValue")
                //设置客户端pipeline的handler,客户端建立连接之后会
                //在其pipeline中添加该handler
                //在有事件继续时会调用该handler进行处理。
                .childHandler(new ChildChannelHandler())
                //设置服务端的handler,服务端相关事件就绪之后会调用该
                //handler
                .handler(new ServerLoggerHandler());
            //绑定端口启动服务端
            ChannelFuture f = b.bind(port).sync();
            
            f.channel().closeFuture().sync();
            
        } finally {
            //优雅停机
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new TimeServer().bind(8080);
    }
}

//服务端中每个客户端channel使用的handler,ChannelInitializer类型的
//handler一般用于在通道初始化之后注册进行实际工作的handler
class ChildChannelHandler extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new TimerServerHandler());
    }
}

//服务端中每个客户端channel使用的handler
class TimerServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead");
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
}

//服务端channel使用的handler
class ServerLoggerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //do some log operation
        ctx.fireChannelRead(msg);
    }
}

3 一些配置函数

在上面的示例代码中,我们连缀调用了许多类ServerBootstrap的配置函数,如下所示,类ServerBootstrap如其名字所示,负责服务端的配置和启动,相应的还有一个类Bootstrap负责客户端的配置和启动,下面我们分别进行讲解:

b.group(bossGroup, workerGroup)
//因为这里是服务端,所以使用的channel类型
//为NioServerSocketChannel
.channel(NioServerSocketChannel.class)
//设置一些客户端channel的选项,后续接受客户端
//连接时会用到
.childOption(ChannelOption.SO_KEEPALIVE, true)
//设置服务端channel选项,服务端channel实例化后会用到
.option(ChannelOption.AUTO_READ, true)
//设置一些客户端channel属性,后续接受客户端
//连接时会用到
.childAttr(AttributeKey.valueOf("childChannelAttr"), "attrValue")
//设置服务端channel属性,服务端channel实例化后会用到
.attr(AttributeKey.valueOf("serverChannelAttr"), "attrValue")
//设置客户端pipeline的handler,客户端建立连接之后会
//在其pipeline中添加该handler
//在有事件继续时会调用该handler进行处理。
.childHandler(new ChildChannelHandler())
//设置服务端的handler,服务端相关事件就绪之后会调用该
//handler
.handler(new ServerLoggerHandler());
  • group
//ServerBootstrap
/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/
//在ServerBootstrap中有两个线程group,分别为group和childGroup,
//每个group对应一个线程数组,每一个线程(其实就是NioEventLoop类)
//对应一个Selector,负责进行select,服务端如果绑定了多个端口,
//那么每个端口会绑定到group线程组中的某个线程上,
//负责端口接受客户端连接,接受到的客户端
//channel则会绑定到childGroup中的某个线程上,负责select客户端事件
//通过group和childGroup的不同配置,可以实现不同的线程模型,也就是
//Netty实现的不同类型的Reactor模式,具体可自行查询
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}
  • channel
//
/**
* The {@link Class} which is used to create {@link Channel} instances from.
* You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
* {@link Channel} implementation has no no-args constructor.
*/
public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

/**
* {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from
* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
* is not working for you because of some more complex needs. If your {@link Channel} implementation
* has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
* simplify your code.
*/
@SuppressWarnings({ "unchecked", "deprecation" })
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
    return channelFactory((ChannelFactory<C>) channelFactory);
}

/**
* @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead.
*/
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }
    //配置channel会构造一个channelFactory负责实例化配置的channel
    this.channelFactory = channelFactory;
    return self();
}


ServerBootstrap.channel则负责配置服务端对应的channel,Netty对Java NIO的原生channel进行了封装,使其使用更加统一,这里我们配置的是NioServerSocketChannel

  • childOption
//ServerBootstrap
/**
* Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created
* (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set
* {@link ChannelOption}.
*/
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
    if (childOption == null) {
        throw new NullPointerException("childOption");
    }
    if (value == null) {
        synchronized (childOptions) {
            childOptions.remove(childOption);
        }
    } else {
        synchronized (childOptions) {
            childOptions.put(childOption, value);
        }
    }
    return this;
}

childOption主要负责在后续接受到客户端连接并建立了客户端channel时,对channel的配置,可配置的一些选项都定义在ChannelOption类的常量中,其中我们常见的一些配置如下:

//ChannelOption
public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");

这些配置看着很眼熟,其实对应了java对socket的配置参数,接口java.net.SocketOptions中如下图所示,这里不再介绍,源码中有比较详细的介绍,可自行翻阅其源码,childOption指定的配置会保存在Map childOptions中。

SocketOptions配置.png
  • option
    optionchildOption同理,但是childOption主要负责对建立的客户端连接channel进行配置,而option则负责对服务端的channel进行配置,通过option方法指定的服务端channel配置选项会保存在Map options中。

  • childAttr

//ServerBootstrap
/**
* Set the specific {@link AttributeKey} with the given value on every child {@link Channel}. If the value is
* {@code null} the {@link AttributeKey} is removed
*/
public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value) {
    if (childKey == null) {
        throw new NullPointerException("childKey");
    }
    if (value == null) {
        childAttrs.remove(childKey);
    } else {
        childAttrs.put(childKey, value);
    }
    return this;
}

childAttr方法用于为建立的客户端连接的channel指定一些属性,主要是应用自己指定的一些属性,比如打上标签,然后后续进行分类等。保存在childAttrs中。

  • attr
    attrchildAttr方法类似,但是attr则用于为server channel指定一些属性,保存在attrs中。

  • childHandler

//ServerBootstrap
/**
* Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
*/
public ServerBootstrap childHandler(ChannelHandler childHandler) {
    if (childHandler == null) {
        throw new NullPointerException("childHandler");
    }
    this.childHandler = childHandler;
    return this;
}

在文章Netty源码-ChannelPipeline和ChannelHandler我们介绍了每个channel都会有一个pipeline,childHandler就是指定在服务端接收客户端连接并建立了客户端连接对应的channel之后,该channel pipeline中对应的handler,常规编码中我们一般会指定一个ChannelInitializer对象,在客户端channel注册之后,向其pipeline中注册多个handler,比如编码handler、解码handler、实际业务handler等。ChannelInitializer handler和其他handler不同的是,在其channelRegistered方法调用完之后其会从pipeline中移除自己。

  • handler
//AbstractBootstrap
/**
* the {@link ChannelHandler} to use for serving the requests.
*/
public B handler(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
    return self();
}

handler方法负责指定服务端channel pipeline中的handler,其实在ServerBootstrap初始化方法init中会向服务端channel注册一个ChannelInitializer,这里通过handler方法指定的handler会在ChannelInitializerinitChannel被添加到pipeline中,除此之外,还会添加一个默认的ServerBootstrapAcceptor,负责处理客户端连接的相关逻辑,其实这个handler方法一般可以不同调用配置的。

上面还有一个知识点,设置通道选项ChannelOption或者属性AttributeKey时都使用了常量池ConstantPool,可见笔者文章Netty源码-常量池ConstantPool对其的简单介绍。

上面介绍了常用的配置方法,下面我们具体看服务端的启动过程。

4 服务端启动

服务端的启动通过ServerBootstrap.bind方法触发,其定义在ServerBootstrap父类AbstractBootstrap中:

//AbstractBootstrap
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(SocketAddress localAddress) {
    //对配置进行验证
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    //调用实际的绑定方法
    return doBind(localAddress);
}

/**
* Validate all the parameters. Sub-classes may override this, but should
* call the super method in that case.
*/
public B validate() {
    //首先验证group不能为空
    if (group == null) {
        throw new IllegalStateException("group not set");
    }
    //验证channelFactory不能为空,channelFactory会在上面调用
    //channel配置通道时被初始化,主要用于生成配置的channel实例
    if (channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return self();
}

//doBind主要有三个步骤,第一是实例化channel对象实例,第二是向group
//中的某个NioEventLoop注册自己,即向selector注册channel,第三是在上面
//两件事完成之后,向channel绑定端口
private ChannelFuture doBind(final SocketAddress localAddress) {
    //这里的方法完成第一和第二件个步骤,初始化并注册自己
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    //注册完成之后,则进行第三件步骤,绑定端口
    //如果一二没有完成,则注册一个回调Listener,
    //在一二完成之后子再自动调用Listener进行端口绑定
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

4.1 channel初始化和注册

AbstractBootstrap.initAndRegister负责通道初始化和注册:

//AbstractBootstrap
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //通过channelFactory实例化我们配置的服务端channel
        //NioServerSocketChannel
        channel = channelFactory.newChannel();
        //进行初始化
        init(channel);
    } catch (Throwable t) {
        //异常处理
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    //向group中的某个NioEventLoop注册自己
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;
}

AbstractBootstrap.init初始化方法根据服务端和客户端有不同的定义,服务端定义在其子类ServerBootstrap中定义如下:

//ServerBootstrap
@Override
void init(Channel channel) throws Exception {
    //用上面介绍的option方法指定的选项对channel进行配置
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    //将上面介绍使用attr方法指定的属性附加到channel上
    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    //获取该服务端channel的pipeline
    ChannelPipeline p = channel.pipeline();

    //下面获取childGroup/childHandler/childOptions以及
    //childAtts
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    //向服务端channel的pipeline中注册ChannelInitializer
    //负责初始化其pipeline
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            //首先添加通过ServerBootstrap.handler方法配置的
            //handler
            if (handler != null) {
                pipeline.addLast(handler);
            }
            //然后如果在该channel注册的NioEventLoop自己的线程中
            //则表示该线程已启动,此时则向服务端channel的pipeline
            //中注册负责处理客户端连接的ServerBootstrapAcceptor
            //传入上面通过child*方法配置的childGroup/
            //childHandler/childOptions以及childAtts
            //其实这里还引申出编码时应该注意的一个问题,因为
            //这里向多个客户端channel的pipeline注册的childHandler
            //为同一个,所以需要多个线程修改数据的可能,比如解码器
            //所以我们一般使用ChannelInitializer注册handler,然后在
            //初始化通道时通过addLast(new *Handler())的方法为每个通道
            //注册一个新的对象实例,避免共享同一个对象实例
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

初始化的逻辑比较简单,到这里已经介绍完毕,下面看server channel是如何进行注册的,在AbstractBootstrap.initAndRegister方法中通过config().group().register(channel)进行注册。其中config返回该channel的默认配置类ServerBootstrapConfigServerBootstrapConfig.groupServerBootstrap.group线程组,所以实际调用的是NioEventLoopGroup.register方法,在其父类MultithreadEventLoopGroup定义如下:

//MultithreadEventLoopGroup
@Override
public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

每个NioEventLoopGroup在其构造函数会实例化多个NioEventLoop对象实例,并通过数组持有这些实例,next方法则使用配置的EventExecutorChooserFactory.EventExecutorChooser选择器从这些EventLoop中选出一个进行注册,默认的选择器工厂类实现DefaultEventExecutorChooserFactory会根据EventLoop数组长度是否为2的指数次方,决定默认的选择器为PowerOfTwoEventExecutorChooser还是GenericEventExecutorChooser,二者目前都采用轮询方式从NioEventLoopGroup中选择NioEventLoop,不同的是如果长度为2的指数次方,则在轮询求余时可以采用& length进行优化,具体可查看二者实现源码,限于篇幅,这里不进行介绍。所以最终调用的是NioEventLoop.register方法,在其父类SingleThreadEventLoop中定义如下:

//SingleThreadEventLoop
@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

可见,在SingleThreadEventLoop.register中最终调用了Unsafe.register方法,在其实现类AbstractUnsafe定义如下:

//AbstractUnsafe
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    //如果eventLoop为空,则抛错
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    }
    //如果已经注册了则返回失败
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    //判断该eventloop是否兼容,也就是group里的eventLoop是否
    //和该通道的类型兼容,比如nio类型的通道必须对应nio类型
    //的eventloop
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    //如果当前线程就是该eventLoop对应的线程,则直接调用register0进行
    //实际注册操作,否则将该任务加入到该eventLoop的任务队列中,如果是第
    //一次向其任务队列加入任务,也会同时初始化并启动该eventLoop关联的
    //线程
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}

//进行实际的注册工作
private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        //真正的注册工作是放在AbstractUnsafe外部类AbstractChannel
        //中定义的
        doRegister();
        neverRegistered = false;
        //将是否注册过的标识置为true,上面有判断该通道是否已经注册过
        //就是通过该变量标识的
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        //注册完成,触发handler的channelRegistered方法
        pipeline.fireChannelRegistered();
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                //如果通道已经处于激活状态,则触发
                //handler的channelActive方法
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                //向eventLoop的selector注册通道时并没有设置
                //感兴趣的事件,这里调用此方法注册感兴趣的事件
                //比如OP_ACCEPT等
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

AbstractChannel.doRegister方法,我们看其在子类AbstractNioChannel的定义:

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    //通过多次循环,如果没有抛错,则持续注册,直到注册成功
    for (;;) {
        try {
            //这里也看到了熟悉的Java NIO的注册操作,将该Netty channel
            //关联的java channel注册到该eventLoop持有的selector中
            //注册时,并没有指定感兴趣的事件
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            //如果抛出异常,则尝试调用selectNow看是否能再次注册
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

上面提到AbstractUnsafe.register0注册成功之后调用beginRead方法,其实现比较简单,可自行查看源码,主要是向方法doRegister注册之后返回的SelectionKey注册感兴趣的对象。Java NIO编程中,事件注册有两种方式,一种是在向Selector注册channel时同时指定感兴趣的事件,另一种是设置返回的SelectionKey感兴趣的事件。这里的beginRead方法调用还需要配置ChannelOption.AUTO_READ为true,否则可以在自己定义的Handler中调用ChannelHandlerContext.read方法,这个方法的调用最终也会调用beginRead注册OP_ACCEPT事件。

至此,我们已经介绍了服务端启动过程中三个步骤中的前两个:初始化和注册,下面介绍绑定。

4.2 绑定

我们在第4节的开始介绍ServerBootstrap.doBind方法时,介绍了服务端启动主要完成三个步骤:初始化、注册和绑定,初始化和注册完成之后,会进行绑定操作。绑定操作在doBind0方法中完成,其在AbstractBootstrap定义如下:

//AbstractBootstrap
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                //在通道中绑定
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

channel.bind方法定义如下:

//AbstractChannel
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

调用pipeline.bind方法进行绑定,可见最终是通过调用pipeline中注册的handler进行绑定的,因为绑定属于outbound事件(关于pipeline、handler、outbound事件可见笔者文章Netty源码-ChannelPipeline和ChannelHandler),所以最后调用HeadContext.bind

//HeadContext
@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    //调用Unsafe.bind进行绑定
    unsafe.bind(localAddress, promise);
}

//AbstractUnsafe
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        //调用实际的绑定工作,doBind
        //是AbstractUnsafe外部类AbstractChannel的方法
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

AbstractChannel.doBind我们看下其在子类NioServerSocketChannel中的实现:

//NioServerSocketChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    //根据Java版本号,在java channel上进行绑定
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}

Netty服务端的启动大概就是这些内容,这里主要介绍了主要的启动流程,其中的具体细节比如PromiseNioEventLoop实例化等没有详细介绍。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容