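When Netty acts as a client it is started through Bootstrap; when it acts as a server it is started through ServerBootstrap. This article assumes TCP as the transport-layer protocol.
UML diagram
As the diagram above shows, both Bootstrap and ServerBootstrap extend AbstractBootstrap.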
Bootstrap
Bootstrap is used by a client to connect to a server and obtain a channel. The code is as follows:
// sslCtx, HOST and PORT are defined elsewhere in the example.
EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
     .channel(NioSocketChannel.class)
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             if (sslCtx != null) {
                 p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
             }
             p.addLast(new DiscardClientHandler());
         }
     });
    // Make the connection attempt.
    ChannelFuture f = b.connect(HOST, PORT).sync();
    // Wait until the connection is closed.
    f.channel().closeFuture().sync();
} finally {
    group.shutdownGracefully();
}
The core of Bootstrap is the connect method, shown below:
/**
 * Connect a {@link Channel} to the remote peer.
 */
public ChannelFuture connect(SocketAddress remoteAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    validate();
    return doResolveAndConnect(remoteAddress, config.localAddress());
}
The doResolveAndConnect method is as follows:
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    // First create, initialize, and register a channel.
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    // If registration is already done, the channel has its EventLoop/executor, so call
    // doResolveAndConnect0 directly to resolve the address and connect to the server.
    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // Registration is not done yet, so add a listener to the registration future.
        // That raises a question: which future should be returned for the connect? The channel
        // may not have an executor yet, so channel.newPromise() cannot be used to create the
        // promise. This is why PendingRegistrationPromise exists. The Future/listener pattern
        // is covered separately later.
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
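The two branches above (continue immediately if registration is done, otherwise attach a listener and return a promise) can be sketched with the JDK's CompletableFuture. This is only an illustration of the pattern, not Netty code: the class RegisterThenConnectSketch, the method connectAfter, and the result strings are hypothetical names chosen for this example.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the "done? continue : add listener" pattern using only the JDK.
final class RegisterThenConnectSketch {

    /** Returns a future for the "connect" step, which starts only after "registration" finished. */
    static CompletableFuture<String> connectAfter(CompletableFuture<Void> regFuture) {
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Fast path: registration already succeeded, continue right away.
            return CompletableFuture.completedFuture("connected (fast path)");
        }
        // Slow path (or failure): return a promise now and complete it from a listener later.
        CompletableFuture<String> promise = new CompletableFuture<>();
        regFuture.whenComplete((ignored, cause) -> {
            if (cause != null) {
                promise.completeExceptionally(cause); // registration failed, fail the promise
            } else {
                promise.complete("connected (listener path)");
            }
        });
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> result = connectAfter(pending);
        pending.complete(null); // registration finishes later
        System.out.println(result.join());
    }
}
```

The caller always gets a future back immediately; whether the connect step ran inline or from the listener is invisible to it.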
So what exactly is PendingRegistrationPromise? It does just one thing: if registration succeeds, its executor is the channel's executor; if registration fails, its executor is a global one.
static final class PendingRegistrationPromise extends DefaultChannelPromise {
    // Is set to the correct EventExecutor once the registration was successful. Otherwise it will
    // stay null and so the GlobalEventExecutor.INSTANCE will be used for notifications.
    private volatile boolean registered;
    PendingRegistrationPromise(Channel channel) {
        super(channel);
    }
    void registered() {
        registered = true;
    }
    @Override
    protected EventExecutor executor() {
        // If registration succeeded, listeners are notified on the channel's EventLoop.
        if (registered) {
            // If the registration was a success executor is set.
            //
            // See https://github.com/netty/netty/issues/2586
            return super.executor();
        }
        // Otherwise fall back to the global executor thread.
        // The registration failed so we can only use the GlobalEventExecutor as last resort to notify.
        return GlobalEventExecutor.INSTANCE;
    }
}
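The executor-selection idea in PendingRegistrationPromise can be shown in isolation with plain JDK types. The sketch below is an assumption-laden simplification: FallbackExecutorSketch and its demo method are hypothetical, and the two executors merely record which one was chosen instead of being real event loops.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

// Hypothetical sketch: pick the "channel" executor once registered, else a global fallback.
final class FallbackExecutorSketch {
    private final Executor channelExecutor;
    private final Executor globalExecutor;
    private volatile boolean registered; // flipped once registration succeeded

    FallbackExecutorSketch(Executor channelExecutor, Executor globalExecutor) {
        this.channelExecutor = channelExecutor;
        this.globalExecutor = globalExecutor;
    }

    void registered() {
        registered = true;
    }

    Executor executor() {
        return registered ? channelExecutor : globalExecutor;
    }

    /** Runs one notification before and one after registration and reports which executor ran each. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Executor channelLoop = task -> { log.add("channel"); task.run(); };
        Executor global = task -> { log.add("global"); task.run(); };
        FallbackExecutorSketch promise = new FallbackExecutorSketch(channelLoop, global);
        promise.executor().execute(() -> { }); // not registered yet: global executor
        promise.registered();
        promise.executor().execute(() -> { }); // registered: channel executor
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```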
The initAndRegister() method lives in AbstractBootstrap:
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        // First create a channel.
        channel = channelFactory.newChannel();
        // Initialize it. init() is implemented in Bootstrap and sets the channel's options and attributes.
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            // channel can be null if newChannel crashed (eg SocketException("too many open files"))
            channel.unsafe().closeForcibly();
        }
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    // Register the channel.
    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    // If we are here and the promise is not failed, it's one of the following cases:
    // 1) If we attempted registration from the event loop, the registration has been completed at this point.
    //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
    // 2) If we attempted registration from the other thread, the registration request has been successfully
    //    added to the event loop's task queue for later execution.
    //    i.e. It's safe to attempt bind() or connect() now:
    //         because bind() or connect() will be executed *after* the scheduled registration task is executed
    //         because register(), bind(), and connect() are all bound to the same thread.
    return regFuture;
}
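The closing comment in initAndRegister() relies on a simple property: tasks submitted to one single-threaded executor run in submission order, so a connect queued after a register cannot overtake it. A minimal JDK-only sketch of that property follows; SingleThreadOrderingSketch is a hypothetical name, and a JDK single-thread executor stands in for an EventLoop.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a single-threaded executor runs queued tasks in FIFO order.
final class SingleThreadOrderingSketch {

    static List<String> run() {
        List<String> order = new CopyOnWriteArrayList<>();
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(); // stand-in for an EventLoop
        try {
            eventLoop.execute(() -> order.add("register")); // queued first
            eventLoop.execute(() -> order.add("connect"));  // queued second, runs second
        } finally {
            eventLoop.shutdown(); // already queued tasks still run
        }
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```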
The doResolveAndConnect0() method resolves the remote address and initiates the connection:
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress,
                                           final SocketAddress localAddress, final ChannelPromise promise) {
    try {
        final EventLoop eventLoop = channel.eventLoop();
        final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
        // If the address cannot be resolved by this resolver, or is already resolved, connect directly.
        if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
            // Resolver has no idea about what to do with the specified remote address or it's resolved already.
            doConnect(remoteAddress, localAddress, promise);
            return promise;
        }
        // The address is not resolved yet, so resolve it now.
        final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
        // If resolution has already completed, connect immediately.
        if (resolveFuture.isDone()) {
            final Throwable resolveFailureCause = resolveFuture.cause();
            if (resolveFailureCause != null) {
                // Failed to resolve immediately
                channel.close();
                promise.setFailure(resolveFailureCause);
            } else {
                // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                doConnect(resolveFuture.getNow(), localAddress, promise);
            }
            return promise;
        }
        // Wait until the name resolution is finished.
        // Resolution has not completed yet, so connect from within a listener.
        resolveFuture.addListener(new FutureListener<SocketAddress>() {
            @Override
            public void operationComplete(Future<SocketAddress> future) throws Exception {
                if (future.cause() != null) {
                    channel.close();
                    promise.setFailure(future.cause());
                } else {
                    doConnect(future.getNow(), localAddress, promise);
                }
            }
        });
    } catch (Throwable cause) {
        promise.tryFailure(cause);
    }
    return promise;
}
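What "already resolved" means for a socket address can be illustrated with the JDK's InetSocketAddress, which carries an unresolved flag. The sketch below is not Netty's resolver; ResolveCheckSketch and resolveIfNeeded are hypothetical names, and the lookup here is the JDK's blocking one rather than an asynchronous resolve.

```java
import java.net.InetSocketAddress;

// Hypothetical sketch: skip resolution when an address is already resolved.
final class ResolveCheckSketch {

    /** Returns the address unchanged if resolved, otherwise attempts a (blocking) lookup. */
    static InetSocketAddress resolveIfNeeded(InetSocketAddress address) {
        if (!address.isUnresolved()) {
            return address; // nothing to do, connect can use it directly
        }
        // The constructor attempts resolution; on failure the result is still flagged unresolved.
        return new InetSocketAddress(address.getHostString(), address.getPort());
    }

    public static void main(String[] args) {
        // createUnresolved never performs a lookup, even for an IP literal.
        InetSocketAddress unresolved = InetSocketAddress.createUnresolved("127.0.0.1", 8080);
        InetSocketAddress resolved = resolveIfNeeded(unresolved);
        System.out.println(unresolved.isUnresolved() + " -> " + resolved.isUnresolved());
    }
}
```

Because the lookup can fail, a caller should check isUnresolved() on the result before connecting, which mirrors the failure branch in the listing above.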
The doConnect method connects to the remote server by calling the channel's connect method; channel.connect() is in turn responsible for invoking Java's underlying connect method.
private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}
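To summarize, the methods in Bootstrap fall into two categories: setting properties and connecting to a remote server. Connecting ultimately delegates to the channel's connect, so the connection itself is really performed inside the channel. The connection process in Bootstrap is as follows:
1: Create a channel
2: Initialize the channel
3: Register the channel with an EventLoop
4: Resolve the remote host name, which may not succeed
5: Call channel.connect to connect to the remote server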
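For orientation, the five steps above can be mapped onto plain JDK NIO calls. This is a rough sketch under stated assumptions, not Netty's implementation: NioConnectStepsSketch is a hypothetical class, a Selector stands in for the EventLoop, and a throwaway server on the loopback interface serves as the remote peer.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical sketch: the five connection steps expressed with plain JDK NIO.
final class NioConnectStepsSketch {

    static boolean connectToLocalServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {              // 1: create a channel
            InetAddress loopback = InetAddress.getLoopbackAddress();
            server.bind(new InetSocketAddress(loopback, 0));              // throwaway local peer
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();

            channel.configureBlocking(false);                             // 2: initialize it
            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);

            SelectionKey key = channel.register(selector, 0);             // 3: register it

            InetSocketAddress remote = new InetSocketAddress(loopback, port); // 4: resolve
            if (remote.isUnresolved()) {
                return false; // resolution may fail; nothing to connect to
            }

            boolean connected = channel.connect(remote);                  // 5: connect
            key.interestOps(SelectionKey.OP_CONNECT);
            long deadline = System.nanoTime() + 5_000_000_000L;
            while (!connected && System.nanoTime() < deadline) {
                selector.select(500);
                if (key.isConnectable()) {
                    connected = channel.finishConnect(); // completes a non-blocking connect
                }
                selector.selectedKeys().clear();
            }
            return connected && channel.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectToLocalServer());
    }
}
```

Bootstrap wraps the same sequence in futures and event-loop tasks, so no caller thread has to spin on a selector as this sketch does.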