Bootstrap --- 客户端

  1. 简述
    这一章是netty源码分析系列的第一章,在这一章中只展示Netty的客户端和服务端的初始化和启动的过程,给读者一个对Netty源码有一个大致的框架上的认识,而不会深入每个功能模块。
    本章会从Bootstrap/ServerBootstrap类入手,分析Netty程序的初始化和启动的流程。
  2. Bootstrap
    Bootstrap 是netty提供的一个便利的工厂类,我们可以通过他来完成Netty的客户端或服务端的Netty初始化。
    下面以Netty源码例子中的Echo服务器作为例子,从客户端和服务端分别分析一下Netty的程序是如何启动的。
  3. 客户端部分
  • 连接源码
    EchoClient.java
EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
     .channel(NioSocketChannel.class)
     .option(ChannelOption.TCP_NODELAY, true)
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             p.addLast(new EchoClientHandler());
         }
     });

    // Start the client.
    ChannelFuture f = b.connect(HOST, PORT).sync();

    // Wait until the connection is closed.
    f.channel().closeFuture().sync();
} finally {
    // Shut down the event loop to terminate all threads.
    group.shutdownGracefully();
}

上面的客户端代码虽然简单,但是却展示了Netty客户端初始化时所需的所有内容
(1) EventLoopGroup:不论是服务器端还是客户端,都必须指定EventLoopGroup,在这个例子中,指定了NioEventLoopGroup,表示一个Nio的EventLoopGroup.
(2)ChannelType:指定Channel的类型。因为是客户端,因此使用了NioSocektChannel
(3) Handler:设置数据的处理器。

  1. NioSocketChannel的初始化过程
    在Netty中,Channel是一个Socket的抽象,它为用户提供了关于Socket状态(是否是连接还是断开)以及对Socket的读写等操作。每当Netty建立了一个连接后,都会有一个对应的Channel实例。
    NioSocketChannel的类层次结构如下


    1.png

下面着重分析一个Channel的初始化过程

  1. ChannelFactory 和Channel类型的确定
    除了TCP协议以外,Netty还支持很多其他的连接协议,并且每种协议还有NIO(异步IO)和OIO(Old-IO,即传统的阻塞IO)版本的区别,不同协议不同的阻塞类型的连接都有不同的Channel类型与之对应,下面是写常用的Channel类型:
  • NioSocketChannel,代表异步的客户端TCP Socket连接
  • NioServerSocketChannel,异步的服务器端TCP Socket连接
  • NioDatagramChannel, 异步的 UDP 连接
  • NioSctpChannel, 异步的客户端 Sctp 连接.
  • NioSctpServerChannel, 异步的 Sctp 服务器端连接.
  • OioSocketChannel, 同步的客户端 TCP Socket 连接.
  • OioServerSocketChannel, 同步的服务器端 TCP Socket 连接.
  • OioDatagramChannel, 同步的 UDP 连接
  • OioSctpChannel, 同步的 Sctp 服务器端连接.
  • OioSctpServerChannel, 同步的客户端 TCP Socket 连接.
    那么我们是如何设置所需要的Channel的类型的呢?答案时channel方法的调用。
    回想一下我们在客户端连接代码的初始化Bootstrap中,会调用channel()方法,传入NioSocketChannel.class,这个方法其实就是初始化了一个ReflectiveChannelFactory:
public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

而ReflectiveChannelFactory 实现了ChannelFactory接口,它提供了唯一的方法,即newChannel。ChannelFactory,顾名思义,就是产生Channel的工厂类。
进入到ReflectiveChannelFactory.newChannel中,我们看到其实现代码如下

public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

根据上面代码的提示,我们就可以确定:

  • Bootstrap中的ChannelFactory的实现是ReflectiveChannelFactory
  • 生成的Channel的具体类型是NioSocketChannel
    Channel的实例化过程,其实就是调用ChannelFactory#newChanne方法,而实例化的Channel的具体的类型又是和初始化Bootstrap时传入的channel()方法的参数相关。因为对于我们这个例子中的客户端的Bootstrap而言,生成的Channel实例就是NioSocketChannel
  1. Channel实例化
    前面我们已经知道了如何确定一个Channel的类型,并且了解到Channel是通过工厂方法ChannelFacotry.newChannel()来实例化的,那么ChannelFactory.newChannel()方法在哪儿调用?
Bootstrap.connect -> Bootstrap.doResolveAndConnect ->AbstractBootrap.initAndRegister

