Channel,EventLoop和ChannelFuture类构成了Netty网络抽象的代表:
- Channel:对应Socket
- EventLoop:对应控制流,多线程处理,并发
- ChannelFuture:对应异步通知
Channel接口
Channel是对Socket的封装,大大降低了直接使用Socket的复杂性。
EventLoop接口
EventLoop用于处理连接的生命周期中所发生的事件。在服务端编程中,EventLoop起到了监听端口连接和数据读取的工作。
ChannelFuture接口
Netty中的所有IO操作都是异步的,一个操作不会立即返回,我们需要在执行操作之后的某个时间点确定其结果的方法,即ChannelFuture接口,其addListener方法注册了一个ChannelFutureListener,一遍在某个操作完成时得到通知。
ChannelHandler和ChannelPipeline
ChannelHandler充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler常常实现了传入数据包拆分以及业务逻辑控制的功能。
ChannelPipeline接口
ChannelPipeline为ChannelHandler链提供容器,并定义了用于在该链上传播入站和出站事件流的API。即ChannelPipeline其实就是一个保存很多ChannelHandler的链表。
BootStrap接口
Netty的引导类为应用程序的网络层配置提供了容器。有两种类型的引导:一种用户客户端,称为Bootstrap,另一种称为用于服务器,称为ServerBootstrap。
示例
上面提到的这些接口在实际使用过程中并不是挨个使用的,而是使用BootStrap接口来将需要的接口组织在一起,下面是个基于netty的服务端的例子:
public class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.handler(new ServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("handlerAdded");
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
new Thread(new Runnable() {
@Override
public void run() {
// 耗时的操作
String result = loadFromDB();
ctx.channel().writeAndFlush(result);
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
// ...
}
}, 1, TimeUnit.SECONDS);
}
}).start();
}
private String loadFromDB() {
return "hello world!";
}
}
下面就针对上面使用的ServerBootstrap类来分析Netty是如何来封装java nio相关的操作的。
Netty 如何获取NIO中需要的Channel
首先从ChannelFuture f = b.bind(8888).sync();
中的bind方法开始分析。
这个bind方法经过层层调用,最终会调用initAndRegister方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
...
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
...
从上述源码中可以看到channel来自channelFactory的newChannel方法。ChannelFactory是一个接口,我们只能从它的实现类ReflectiveChannelFactory来看下具体实现了:
private final Class<? extends T> clazz;
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
可以看出这个工厂方法的newChannel其实就是根据clazz属性来反射出具体的对象。那当前代码中的clazz属性值是多少呢?回到一开始示例执行的channel(NioServerSocketChannel.class)
这句代码,这个channel方法实现如下:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
所以ServerBootstrapy引导类中使用的通道就是channel方法中设置的NioServerSocketChannel通道。
NioServerSocketChannel实现
接下来就要来看下Netty的NioServerSocketChannel内部是怎么调用java NIO中的ServerSocketChannel的。
NioServerSocketChannel内部实现步骤如下:
- newSocket() 通过jdk来创建底层jdk channel
- AbstractNioChannel() 方法中调用configureBlocking(false) 设置通道为非阻塞模式
- AbstractNioChannel() 方法创建id,unsafe,pipeline
接下面就上面几个点来看下NioServerSocketChannel类的源码。
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException("Failed to open a server socket.", e);
}
从NioServerSocketChannel的构造函数可以看出java NIO的SelectorProvider.provider().openServerSocketChannel()生成了ServerSocketChannel通道。
接下来看第二个点,即在哪设置通道的非阻塞模式,NioServerSocketChannel构造函数如下:
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
...
}
super方法调用了父类AbstractNioChannel的构造函数如下:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
...
ch.configureBlocking(false);
...
所以NioServerSocketChannel的构造函数中将通道设置了非阻塞模式。除了设置为非阻塞模式外,还调用了父类的构造函数super(parent),具体如下:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
从上面这段代码中可以看出在构造函数中创建了一个新的pipeline,并且这个链表一开始是空的。
在介绍了ServerBootstrap中如何调用java NIO中的ServerSocketChannel的实现后,我们接下来看ServerBootstrap是如何初始化服务器Channel的。还是从示例中ChannelFuture f = b.bind(8888).sync();
中bind方法中的initAndRegister方法中的代码开始分析:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
}
...
}
abstract void init(Channel channel) throws Exception;
之前我们分析了从channelFactory中获取ServerChannel通道的逻辑,这里再分析下ServerBootstrap中这个init方法:
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
init方法首先将传入的childOption,childAttr属性保存到ServerBootstrap对象中;另外又把用户通过handler方法设置的handler对象添加到pipeline尾部,然后再在尾部添加一个ServerBootstrapAcceptor对象,里面包装了用户通过childHandler方法传入的对象。
注册Selector
上面主要学习了netty内部是如何封装了ServerSocketChannel等NIO相关的操作的,接下来再继续看下netty内部是如何封装NIO中注册选择器selector的。
回到我们之前提到的initAndRegister方法内部:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
......
}
ChannelFuture regFuture = config().group().register(channel);
......
}
我们已经讲过获取ServerSocketChannel和初始化通道这两个部分,接下来再看下面config().group().register(channel);
内部是如何实现注册Selector的。
由于EventLoopGroup是接口,在本例中实现类为NioEventLoop。
NioEventLoop类实现的register方法:
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
// 参数校验......
try {
ch.register(selector, interestOps, task);
} catch (Exception e) {
throw new EventLoopException("failed to register a channel", e);
}
}
可以看出最终执行的SeverSocketChannel.register(selector,interestOps,task);
方法,即还是执行了java NIO中通道注册到选择器selector的方法。
端口绑定
上面我们已经分析了AbstractBootstrap类里面doBind方法中initAndRegister方法的内部实现:创建ServerSocketChannel通道,初始化,注册通道到选择器selector上这三步。下面再继续分析doBind方法中后面关于端口绑定的实现。
从下面代码中的doBind0方法开始解析:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
.......
}
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
从上面源码中可以看出,在doBind0方法内部执行了ServerSocketChannel的bind方法,即进行端口的绑定操作。