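In the earlier walkthrough of the server channel startup, we saw that doBind() ends up calling the channel's bind() method to bind the channel. This section looks at that method in detail.
Its implementation is: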
AbstractChannel
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}
It simply delegates to the pipeline's bind():
DefaultChannelPipeline
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
Because bind is an outbound event, it starts at the tail of the pipeline (the tail context):
AbstractChannelHandlerContext
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}
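The inEventLoop() branch in bind() follows a common pattern: run the work inline if the caller is already on the event loop thread, otherwise hand it to that thread as a task. The sketch below illustrates the pattern with plain JDK classes only. MiniEventLoop, runOnLoop() and the event names are hypothetical and are not Netty APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class InEventLoopSketch {

    // Hypothetical stand-in for an event loop: a single named worker thread.
    static final class MiniEventLoop {
        private volatile Thread loopThread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-event-loop");
            loopThread = t; // remember which thread is "the loop"
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == loopThread;
        }

        // Same shape as the branch in bind(): run inline or hand off as a task.
        void runOnLoop(Runnable task) {
            if (inEventLoop()) {
                task.run();
            } else {
                executor.execute(task);
            }
        }

        void shutdownAndWait() {
            executor.shutdown();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Records the order in which the outer task and a nested task run.
    static List<String> demo() {
        MiniEventLoop loop = new MiniEventLoop();
        List<String> events = new ArrayList<>();
        // Called from the main thread, so this task is queued on the loop.
        loop.runOnLoop(() -> {
            events.add("outer-start");
            // Already on the loop thread here, so the nested task runs inline.
            loop.runOnLoop(() -> events.add("nested"));
            events.add("outer-end");
        });
        loop.shutdownAndWait();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The nested task is recorded between "outer-start" and "outer-end" because it ran inline. If it had been queued instead, it would have been recorded after "outer-end".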
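findContextOutbound() walks backwards from the current context to the nearest context that handles outbound events. Unless a handler along the way intercepts the call, this eventually reaches the head context, whose invokeBind() is then executed: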
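The backward walk from tail to head can be sketched with a toy linked list. This is a simplified model under stated assumptions, not Netty's implementation: the Ctx class, the handler names and the boolean outbound flag are all hypothetical, chosen only to show how inbound-only contexts are skipped.

```java
import java.util.ArrayList;
import java.util.List;

public class OutboundTraversalSketch {

    // Hypothetical, simplified context node (not Netty's class).
    static final class Ctx {
        final String name;
        final boolean outbound; // true if this context handles outbound events
        Ctx prev;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    // Walk towards the head until an outbound-capable context is found.
    static Ctx findContextOutbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    // Builds head <- logging <- acceptor <- tail and returns the contexts
    // that an outbound event started at the tail would visit, in order.
    static List<String> bindPath() {
        Ctx head = new Ctx("head", true);           // head handles outbound events
        Ctx logging = new Ctx("logging", true);     // assumed duplex user handler
        Ctx acceptor = new Ctx("acceptor", false);  // assumed inbound-only handler
        Ctx tail = new Ctx("tail", false);          // tail is inbound-only
        logging.prev = head;
        acceptor.prev = logging;
        tail.prev = acceptor;

        List<String> visited = new ArrayList<>();
        Ctx ctx = tail;
        while (ctx != head) {
            ctx = findContextOutbound(ctx);
            visited.add(ctx.name);
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(bindPath()); // prints [logging, head]
    }
}
```

The walk stops at head because head is outbound-capable and is where the operation is handed to the transport.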
AbstractChannelHandlerContext
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}
DefaultChannelPipeline.HeadContext
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}
This finally reaches the unsafe's bind():
AbstractUnsafe
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }
    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }
    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
    safeSetSuccess(promise);
}
This calls doBind(localAddress), which is implemented as follows:
NioServerSocketChannel
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
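So in the end the server port is bound through the JDK channel's bind method (or, on Java versions before 7, the bind method of its underlying socket).
Before calling doBind(), unsafe.bind() saves the channel's current bound state in wasActive by calling isActive():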
NioServerSocketChannel
public boolean isActive() {
    return javaChannel().socket().isBound();
}
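The before/after comparison of the bound state can be reproduced with the JDK alone. The following is a minimal sketch, assuming a loopback address, an ephemeral port (0) and an arbitrary backlog of 128; none of these values come from the code above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class BindStateSketch {

    // Returns {bound state before bind, bound state after bind}.
    static boolean[] bindAndObserve() {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            // Same check as isActive() above, taken before binding.
            boolean wasActive = ch.socket().isBound();

            // Port 0 asks the OS for any free port; 128 is an assumed backlog.
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);

            boolean isActive = ch.socket().isBound();
            return new boolean[] {wasActive, isActive};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] state = bindAndObserve();
        System.out.println("wasActive=" + state[0] + ", isActive=" + state[1]);
    }
}
```

Only the transition from unbound to bound makes the condition `!wasActive && isActive()` true, which is why channelActive fires once for a successful bind.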
After doBind() returns, the following check runs:
if (!wasActive && isActive()) {
    invokeLater(new Runnable() {
        @Override
        public void run() {
            pipeline.fireChannelActive();
        }
    });
}
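The channel is now bound, so a channelActive event is fired. It propagates through the pipeline the same way channelRegistered does, so we won't trace it step by step here. pipeline.fireChannelActive() ends up calling the head context's channelActive() method: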
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
    readIfIsAutoRead();
}
The channelActive event is first propagated down the pipeline. Then, if the channel is in auto-read mode (which is the default), readIfIsAutoRead() calls the channel's read():
public Channel read() {
    pipeline.read();
    return this;
}
This runs the pipeline's read operation. Because read is an outbound event, it travels from the tail context backwards through the outbound handlers until it reaches the head context. Here is HeadContext's read():
public void read(ChannelHandlerContext ctx) {
    unsafe.beginRead();
}
This calls the unsafe's beginRead(). Following it further, we eventually arrive at AbstractNioChannel's doBeginRead():
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;
    if (!selectionKey.isValid()) {
        return;
    }
    readPending = true;
    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}
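This is where OP_ACCEPT is actually added to the interest set of the channel's selection key on the selector (for NioServerSocketChannel, readInterestOp is OP_ACCEPT). From this point on, the server channel can accept client connections.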
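The same two-step idea, registering first with an empty interest set and enabling OP_ACCEPT later, can be tried with plain JDK NIO. This is a minimal sketch, not Netty code: the class and method names are hypothetical, the loopback address and ephemeral port are assumptions, and the initial interest set of 0 is chosen to mirror how the channel is expected to have been registered before bind.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class InterestOpsSketch {

    // Returns {interest ops right after register, interest ops after enabling accept}.
    static int[] registerThenEnableAccept() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false); // required before registering with a selector

            // Step 1: register with an empty interest set (no events requested yet).
            SelectionKey key = ch.register(selector, 0);
            int before = key.interestOps();

            // Bind to an assumed loopback address and an OS-chosen port.
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // Step 2: same bit manipulation as doBeginRead() above.
            int readInterestOp = SelectionKey.OP_ACCEPT;
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenEnableAccept();
        System.out.println("before=" + ops[0] + ", after=" + ops[1]);
    }
}
```

OR-ing the flag into the existing value, rather than overwriting it, keeps any other interest bits that were already set on the key.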