在AbstractBootstrap.initAndRegister中就调用了channelFactory.newChannel()来获取一个新的NioSocketChannel实例,其源码如下:

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

在newChannel中,通过类对象newInstance来获取一个新的Channel实例,因而会调用NioSocketChannel的默认构造器NioSocketChannel 默认构造器代码如下

public NioSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

这里的代码比较关键,我们看到,在这个构造器中,会调用newSocket来打开一个新的 java NIO socketChannel:

private static SocketChannel newSocket(SelectorProvider provider) {
    ...
    return provider.openSocketChannel();
}

接着会调用父类,即AbstractNioByteChannel的构造器

AbstractNioByteChannel(Channel parent, SelectableChannel ch)

并传入参数parent为null,ch为刚才使用newSocket创建的Java Nio SocketChannel,因此生成的NioSocketChannel的parent channel为Null

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

接着会继续调用父类的AbstractNioChannel的构造器,并传入了参数readInterestOp = SelectionKey.OP_READ:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    // 省略 try 块
    // 配置 Java NIO SocketChannel 为非阻塞的.
    ch.configureBlocking(false);
}

然后继续调用父类AbstractChannel的构造器

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

到这里,一个完整的NioSocketChannel就初始化完成了,我们可以稍微总结一下构造一个NioSocketChannel所需要做的工作:

  • 调用NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER)打开一个新的java NIO SocketChannel
  • AbstractChannel(Channel parent)中初始化一个AbstractChannel的属性:
    • parent属性置为null
    • unsafe通过newUnsafe()实例化一个unsafe对象,它的类型 是AbstractNioByteChannel.NioByteUnsafe 内部类
    • pipeline 是 new DefaultChannelPipeline(this) 新创建的实例. 这里体现了:Each channel has its own pipeline and it is created automatically when a new channel is created.
  • AbstractNioChanne中的属性:
    • SelectableChannel ch 被设置为 Java SocketChannel, 即 NioSocketChannel#newSocket 返回的 Java NIO SocketChannel.
    • readInterestOp被设置为SelectionKey.OP_READ
    • SlectableChannel ch被配置为非阻塞的ch.configureBlocking(false)
  • NioSocketChannel中的属性:
    • SocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket())
  1. 关于unsafe 字段的初始化
    我们简单地提到,在实例化NioSocketChannel的过程中,会在父类AbstractChannel的构造器中,调用newUnsafe()来获取一个unsafe实例。那么unsafe是怎么初始化的呢?它的作用是什么?
    其实unsafe特别关键,它封装了对java底层Socket的操作,因此实际上是连通Netty上层和java底层的重要桥梁。
    Unsafe接口所提供的方法
interface Unsafe {
    SocketAddress localAddress();
    SocketAddress remoteAddress();
    void register(EventLoop eventLoop, ChannelPromise promise);
    void bind(SocketAddress localAddress, ChannelPromise promise);
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    void disconnect(ChannelPromise promise);
    void close(ChannelPromise promise);
    void closeForcibly();
    void deregister(ChannelPromise promise);
    void beginRead();
    void write(Object msg, ChannelPromise promise);
    void flush();
    ChannelPromise voidPromise();
    ChannelOutboundBuffer outboundBuffer();
}

一看便知,这些方法其实都会对应到相关的java底层的Socket的操作。
回到AbstractChannel的构造方法中,在这里调用了newUnsafe()获取一个新的unsafe对象,而newUnsafe方法在NioSocketChannel中被重写了

@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioSocketChannelUnsafe();
}

NioSocketChannel.newUnsafe方法会返回一个NioSocketChannelUnsafe实例,从这里我们就可以确定了,在实例化的NioSocketChannel中的unsafe字段,其实就是一个NioSocketChannelUnsafe的实例。

  1. 关于pipeline的初始化
    上面我们分析了一个Channel(在这个例子中时NioSocketChannel)的大体初始化过程,但是我们漏掉了一个关键的部分,即ChannnelPipeline的初始化。
    根据Each channel has its own pipeline and it is created automatically when a new channel is created。我们知道,在实例化一个Channel时,必然伴随着实例化的一个ChannelPipeline.而我们确实在AbstractChannel的构造器看到了pipeline字段被初始化为DefaultChannelPipeline的实例,那么看下DefaultChannelPipeline构造器做了那些工作吧:
public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

