话不多说直接从代码入手
服务器端
NioEventLoopGroup group = new NioEventLoopGroup();//1
try {
ServerBootstrap bootstrap = new ServerBootstrap();//2
bootstrap.group(group,group) //3
.channel(NioServerSocketChannel.class) //4
.localAddress(new InetSocketAddress(port)) //5
.childHandler(new ChannelInitializer<SocketChannel>() {//6
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoServerHandler());
}
});
ChannelFuture channelFuture = bootstrap.bind().sync(); //7
System.out.println(EchoServer.class.getName() + "started and listen on " + channelFuture);
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();//8
}
以上是用netty实现最简单的服务器程序的一段代码,相比于用NIO实现相应功能,这样的代码不能再简洁了。
下面来分析一下这几句代码各自封装了什么功能:
1、此处声明了一个EventLoopGroup,顾名思义就是一个eventloop。
查看其继承关系EventLoopGroup->MultithreadEventLoopGroup ->MultithreadEventExecutorGroup
MultithreadEventExecutorGroup中有以下几个属性
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger childIndex = new AtomicInteger();
private final AtomicInteger terminatedChildren = new AtomicInteger();
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
private final EventExecutorChooser chooser;
其主要属性即为EventExecutor[] children;
定位到其创建的地方
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
super(parent);
if (executor == null) {
throw new NullPointerException("executor");
}
this.addTaskWakesUp = addTaskWakesUp;
this.executor = executor;
taskQueue = newTaskQueue();
}
这里管理了一个taskQueue用于保存将要执行的任务。并在线程中不断poll队列中的task并执行。
protected Runnable takeTask() {
assert inEventLoop();
if (!(taskQueue instanceof BlockingQueue)) {
throw new UnsupportedOperationException();
}
BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
for (;;) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
Runnable task = null;
try {
task = taskQueue.take();
if (task == WAKEUP_TASK) {
task = null;
}
} catch (InterruptedException e) {
// Ignore
}
return task;
} else {
long delayNanos = scheduledTask.delayNanos();
Runnable task = null;
if (delayNanos > 0) {
try {
task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) { // Waken up.
return null;
}
}
if (task == null) { // We need to fetch the scheduled tasks now as otherwise there may be a chance that // scheduled tasks are never executed if there is always one task in the taskQueue. // This is for example true for the read task of OIO Transport // See https://github.com/netty/netty/issues/1614 fetchFromScheduledTaskQueue();
task = taskQueue.poll();
}
if (task != null) {
return task;
}
}
}}
总而言之,Netty中的EventLoopGroup就是建立了一个EventLoop数组。并在其中不断处理新的事务,其中包括selector的轮询操作和一些用户自定义的Task。
2、此处申明了一个Bootstrap
查看其代码,发现其只有两个构造函数,一个无参构造函数,一个复制构造函数。
《Netty权威指南》中对其解释如下:
其根本原因为它的参数太多了,而且未来也可能会发生变化,为了解决这个问题,就需要引入Builder模式。
这个类是Netty的一个辅助类,提供方法设置启动相关的参数。
3、绑定group
从下面的代码可以看出应该有两个group来完成Server端,parent负责acceptor,child作为client,而当其只传一个group时,这个group需要完成两件事情。
/**
* Specify the {@link EventLoopGroup} which is used for the parent (acceptor) and the child (client).
*/
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
4,5、绑定相应的channel,并为其绑定端口
这里通过反射的工厂方法建立了一个NIOServerSocketChannel
/**
* The {@link Class} which is used to create {@link Channel} instances from.
* You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
* {@link Channel} implementation has no no-args constructor.
*/
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
6、设置处理handle,在pipeline中添加相应的回调函数
private class EchoServerHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
}
7、最后一步绑定本地端口,启动服务
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();//用工厂方法创建Channel
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
在newChannel后对channel进行初始化,这个方法由ServerBootstrap实现,代码如下
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();//设置socket相应的属性
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();//将AbstractBootstrap的handler添加到NioServerSocektChannel的ChannelPipeline中
if (handler() != null) {
p.addLast(handler());
}
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));//将注册的ServerBootstrapAcceptor注册到pipeline中
}
});
}