- 简述
这一章是netty源码分析系列的第一章,在这一章中只展示Netty的客户端和服务端的初始化和启动的过程,给读者一个对Netty源码有一个大致的框架上的认识,而不会深入每个功能模块。
本章会从Bootstrap/ServerBootstrap类入手,分析Netty程序的初始化和启动的流程。 - Bootstrap
Bootstrap 是netty提供的一个便利的工厂类,我们可以通过他来完成Netty的客户端或服务端的Netty初始化。
下面以Netty源码例子中的Echo服务器作为例子,从客户端和服务端分别分析一下Netty的程序是如何启动的。 - 客户端部分
- 连接源码
EchoClient.java
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
上面的客户端代码虽然简单,但是却展示了Netty客户端初始化时所需的所有内容
(1) EventLoopGroup:不论是服务器端还是客户端,都必须指定EventLoopGroup,在这个例子中,指定了NioEventLoopGroup,表示一个Nio的EventLoopGroup.
(2)ChannelType:指定Channel的类型。因为是客户端,因此使用了NioSocektChannel
(3) Handler:设置数据的处理器。
-
NioSocketChannel的初始化过程
在Netty中,Channel是一个Socket的抽象,它为用户提供了关于Socket状态(是否是连接还是断开)以及对Socket的读写等操作。每当Netty建立了一个连接后,都会有一个对应的Channel实例。
NioSocketChannel的类层次结构如下
下面着重分析一个Channel的初始化过程
- ChannelFactory 和Channel类型的确定
除了TCP协议以外,Netty还支持很多其他的连接协议,并且每种协议还有NIO(异步IO)和OIO(Old-IO,即传统的阻塞IO)版本的区别,不同协议不同的阻塞类型的连接都有不同的Channel类型与之对应,下面是写常用的Channel类型:
- NioSocketChannel,代表异步的客户端TCP Socket连接
- NioServerSocketChannel,异步的服务器端TCP Socket连接
- NioDatagramChannel, 异步的 UDP 连接
- NioSctpChannel, 异步的客户端 Sctp 连接.
- NioSctpServerChannel, 异步的 Sctp 服务器端连接.
- OioSocketChannel, 同步的客户端 TCP Socket 连接.
- OioServerSocketChannel, 同步的服务器端 TCP Socket 连接.
- OioDatagramChannel, 同步的 UDP 连接
- OioSctpChannel, 同步的 Sctp 服务器端连接.
- OioSctpServerChannel, 同步的客户端 TCP Socket 连接.
那么我们是如何设置所需要的Channel的类型的呢?答案时channel方法的调用。
回想一下我们在客户端连接代码的初始化Bootstrap中,会调用channel()方法,传入NioSocketChannel.class,这个方法其实就是初始化了一个ReflectiveChannelFactory:
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
而ReflectiveChannelFactory 实现了ChannelFactory接口,它提供了唯一的方法,即newChannel。ChannelFactory,顾名思义,就是产生Channel的工厂类。
进入到ReflectiveChannelFactory.newChannel中,我们看到其实现代码如下
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
根据上面代码的提示,我们就可以确定:
- Bootstrap中的ChannelFactory的实现是ReflectiveChannelFactory
- 生成的Channel的具体类型是NioSocketChannel
Channel的实例化过程,其实就是调用ChannelFactory#newChanne方法,而实例化的Channel的具体的类型又是和初始化Bootstrap时传入的channel()方法的参数相关。因为对于我们这个例子中的客户端的Bootstrap而言,生成的Channel实例就是NioSocketChannel
- Channel实例化
前面我们已经知道了如何确定一个Channel的类型,并且了解到Channel是通过工厂方法ChannelFacotry.newChannel()来实例化的,那么ChannelFactory.newChannel()方法在哪儿调用?
Bootstrap.connect -> Bootstrap.doResolveAndConnect ->AbstractBootrap.initAndRegister
在AbstractBootstrap.initAndRegister中就调用了channelFactory.newChannel()来获取一个新的NioSocketChannel实例,其源码如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
在newChannel中,通过类对象newInstance来获取一个新的Channel实例,因而会调用NioSocketChannel的默认构造器NioSocketChannel 默认构造器代码如下
public NioSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
这里的代码比较关键,我们看到,在这个构造器中,会调用newSocket来打开一个新的 java NIO socketChannel:
private static SocketChannel newSocket(SelectorProvider provider) {
...
return provider.openSocketChannel();
}
接着会调用父类,即AbstractNioByteChannel的构造器
AbstractNioByteChannel(Channel parent, SelectableChannel ch)
并传入参数parent为null,ch为刚才使用newSocket创建的Java Nio SocketChannel,因此生成的NioSocketChannel的parent channel为Null
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
接着会继续调用父类的AbstractNioChannel的构造器,并传入了参数readInterestOp = SelectionKey.OP_READ:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
// 省略 try 块
// 配置 Java NIO SocketChannel 为非阻塞的.
ch.configureBlocking(false);
}
然后继续调用父类AbstractChannel的构造器
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
到这里,一个完整的NioSocketChannel就初始化完成了,我们可以稍微总结一下构造一个NioSocketChannel所需要做的工作:
- 调用NioSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER)打开一个新的java NIO SocketChannel
- AbstractChannel(Channel parent)中初始化一个AbstractChannel的属性:
- parent属性置为null
- unsafe通过newUnsafe()实例化一个unsafe对象,它的类型 是AbstractNioByteChannel.NioByteUnsafe 内部类
- pipeline 是 new DefaultChannelPipeline(this) 新创建的实例. 这里体现了:Each channel has its own pipeline and it is created automatically when a new channel is created.
- AbstractNioChanne中的属性:
- SelectableChannel ch 被设置为 Java SocketChannel, 即 NioSocketChannel#newSocket 返回的 Java NIO SocketChannel.
- readInterestOp被设置为SelectionKey.OP_READ
- SlectableChannel ch被配置为非阻塞的ch.configureBlocking(false)
- NioSocketChannel中的属性:
- SocketChannelConfig config = new NioSocketChannelConfig(this, socket.socket())
- 关于unsafe 字段的初始化
我们简单地提到,在实例化NioSocketChannel的过程中,会在父类AbstractChannel的构造器中,调用newUnsafe()来获取一个unsafe实例。那么unsafe是怎么初始化的呢?它的作用是什么?
其实unsafe特别关键,它封装了对java底层Socket的操作,因此实际上是连通Netty上层和java底层的重要桥梁。
Unsafe接口所提供的方法
interface Unsafe {
SocketAddress localAddress();
SocketAddress remoteAddress();
void register(EventLoop eventLoop, ChannelPromise promise);
void bind(SocketAddress localAddress, ChannelPromise promise);
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
void disconnect(ChannelPromise promise);
void close(ChannelPromise promise);
void closeForcibly();
void deregister(ChannelPromise promise);
void beginRead();
void write(Object msg, ChannelPromise promise);
void flush();
ChannelPromise voidPromise();
ChannelOutboundBuffer outboundBuffer();
}
一看便知,这些方法其实都会对应到相关的java底层的Socket的操作。
回到AbstractChannel的构造方法中,在这里调用了newUnsafe()获取一个新的unsafe对象,而newUnsafe方法在NioSocketChannel中被重写了
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
NioSocketChannel.newUnsafe方法会返回一个NioSocketChannelUnsafe实例,从这里我们就可以确定了,在实例化的NioSocketChannel中的unsafe字段,其实就是一个NioSocketChannelUnsafe的实例。
- 关于pipeline的初始化
上面我们分析了一个Channel(在这个例子中时NioSocketChannel)的大体初始化过程,但是我们漏掉了一个关键的部分,即ChannnelPipeline的初始化。
根据Each channel has its own pipeline and it is created automatically when a new channel is created。我们知道,在实例化一个Channel时,必然伴随着实例化的一个ChannelPipeline.而我们确实在AbstractChannel的构造器看到了pipeline字段被初始化为DefaultChannelPipeline的实例,那么看下DefaultChannelPipeline构造器做了那些工作吧:
public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
我们调用DefaultChannelPipeline的构造器,传入了一个channel,而这个channel其实就是我们实例化的NioSocketChannel,DefaultChannelPipeline会将这个NioSocketChannel对象保存在channel字段中。DefaultChannelPipeline中,还有两个特殊的字段,即head和tail,而这两个字段是一个双向链表的头和尾,其实在DefaultChannelPipeline中,维护了一个以AbstractChannelHandlerContext为节点的双向链表,这个链表是Netty实现pipeline机制的关键。关于DefaultChannelPipeline中的双向链表以及它所起的作用,下面介绍
HeadContext的继承层次结果如下所示:
TailContext 的继承层次结构如下所示:
我们可以看到,链表中head是一个ChannelOutboundHandler ,而tail则是一个ChannelInboundHandler,接着我们看一下headContext的构造器
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
}
它调用了父类AbstractChannelHandlerContext的构造器,并传入参数inbound=false,outbound=true
TailContext的构造器与HeadContext的相反,它调用了父类AbstractChannelHandlerContext的构造器,并传入参数inbound=true,outbound=false
即header是一个outboundHandler,而tail是一个inboundHandler
-
关于EventLoop初始化
回到最开始的EchoClient.java代码中,我们在一开始实例化了一个NioEventLoopGroup对象,因此我们就从它的构造器中追踪一下EventLoop的初始化过程。
首先来看一下NioEventLoopGroup的类继承层次
NioEventLoop有几个重载的构造器,不过内容都没什么大的区别,最终都是调用父类MultithreadEventLoopGroup构造器
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
其中一点是,如果我们传入的线程数nThreads是0,那么Netty会为我们设置默认的线程数DEFAULT_EVENT_LOOP_THREADS,而是这个默认的线程数是怎么确定的呢?
其实很简答,在静态代码块中,会首先确定DEFAULT_EVENT_LOOP_THREADS的值
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
}
Netty会首先从系统属性中回去"io.netty.eventLoopThreads"的值,如果我们没有设置它的话,那么就返回默认值:处理器核心数*2.
回到MultithreadEventLoopGroup构造器中,这个构造器会继续调用父类MultithreadEventExecutorGroup的构造器:
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
// 去掉了参数检查, 异常处理 等代码.
children = new EventExecutor[nThreads];
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(threadFactory, args);
}
}
根据代码,我们就很清楚MultithreadEventExecutorGroup中的处理逻辑了:
- 创建一个大小为nThreads的EventExcutor数组
- 调用newChild方法初始化children数组
根据上面的diamante,我们知道,MultithreadEventLoopExecutorGroup内部维护了一个EventExecutor数据,Netty的EventLoopGroup的实现机制其实就建立在MultithreadEventExecutorGroup之上,每当Netty需要EventLoop一个时,会调用netx方法获取一个可用的EventLoop.
上面代码的最后一部分是newChild方法,这个是一个抽象方法,它的任务是实例化EventLoop对象,我们根据以下它的代码,可以发现这个方法在NioEventLoopGroup类中实现了,其内容很简单:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
其实就是实例化一个NioEventLoop对象,然后返回它。
最后总结以下整个EventLoopGroup的初始化过程
- EventLoopGroup(其实是MultithreadEventExecutorGroup)内部维护一个类型为EventExecutor children数组,其大小是nThreads,这样就构造成了一个线程池
- 其实我们在实例化NioEventLoopGroup时,如果指定线程池大小,则nThreads就是指定的值,反之是处理器核心数*2
- MultithreadEventExecutorGroup中会调用newChild抽象方法来实例化children数组
- 抽象方法newChild是在NioEventLoopGroup中实现,它返回一个NioEventLoop实例
- NioEventLoop属性
- SelectorProvider provide属性:NioEventLoopGroup构造器中通过SelectorProvider.provider()获取一个SelectorProvider
- Selector selector属性:NioEventLoop构造器中通过调用selector=provider.openSelector()获取一个selector对象
- channel的注册过程
在前面的分析中,我们提到,channel会在Bootrap.initAndRegister中进行初始化,但是这个方法还会将初始化好的Channel注册到EventLoop中。接下来我们就来分析一些Channel注册的过程
回顾一下AbstractBootstrap.initAndRegister方法:
final ChannelFuture initAndRegister() {
// 去掉非关键代码
final Channel channel = channelFactory().newChannel();
init(channel);
ChannelFuture regFuture = group().register(channel);
}
当Channel初始化后,会紧接着调用group.register()方法来注册Channel,我们继续跟踪的话,会发现其调用链如下:
AbstractBootrap.initAndRegister -> MultithreadEventLoopGroup.register ->SingleThreadEventLoop.register -> AbstractUnsafe.register
通过跟踪调用链,最终我们发现是调用到了unsafe的register方法,那么接下来我们就仔细看一下AbstractUnsafe.register方法
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 省略条件判断和错误处理
AbstractChannel.this.eventLoop = eventLoop;
register0(promise);
}
首先,将eventLoop赋值给Channel的eventLoop属性,而我们知道这个eventLoop对象其实就是MultithreadEventLoopGroup.next()方法获取的,根据我们前面关于EventLoop初始化小节中,我们可以确定next()方法返回的eventLoop对象是NioEventLoop实例,register方法接着调用register0()
private void register0(ChannelPromise promise) {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
}
register0又调用了AbstractNioChannel.doRegister
@Override
protected void doRegister() throws Exception {
// 省略错误处理
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
}
javaChannel()这个方法在前面我们已经知道了,它返回的是一个 Java Nio SocketChannel,这里我们将这个SocketChannel注册到eventLoop关联的selector上了
我们总结一下Channel的注册过程:
- 首先在AbstractBootstrap.initAndRegister中,通过group().register(channel),调用MultithreadEventLoopGroup.register()
- 在MultithreadEventLoopGroup.register中,通过next()获取一个可用的SingleThreadEventLoop,然后调用它的register
- 在SingleThreadEventLoop.register中,通过channel.unsafe().register(this.promise)开获取channel的unsafe()底层操作对象,然后调用它的register
- 在AbstractUnsafe.register(),调用register0()注册Channel
- 在AbstractUnsafe.register0中,调用AbstrackNioChannel.doRegister()
- AbstractNioChannel.doRegister()通过javaChannel().register(eventLoop().selector,0,this)将Channel对应的Java NIO SocketChannel注册到一个eventLoop的Selector中,并且将当前Channel作为attachment.
总的来说,Channel注册过程所做的工作就是将Channel与对应的EventLoop关联,因此这也体现了,在Netty中,每个Channel都会关联一个特定的EventLoop,并且这个Channel中的所有IO操作都是在这个EventLoop中执行的;当关联好Channel和EventLoop后,会继续调用底层的java NIO SocketChannel 的register(),将底层Java NIO SocketChannel注册到指定的selector中,通过这两部,就完成了Netty Channel的注册过程。
- handler的添加过程
Netty的一个强大和灵活之处就是基于Pipeline的自定义handler机制。基于此,我们可以像添加插件一样自由组合各种各样的handler来完成业务逻辑。例如我们需要处理HTTP数据,那么就可以在pipeline前添加一个Http的编解码的Handler,然后接着添加我们自己的业务逻辑的handler,这样网络上的数据流就像通过一个管道一样,从不同的handler中流过并进行编解码,最终在到达我们自定义的handler中。
如何将自定义的handler添加到pipeline中
...
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
这段代码就是实现了handler的添加功能,我们看到,Bootstrap.handler()接受一个ChannelHandler,而我们传递的是一个派生于ChannelInitializer的匿名类,他正好也实现了ChannelHandler接口,我们来看一下,ChannelInitializer类到底有什么
@Sharable
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelInitializer.class);
protected abstract void initChannel(C ch) throws Exception;
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
ctx.fireChannelRegistered();
}
...
}
ChannelInitializer是一个抽象类,它有一个抽象的方法initChannel,我们正是实现了这个方法,并在这个方法中添加的自定义的handler,那么initChannel是那么被调用呢?在ChannelInitializer.channelRegistered()中
我们来关注一下channelRegister(),从上面的源码中,我们可以看出,在channelRegister()中,会调用initChannel(),将自定义的handler添加到ChannelPipeline中,然后调用ctx.pipeline().remove(this)将自己从ChannelPipeline中删除,上面的分析可以用如下图片展示:
一开始,ChannelPipeline中只有三个handler,head,tail和我们添加的ChannelIntializer
接着initChannel方法调用后,添加了自定义的handler
最后将ChannelInitializer删除
- 客户端连接分析
通过上面的各种分析后,我们大致了解了Netty初始化,所做的工作,那么接下来我们就分析下客户端是如何发起TCP连接的.
首先,客户端通过调用Bootstrap的connect()进行连接
在connect中,会进行一些参数检查后,最终调用的是doConnect0(),其实现如下:
private static void doConnect0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
if (localAddress == null) {
channel.connect(remoteAddress, promise);
} else {
channel.connect(remoteAddress, localAddress, promise);
}
promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
在doConnect0()中,会在event loop线程中调用Channel的connect(),而这个Channel的具体类型是什么呢?我们在Channel初始化的这一小节已经分析过了,这里channel的类型是NioSocketChannel
进行跟踪到channel.connect中,我们发现它调用的是DefaultChannelPipeline#connect,而pipeline的connect代码如下:
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
而tail字段,我们已经分析过了,是一个TailContext的实例,而TailContext又是AbstractChannelHandlerContext 的子类,而且没有实现connect(),因此这里调用的其实是AbstractChanneHandlerContext.connect,我们看一下这个方法的实现
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// 删除的参数检查的代码
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new OneTimeTask() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
上面的代码中有一个关键的地方,即final AbstractChannelHandlerContext next = findContextOutbound(),这里调用findContextOutbound(),从DefaultChannelPipeline内的双向链表的tail开始,不断向前寻找第一个outbound为true的AbstractChannelHandlerContext,然后调用它的invokeConnect(),其代码如下:
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
// 忽略 try 块
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
}
在之前提到,在DefaultChannelPipeline的构造器中,会实例化两个对象:head和tail,并形成了双向链表的头和尾。head是HeadContext的实例,它实现了ChannelOutboundHandler接口,并且他的outbound字段为true,因此在findContextOutbound中,找到了AbstractChannelHandlerContext对象其实就是head,进而在invokeConnect(),我们向上转换为ChannelOutboundHandler就是没问题了。
而又因为HeadContext重写了connect方法,因此实际上调用的是HeadContext.connect,我们接着跟踪HeadContext.connect,其代码如下
@Override
public void connect(
ChannelHandlerContext cox,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
这个connect()很简单,仅仅调用了unsafe的connect(),而unsafe又是什么呢?
回顾一下HeadContext的构造器,我们发现unsafe是pipeline.channel().unsafe()返回的,而Channel的unsafe字段,在这个例子中,我们已经知道了,其实是AbstractNioByteChannel.NioByteUnsafe内部类.兜兜转转了一大圈,我们找到了创建Socket连接的关键diamante
NioByteUnsafe -> AbstractNioUnsafe.connect
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
boolean wasActive = isActive();
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
...
}
}
AbstractNioUnsafe.connect的实现如上代码所示,在这个connect方法中,调用了doConnect,注意,这个方法并不是AbstractNioUnsafe的方法,而是AbstractNioChannel 的抽象方法。doConnect()是在NioSocketChannel中实现的,因此进入NioSocketChannel.doConnect中:
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
javaChannel().socket().bind(localAddress);
}
boolean success = false;
try {
boolean connected = javaChannel().connect(remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
首先是获取Java Nio SocketChannel,即我们已经分析过的,从NioSocketChannel.newSocket返回的SocketChannel对象,然后调用SocketChannel.connect()完成java NIO层面的Socket的连接