我们调用DefaultChannelPipeline的构造器,传入了一个channel,而这个channel其实就是我们实例化的NioSocketChannel,DefaultChannelPipeline会将这个NioSocketChannel对象保存在channel字段中。DefaultChannelPipeline中,还有两个特殊的字段,即head和tail,而这两个字段是一个双向链表的头和尾,其实在DefaultChannelPipeline中,维护了一个以AbstractChannelHandlerContext为节点的双向链表,这个链表是Netty实现pipeline机制的关键。关于DefaultChannelPipeline中的双向链表以及它所起的作用,下面介绍
HeadContext的继承层次结果如下所示:


1.png

TailContext 的继承层次结构如下所示:


2.png

我们可以看到,链表中head是一个ChannelOutboundHandler ,而tail则是一个ChannelInboundHandler,接着我们看一下headContext的构造器
HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
}

它调用了父类AbstractChannelHandlerContext的构造器,并传入参数inbound=false,outbound=true
TailContext的构造器与HeadContext的相反,它调用了父类AbstractChannelHandlerContext的构造器,并传入参数inbound=true,outbound=false
即header是一个outboundHandler,而tail是一个inboundHandler

  1. 关于EventLoop初始化
    回到最开始的EchoClient.java代码中,我们在一开始实例化了一个NioEventLoopGroup对象,因此我们就从它的构造器中追踪一下EventLoop的初始化过程。
    首先来看一下NioEventLoopGroup的类继承层次


    3.png

    NioEventLoop有几个重载的构造器,不过内容都没什么大的区别,最终都是调用父类MultithreadEventLoopGroup构造器

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

其中一点是,如果我们传入的线程数nThreads是0,那么Netty会为我们设置默认的线程数DEFAULT_EVENT_LOOP_THREADS,而是这个默认的线程数是怎么确定的呢?
其实很简答,在静态代码块中,会首先确定DEFAULT_EVENT_LOOP_THREADS的值

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
}

Netty会首先从系统属性中回去"io.netty.eventLoopThreads"的值,如果我们没有设置它的话,那么就返回默认值:处理器核心数*2.
回到MultithreadEventLoopGroup构造器中,这个构造器会继续调用父类MultithreadEventExecutorGroup的构造器:

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    // 去掉了参数检查, 异常处理 等代码.
    children = new EventExecutor[nThreads];
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

    for (int i = 0; i < nThreads; i ++) {
        children[i] = newChild(threadFactory, args);
    }
}

根据代码,我们就很清楚MultithreadEventExecutorGroup中的处理逻辑了:

  • 创建一个大小为nThreads的EventExcutor数组
  • 调用newChild方法初始化children数组
    根据上面的diamante,我们知道,MultithreadEventLoopExecutorGroup内部维护了一个EventExecutor数据,Netty的EventLoopGroup的实现机制其实就建立在MultithreadEventExecutorGroup之上,每当Netty需要EventLoop一个时,会调用netx方法获取一个可用的EventLoop.
    上面代码的最后一部分是newChild方法,这个是一个抽象方法,它的任务是实例化EventLoop对象,我们根据以下它的代码,可以发现这个方法在NioEventLoopGroup类中实现了,其内容很简单:
  @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

其实就是实例化一个NioEventLoop对象,然后返回它。
最后总结以下整个EventLoopGroup的初始化过程

  • EventLoopGroup(其实是MultithreadEventExecutorGroup)内部维护一个类型为EventExecutor children数组,其大小是nThreads,这样就构造成了一个线程池
  • 其实我们在实例化NioEventLoopGroup时,如果指定线程池大小,则nThreads就是指定的值,反之是处理器核心数*2
  • MultithreadEventExecutorGroup中会调用newChild抽象方法来实例化children数组
  • 抽象方法newChild是在NioEventLoopGroup中实现,它返回一个NioEventLoop实例
  • NioEventLoop属性
    • SelectorProvider provide属性:NioEventLoopGroup构造器中通过SelectorProvider.provider()获取一个SelectorProvider
    • Selector selector属性:NioEventLoop构造器中通过调用selector=provider.openSelector()获取一个selector对象
  1. channel的注册过程
    在前面的分析中,我们提到,channel会在Bootrap.initAndRegister中进行初始化,但是这个方法还会将初始化好的Channel注册到EventLoop中。接下来我们就来分析一些Channel注册的过程
    回顾一下AbstractBootstrap.initAndRegister方法:
