Channel介绍
Channel是JDK 的NIO类库中的重要组成部分,我们在之前的代码中也经常用到io.netty.channel.socket.nio.NioSocketChannel和io.netty.channel.socket.nio.NioServerSocketChannel用于阻塞性IO操作
NioServerSocketChannel类的继承图
从上可以看出,Channel是最顶层的抽象,一个Channel抽象为网络socket的连接或者读写,连接,绑定IO的一个组件。
一个Channel需要提供给用户:
1、channel当前的状态(打开还是已连接?)
2、channel的配置参数(channelConfig)
3、channel支持哪些IO操作
4、处理channel IO操作的事件
在Netty中所有的IO操作都是异步的,也就意味着IO的请求会立刻返回,并不能保证请求是否已完成。在Netty中IO请求会返回ChannelFuture实例。
Channel是有层级的,例如,对于服务端而言,父channel为空,对于客户端NioSocketChannel,它的父Channel就是创建它的ServerSocketChannel.
Channel主要API方法
下面对我们对Channel的主要API做简单介绍
Channel read() 从channel中读取数据到第一个buffer,如果数据被成功读取触发channelRead事件,如果数据被读取完成会触发channelReadComplete事件,如果有读操作被挂起,那么后续读操作会取消。
ChannelFuture write(Object msg) 将当前msg通过ChannelPipeLine写入到Channel,这个方法不会真正执行flush操作,所以当写入完成后要执行flush()方法才能将数据真正发送出去。
write(Object msg, ChannelPromise promise) 跟write功能一样,只是写入完成后会回调promise。
Channel flush() 将channel中所有缓存消息全部写入到目标channel,发送给通信对方。
ChannelFuture writeAndFlush(Object msg) 作用等于write+flush
ChannelFuture disconnect() 请求与远程通信对端断开连接,这个操作会级联触发ChannelHandler.disconnect(ChannelHandlerContext, ChannelPromise)事件
ChannelFuture connect(SocketAddress remoteAddress) 使用指定的remoteAddress发起连接请求,并且返回ChannelFuture对象,如果连接超时会ChannelFuture返回操作结果是ConnectTimeoutException,如果连接被拒绝操作结果是ConnectException,该操作会触发ChannelHandler#connect(ChannelHandlerContext, SocketAddress, SocketAddress, ChannelPromise)事件。
EventLoop eventLoop() 返回channel注册的eventLoop
ChannelConfig config() 返回channel的配置
ChannelMetadata metadata() 返回当前channel的元数据信息,比如TCP参数配置。
SocketAddress localAddress() 返回channel绑定的本地地址
SocketAddress remoteAddress() 返回channel通信的远程地址
Channel parent() 返回父级channel
AbstractChannel分析
AbstractChannel是Channel的基本实现类,它采用聚合的方式封装了各种功能,从成员变量聚合了以下内容:
private final Channel parent :父级channel
private final ChannelId id = DefaultChannelId.newInstance() 全局唯一id
private final Unsafe unsafe : unsafe示例
private final DefaultChannelPipeline pipeline; 当前channle对应的DefaultChannelPipeline
private final EventLoop eventLoop; 当前channel注册的EventLoop
前面提到channel的IO操作会触发会产生对应的IO事件,然后事件在ChannelPipeLine中传播,并由对应的ChannleHandler处理。
AbstractChannel的IO操作直接调用DefaultChannelPipeline的相关方法由DefaultChannelPipeline处理相关逻辑
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
……
AbstractNioChannel分析
AbstractChannel是Channel的基本实现类,那么AbstractNioChannel就是Channel基于选择器的实现类,它实现了核心的将Channel注册到Selector的功能。
首先看下成员变量
private static final InternalLogger logger =
InternalLoggerFactory.getInstance(AbstractNioChannel.class);
private final SelectableChannel ch;//JDK NIO SelectableChannel
protected final int readInterestOp;//JDK selectionKey 的OP_READ
private volatile SelectionKey selectionKey;
private volatile boolean inputShutdown;
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
核心注册功能源码:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
定义布尔类型变量selected标识是否注册成功,调用SelectableChannel的register将当前channel注册到EventLoop的多路复用器Selector上。
其中SelectableChannel的注册方法定义如下:
public abstract SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException;
第二个参数是 指定监听的网络操作位,表示channel对哪几类网络事件感兴趣,具体定义如下:
public static final int OP_READ = 1 << 0; //读事件
public static final int OP_WRITE = 1 << 2;//写事件
public static final int OP_CONNECT = 1 << 3;//客户端连接服务端事件
public static final int OP_ACCEPT = 1 << 4;//服务端接收客户端连接事件
其中注册时传的ops等于0,表示不对任何事件感兴趣,只是完成注册操作,注册成功之后返回selectionKey,通过selectionKey可以获取注册的channel.
如果注册返回的selectionKey被取消,则抛出CancelledKeyException异常,如果是第一次抛异常则调用selectNow将取消的selectionKey删除并将selected置为true并再次注册,如果仍未成功则抛出CancelledKeyException异常。
处理读操作之前设置网络操作位为读
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
先判断Channel是否关闭,如果处于关闭则直接返回,然后获取当前的SelectKey的操作位与读操作位位于,如果结果为0标识没有设置过读操作位,最后通过或设置读操作位
AbstractNioByteChannel分析
AbstractNioByteChannel是Channel操作字节的实现,核心代码是protected void doWrite(ChannelOutboundBuffer in)
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = -1;
for (;;) {
Object msg = in.current(true);
if (msg == null) {
// Wrote all messages.
clearOpWrite();
break;
}
首先从ChannelOutboundBuffer环形数组中读取数据,如果数据为空,调用clearOpWrite清除读标志位
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
in.remove();
continue;
}
如果数据不为空,判断是否是ByteBuf类型,若是强制转换为ByteBuf,并判断ByteBuf中是否有可读字节,如果没有则将该消息从数组中删除,并继续处理其他消息。
boolean setOpWrite = false;
boolean done = false;
long flushedAmount = 0;
if (writeSpinCount == -1) {
writeSpinCount = config().getWriteSpinCount();
}
for (int i = writeSpinCount - 1; i >= 0; i --) {
int localFlushedAmount = doWriteBytes(buf);
if (localFlushedAmount == 0) {
setOpWrite = true;
break;
}
flushedAmount += localFlushedAmount;
if (!buf.isReadable()) {
done = true;
break;
}
}
设置半包标志setOpWrite,消息是否全部发送标识done,发送总消息数flushedAmount,对循环发送次数writeSpinCount判断,如果为-1从channel配置重获取循环发送次数,调用doWriteBytes进行循环发送,如果本次发送的字节数为0那么说明TCP缓冲区已满,所以讲写半包标识置为true并跳出循环。
如果发送字节数大于0那么对发送字节数进行累加,如果当前buf没有可读字节数了则标识buf写入完成,设置消息发送标识为true并跳出循环。
in.progress(flushedAmount);
if (done) {
in.remove();
} else {
incompleteWrite(setOpWrite);
break;
}
调用progress更新进度信息,如果消息已完全发送则将该buf从环形数组中移除,否则调用incompleteWrite方法
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
setOpWrite();
} else {
// Schedule flush again later so other tasks can be picked up in the meantime
Runnable flushTask = this.flushTask;
if (flushTask == null) {
flushTask = this.flushTask = new Runnable() {
@Override
public void run() {
flush();
}
};
}
eventLoop().execute(flushTask);
}
}
如果写半包标识setOpWrite为true,调用setOpWrite()重新设置写操作位setOpWrite()
protected final void setOpWrite() {
final SelectionKey key = selectionKey();
final int interestOps = key.interestOps();
if ((interestOps & SelectionKey.OP_WRITE) == 0) {
key.interestOps(interestOps | SelectionKey.OP_WRITE);
}
}
SelectKey设置为OP_WRITE后,Selector会不断轮询对应的Channel处理没有发送完成的半包消息,直到清除OP_WRITE标志为止。
如果没有设置半包标识,则需要新起一个线程来处理。