
1 概述


2 服务端的典型编码


public class TimeServer {
    public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
  , workerGroup)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childAttr(AttributeKey.valueOf("childChannelAttr"), "attrValue")
                .option(ChannelOption.AUTO_READ, true)
                .attr(AttributeKey.valueOf("serverChannelAttr"), "attrValue")
                .childHandler(new ChildChannelHandler())
                .handler(new ServerLoggerHandler());
            ChannelFuture f = b.bind(port).sync();
        } finally {

    public static void main(String[] args) throws Exception {
        new TimeServer().bind(8080);

class ChildChannelHandler extends ChannelInitializer<Channel> {

    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new TimerServerHandler());

class TimerServerHandler extends ChannelInboundHandlerAdapter {

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead");

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

class ServerLoggerHandler extends ChannelInboundHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //do some log operation

3 一些配置函数

在上面的示例代码中,我们连缀调用了类ServerBootstrap的许多配置函数,如下所示。类ServerBootstrap如其名字所示,负责服务端的配置和启动,相应地还有一个类Bootstrap负责客户端的配置和启动,下面我们分别对这些配置函数进行讲解:
b.group(bossGroup, workerGroup)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.AUTO_READ, true)
.childAttr(AttributeKey.valueOf("childChannelAttr"), "attrValue")
.attr(AttributeKey.valueOf("serverChannelAttr"), "attrValue")
.childHandler(new ChildChannelHandler())
.handler(new ServerLoggerHandler());
  • group
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {;
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    this.childGroup = childGroup;
    return this;
  • channel
* The {@link Class} which is used to create {@link Channel} instances from.
* You either use this or {@link #channelFactory(} if your
* {@link Channel} implementation has no no-args constructor.
public B channel(Class<? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));

* {@link} which is used to create {@link Channel} instances from
* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
* is not working for you because of some more complex needs. If your {@link Channel} implementation
* has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
* simplify your code.
@SuppressWarnings({ "unchecked", "deprecation" })
public B channelFactory(<? extends C> channelFactory) {
    return channelFactory((ChannelFactory<C>) channelFactory);

* @deprecated Use {@link #channelFactory(} instead.
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    this.channelFactory = channelFactory;
    return self();

ServerBootstrap.channel则负责配置服务端对应的channel,Netty对Java NIO的原生channel进行了封装,使其使用更加统一,这里我们配置的是NioServerSocketChannel

  • childOption
* Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created
* (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set
* {@link ChannelOption}.
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
    if (childOption == null) {
        throw new NullPointerException("childOption");
    if (value == null) {
        synchronized (childOptions) {
    } else {
        synchronized (childOptions) {
            childOptions.put(childOption, value);
    return this;


public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");

这些配置看着很眼熟,其实对应了Java对socket的配置参数(其中大部分可以在java.net.SocketOptions接口中找到对应的定义),这里不再介绍,源码中有比较详细的说明,可自行翻阅其源码。通过childOption指定的配置会保存在名为childOptions的Map中。

  • option
    option和childOption同理,区别在于childOption负责对服务端接收连接后建立的客户端连接channel进行配置,而option则负责对服务端的channel进行配置。通过option方法指定的服务端channel配置选项会保存在名为options的Map中。

  • childAttr

* Set the specific {@link AttributeKey} with the given value on every child {@link Channel}. If the value is
* {@code null} the {@link AttributeKey} is removed
public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value) {
    if (childKey == null) {
        throw new NullPointerException("childKey");
    if (value == null) {
    } else {
        childAttrs.put(childKey, value);
    return this;


  • attr
    attr和childAttr方法类似,区别在于attr用于为server channel指定一些属性,这些属性保存在attrs中。

  • childHandler

* Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
public ServerBootstrap childHandler(ChannelHandler childHandler) {
    if (childHandler == null) {
        throw new NullPointerException("childHandler");
    this.childHandler = childHandler;
    return this;

在文章《Netty源码-ChannelPipeline和ChannelHandler》中我们介绍了每个channel都会有一个pipeline,childHandler指定的就是服务端接收客户端连接并建立了对应的channel之后,该channel的pipeline中使用的handler。常规编码中我们一般会指定一个ChannelInitializer对象,在客户端channel注册之后,由它向pipeline中添加多个handler,比如编码handler、解码handler、实际业务handler等。ChannelInitializer和其他handler不同的是,在其channelRegistered方法调用完之后,它会把自己从pipeline中移除。

  • handler
* the {@link ChannelHandler} to use for serving the requests.
public B handler(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    this.handler = handler;
    return self();

handler方法负责指定服务端channel的pipeline中的handler。其实在ServerBootstrap的初始化方法init中会向服务端channel的pipeline添加一个ChannelInitializer,这里通过handler方法指定的handler会在该ChannelInitializer的initChannel方法中被添加到pipeline中;除此之外,还会添加一个默认的ServerBootstrapAcceptor,负责处理客户端连接的相关逻辑。因此这个handler方法一般可以不用调用。



4 服务端启动


* Create a new {@link Channel} and bind it.
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));

* Create a new {@link Channel} and bind it.
public ChannelFuture bind(SocketAddress localAddress) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    return doBind(localAddress);

* Validate all the parameters. Sub-classes may override this, but should
* call the super method in that case.
public B validate() {
    if (group == null) {
        throw new IllegalStateException("group not set");
    if (channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    return self();

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel =;
    if (regFuture.cause() != null) {
        return regFuture;

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See

                    doBind0(regFuture, channel, localAddress, promise);
        return promise;

4.1 channel初始化和注册


final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
        } else {

    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.

    return regFuture;


void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));

    p.addLast(new ChannelInitializer<Channel>() {
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
            //初始化通道时通过addLast(new *Handler())的方法为每个通道
            ch.eventLoop().execute(new Runnable() {
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

初始化的逻辑比较简单,到这里已经介绍完毕,下面看server channel是如何进行注册的。在AbstractBootstrap.initAndRegister方法中通过config().group().register(channel)进行注册,其中config返回的是配置类ServerBootstrapConfig,ServerBootstrapConfig.group返回的则是通过ServerBootstrap.group配置的线程组,所以实际调用的是NioEventLoopGroup.register方法,在其父类MultithreadEventLoopGroup中定义如下:

public ChannelFuture register(Channel channel) {
    return next().register(channel);

每个NioEventLoopGroup在其构造函数中会实例化多个NioEventLoop对象,并通过数组持有这些实例。next方法则使用配置的选择器EventExecutorChooserFactory.EventExecutorChooser从这些EventLoop中选出一个进行注册。默认的选择器工厂类实现DefaultEventExecutorChooserFactory会根据EventLoop数组长度是否为2的整数次幂,决定默认的选择器为PowerOfTwoEventExecutorChooser还是GenericEventExecutorChooser。二者目前都采用轮询方式从NioEventLoopGroup中选择NioEventLoop,不同的是如果长度为2的整数次幂,则轮询求余时可以用位运算& (length - 1)代替取模运算进行优化,具体可查看二者实现源码,限于篇幅,这里不进行介绍。所以最终调用的是NioEventLoop.register方法,在其父类SingleThreadEventLoop中定义如下:

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");, promise);
    return promise;


public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    if (eventLoop == null) {
        throw new NullPointerException("eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
    if (!isCompatible(eventLoop)) {
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));

    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
    } else {
        try {
            eventLoop.execute(new Runnable() {
                public void run() {
        } catch (Throwable t) {
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            safeSetFailure(promise, t);

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
        boolean firstRegistration = neverRegistered;
        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.

        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                // See
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        safeSetFailure(promise, t);


protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            //这里也看到了熟悉的Java NIO的注册操作,将该Netty channel
            //关联的java channel注册到该eventLoop持有的selector中
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no operation was called yet.
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;

上面提到AbstractUnsafe.register0注册成功之后会调用beginRead方法,其实现比较简单,可自行查看源码,主要是在方法doRegister注册之后返回的SelectionKey上设置感兴趣的事件。Java NIO编程中,事件注册有两种方式,一种是在向Selector注册channel时同时指定感兴趣的事件,另一种是在返回的SelectionKey上设置感兴趣的事件。这里的beginRead方法调用还需要配置ChannelOption.AUTO_READ为true,否则需要在自己定义的Handler中调用ChannelHandlerContext.read方法,这个方法的调用最终也会调用beginRead注册OP_ACCEPT事件。


4.2 绑定


private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {


public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);


public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {

    if (!promise.setUncancellable() || !ensureOpen(promise)) {

    // See:
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");

    boolean wasActive = isActive();
    try {
    } catch (Throwable t) {
        safeSetFailure(promise, t);

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            public void run() {



protected void doBind(SocketAddress localAddress) throws Exception {
    //根据Java版本号,在java channel上进行绑定
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());