final ChannelFuture initAndRegister() {
    // 去掉非关键代码
    final Channel channel = channelFactory().newChannel();
    init(channel);
    ChannelFuture regFuture = group().register(channel);
}

当Channel初始化后,会紧接着调用group.register()方法来注册Channel,我们继续跟踪的话,会发现其调用链如下:
AbstractBootrap.initAndRegister -> MultithreadEventLoopGroup.register ->SingleThreadEventLoop.register -> AbstractUnsafe.register
通过跟踪调用链,最终我们发现是调用到了unsafe的register方法,那么接下来我们就仔细看一下AbstractUnsafe.register方法

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    // 省略条件判断和错误处理
    AbstractChannel.this.eventLoop = eventLoop;
    register0(promise);
}

首先,将eventLoop赋值给Channel的eventLoop属性,而我们知道这个eventLoop对象其实就是MultithreadEventLoopGroup.next()方法获取的,根据我们前面关于EventLoop初始化小节中,我们可以确定next()方法返回的eventLoop对象是NioEventLoop实例,register方法接着调用register0()

private void register0(ChannelPromise promise) {
    boolean firstRegistration = neverRegistered;
    doRegister();
    neverRegistered = false;
    registered = true;
    safeSetSuccess(promise);
    pipeline.fireChannelRegistered();
    // Only fire a channelActive if the channel has never been registered. This prevents firing
    // multiple channel actives if the channel is deregistered and re-registered.
    if (firstRegistration && isActive()) {
        pipeline.fireChannelActive();
    }
}

register0又调用了AbstractNioChannel.doRegister

@Override
protected void doRegister() throws Exception {
    // 省略错误处理
    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}

javaChannel()这个方法在前面我们已经知道了,它返回的是一个 Java Nio SocketChannel,这里我们将这个SocketChannel注册到eventLoop关联的selector上了
我们总结一下Channel的注册过程:

  • 首先在AbstractBootstrap.initAndRegister中,通过group().register(channel),调用MultithreadEventLoopGroup.register()
  • 在MultithreadEventLoopGroup.register中,通过next()获取一个可用的SingleThreadEventLoop,然后调用它的register
  • 在SingleThreadEventLoop.register中,通过channel.unsafe().register(this.promise)开获取channel的unsafe()底层操作对象,然后调用它的register
  • 在AbstractUnsafe.register(),调用register0()注册Channel
  • 在AbstractUnsafe.register0中,调用AbstrackNioChannel.doRegister()
  • AbstractNioChannel.doRegister()通过javaChannel().register(eventLoop().selector,0,this)将Channel对应的Java NIO SocketChannel注册到一个eventLoop的Selector中,并且将当前Channel作为attachment.
    总的来说,Channel注册过程所做的工作就是将Channel与对应的EventLoop关联,因此这也体现了,在Netty中,每个Channel都会关联一个特定的EventLoop,并且这个Channel中的所有IO操作都是在这个EventLoop中执行的;当关联好Channel和EventLoop后,会继续调用底层的java NIO SocketChannel 的register(),将底层Java NIO SocketChannel注册到指定的selector中,通过这两部,就完成了Netty Channel的注册过程。
  1. handler的添加过程
    Netty的一个强大和灵活之处就是基于Pipeline的自定义handler机制。基于此,我们可以像添加插件一样自由组合各种各样的handler来完成业务逻辑。例如我们需要处理HTTP数据,那么就可以在pipeline前添加一个Http的编解码的Handler,然后接着添加我们自己的业务逻辑的handler,这样网络上的数据流就像通过一个管道一样,从不同的handler中流过并进行编解码,最终在到达我们自定义的handler中。
    如何将自定义的handler添加到pipeline中
...
.handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         if (sslCtx != null) {
             p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
         }
         //p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(new EchoClientHandler());
     }
 });

这段代码就是实现了handler的添加功能,我们看到,Bootstrap.handler()接受一个ChannelHandler,而我们传递的是一个派生于ChannelInitializer的匿名类,他正好也实现了ChannelHandler接口,我们来看一下,ChannelInitializer类到底有什么

@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
    protected abstract void initChannel(C ch) throws Exception;

    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        initChannel((C) ctx.channel());
        ctx.pipeline().remove(this);
        ctx.fireChannelRegistered();
    }
    ...
}

