前面分析过bind的流程,里面创建了channel。
channelFactory.newChannel()
这个channelFactory类型由之前的文章分析可知,是ReflectiveChannelFactory。
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
ReflectiveChannelFactory类比较简单,这里通过反射生成了NioServerChannel。如下是该类的层次结构:
NioServerSocketChannel.jpg
那么来看看这个类的实例化:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
这里使用默认的SelectorProvider.provider()调用newSocket来创建ServerSocketChannel对象,这个对象是java nio提供的ServerSocketChannelImpl。接着调用了父类的构造函数:
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
//设置创建的ServerSocketChannel
this.ch = ch;
//这里表示设置所感兴趣的事件
this.readInterestOp = readInterestOp;
try {
//非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
AbstractChannel做了以下事情:
- 设置channel的id,初始化unsafe和pipeline
- 设置ServerSocketChannel和this.readInterestOp
- 将ServerSocketChannel设置为非阻塞,注意这个类是nio提供的
接着看看unsafe,它创建了一个NioMessageUnsafe,它是AbstractNioMessageChannel的内部类。
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
for (;;) {
Object msg = in.current();
if (msg == null) {
// Wrote all messages.
if ((interestOps & SelectionKey.OP_WRITE) != 0) {
key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
}
break;
}
try {
boolean done = false;
for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
if (doWriteMessage(msg, in)) {
done = true;
break;
}
}
if (done) {
in.remove();
} else {
// Did not write all messages.
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
break;
}
} catch (IOException e) {
if (continueOnWriteError()) {
in.remove(e);
} else {
throw e;
}
}
}
这里其实在前面的文章中也分析到了,就是最后读写的地方。读和写都会触发pipeline相应操作,这个在下一篇专门描述。new PipeLine()也在下篇分析。
接着返回到上面的NioServerSocketChannel,调用完父类的构造方法以后,创建NioServerSocketChannelConfig类
private final class NioServerSocketChannelConfig extends DefaultServerSocketChannelConfig {
private NioServerSocketChannelConfig(NioServerSocketChannel channel, ServerSocket javaSocket) {
super(channel, javaSocket);
}
@Override
protected void autoReadCleared() {
clearReadPending();
}
}
public DefaultServerSocketChannelConfig(ServerSocketChannel channel, ServerSocket javaSocket) {
super(channel);
if (javaSocket == null) {
throw new NullPointerException("javaSocket");
}
this.javaSocket = javaSocket;
}
这个类是NioServerSocketChannel的内部类,也就是配置了之前创建的ServerSocketChannel的一些信息。
最后总结下:
- NioServerSocketChannel通过bind方法中的initAndRegister进行实例化。
- 创建了NIO提供的ServerSocketChannel。
- 绑定了unsafe对象,这个对象底层操作读写。
- 绑定了pipeline,这是个双向链表,里面包含了handler。
- 实例化完后会把NioServerSocketChannel注册到eventLoop上。