ChannelInitializer是一个抽象类,它有一个抽象的方法initChannel,我们正是实现了这个方法,并在这个方法中添加的自定义的handler,那么initChannel是那么被调用呢?在ChannelInitializer.channelRegistered()中
我们来关注一下channelRegister(),从上面的源码中,我们可以看出,在channelRegister()中,会调用initChannel(),将自定义的handler添加到ChannelPipeline中,然后调用ctx.pipeline().remove(this)将自己从ChannelPipeline中删除,上面的分析可以用如下图片展示:
一开始,ChannelPipeline中只有三个handler,head,tail和我们添加的ChannelIntializer


1.png

接着initChannel方法调用后,添加了自定义的handler


2.png

最后将ChannelInitializer删除
3.png
  1. 客户端连接分析
    通过上面的各种分析后,我们大致了解了Netty初始化,所做的工作,那么接下来我们就分析下客户端是如何发起TCP连接的.
    首先,客户端通过调用Bootstrap的connect()进行连接
    在connect中,会进行一些参数检查后,最终调用的是doConnect0(),其实现如下:
private static void doConnect0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                if (localAddress == null) {
                    channel.connect(remoteAddress, promise);
                } else {
                    channel.connect(remoteAddress, localAddress, promise);
                }
                promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

在doConnect0()中,会在event loop线程中调用Channel的connect(),而这个Channel的具体类型是什么呢?我们在Channel初始化的这一小节已经分析过了,这里channel的类型是NioSocketChannel
进行跟踪到channel.connect中,我们发现它调用的是DefaultChannelPipeline#connect,而pipeline的connect代码如下:

 @Override
    public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
        return tail.connect(remoteAddress, promise);
    }

而tail字段,我们已经分析过了,是一个TailContext的实例,而TailContext又是AbstractChannelHandlerContext 的子类,而且没有实现connect(),因此这里调用的其实是AbstractChanneHandlerContext.connect,我们看一下这个方法的实现

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

    // 删除的参数检查的代码
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new OneTimeTask() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null);
    }

    return promise;
}

上面的代码中有一个关键的地方,即final AbstractChannelHandlerContext next = findContextOutbound(),这里调用findContextOutbound(),从DefaultChannelPipeline内的双向链表的tail开始,不断向前寻找第一个outbound为true的AbstractChannelHandlerContext,然后调用它的invokeConnect(),其代码如下:

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    // 忽略 try 块
    ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
}

在之前提到,在DefaultChannelPipeline的构造器中,会实例化两个对象:head和tail,并形成了双向链表的头和尾。head是HeadContext的实例,它实现了ChannelOutboundHandler接口,并且他的outbound字段为true,因此在findContextOutbound中,找到了AbstractChannelHandlerContext对象其实就是head,进而在invokeConnect(),我们向上转换为ChannelOutboundHandler就是没问题了。
而又因为HeadContext重写了connect方法,因此实际上调用的是HeadContext.connect,我们接着跟踪HeadContext.connect,其代码如下

@Override
public void connect(
        ChannelHandlerContext cox,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

这个connect()很简单,仅仅调用了unsafe的connect(),而unsafe又是什么呢?
回顾一下HeadContext的构造器,我们发现unsafe是pipeline.channel().unsafe()返回的,而Channel的unsafe字段,在这个例子中,我们已经知道了,其实是AbstractNioByteChannel.NioByteUnsafe内部类.兜兜转转了一大圈,我们找到了创建Socket连接的关键diamante
NioByteUnsafe -> AbstractNioUnsafe.connect

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    boolean wasActive = isActive();
    if (doConnect(remoteAddress, localAddress)) {
        fulfillConnectPromise(promise, wasActive);
    } else {
        ...
    }
}

AbstractNioUnsafe.connect的实现如上代码所示,在这个connect方法中,调用了doConnect,注意,这个方法并不是AbstractNioUnsafe的方法,而是AbstractNioChannel 的抽象方法。doConnect()是在NioSocketChannel中实现的,因此进入NioSocketChannel.doConnect中:

@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
  if (localAddress != null) {
      javaChannel().socket().bind(localAddress);
  }

  boolean success = false;
  try {
      boolean connected = javaChannel().connect(remoteAddress);
      if (!connected) {
          selectionKey().interestOps(SelectionKey.OP_CONNECT);
      }
      success = true;
      return connected;
  } finally {
      if (!success) {
          doClose();
      }
  }
}

首先是获取Java Nio SocketChannel,即我们已经分析过的,从NioSocketChannel.newSocket返回的SocketChannel对象,然后调用SocketChannel.connect()完成java NIO层面的Socket的连接


4.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容