描述
Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.
Netty是一个NIO客户端服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了网络编程,例如TCP和UDP套接字服务器。
先来看看JDK的javaNIO的编程
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8000));
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (!Thread.currentThread().isInterrupted()) {
while (true) {
int select = selector.select(2000);
if (select >= 0) {
break;
}
}
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
//位置1
while (iterator.hasNext()){
//位置2
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
System.out.println("isAcceptable");
ServerSocketChannel channel = (ServerSocketChannel)selectionKey.channel();
SocketChannel accept = channel.accept();
accept.configureBlocking(false);
//位置3
accept.register(selector, SelectionKey.OP_READ);
}
if(selectionKey.isReadable()){
System.out.println("isReadable");
SocketChannel channel = (SocketChannel)selectionKey.channel();
//处理
}
iterator.remove();
}
}
}
存在的问题:
位置1:如果多个链接注册在一个select中,多个链接共同准备好,链接处理需要等待,相当于串行。
解决方法:将链接分批注册到不同的select中
位置2:如果收到一个链接在处理的时候需要的时间过长,其他的链接都需要等待
解决方法:使用线程池来处理
位置3:建立链接和和处理链接同时注册到一个select中会导致建立链接和受处理链接的影响(cpu处理和IO处理在一块)
解决方法:建立链接的select和数据处理的selector分开
netty模型
- Netty分BossGroup和WorkerGroup,其中BossGroup主要用来建立channel,channel建立完成后将其注册到WorkerGroup中
- 每个Group中NioEventGourp相当于多个线程同时处理,比如WorkerGroup中的每个NioEventGroup都有一个selecor来注册建立的连接
BossGroup和WorkerGroup创建方法如下:
//参数为线程的数量,默认为:cpu核数*2
//Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup();
- BossGroup中如何有多个NioEventGourp呢?
一个端口开启一个SeverSocketChannel,如果只有一个端口,BossGroup线程数设置为1就行,多个端口的话可以设置多个线程
Netty代码
源码图
- NioEventLoopGroup为BossGroup和workerGroup,里面有多个NioEventLoop
- 每个NioEventLoop中会有一个Selector,用来注册相应的channel(BossGroup为NioServerChannle,WorkerGroup为NioSocketChannel)
- NioEventLoopGroup选择哪个channel注册到哪个selector是轮询选择
- 每个NioChannle中会有一个pipeline,是个双向队列,用来处理请求
- pipeline中各个元素为ChannelHandlerContext,会封装ChannelHandler
- ChannleHandler中有两种,channleInBound和ChannelOutBound,InBound用来处理接受的数据,channleOutBound处理返回的数据
- ChannelHandlerContext通过next方法查找ChannelOutBound,通过prv来查找channleOutBound,所有先注册的handler,数据流入的时候先执行,流出的时候后执行,也就是编解码为什么需要第一个注册的原因
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new IdleStateHandler(5000, 0 ,0 , TimeUnit.MILLISECONDS));
p.addLast(new IdleHandler());
p.addLast(new HttpServerCodec());
p.addLast("httpAggregator",new HttpObjectAggregator(512*1024));
p.addLast(new HttpHandler());
}
});
// 绑定端口,开始接收进来的连接
ChannelFuture f = serverBootstrap.bind(9999).sync();
ChannelFuture f2 = serverBootstrap.bind(10000).sync();
ChannelFuture f = serverBootstrap.bind(9999).sync();
ChannelFuture f2 = serverBootstrap.bind(10000).sync();
会创建两个NioServerSocketChannel,都注册到bossGroup中的selector中去由于线程数是1所以注册到统一个selector中去
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(new IdleStateHandler(5000, 0 ,0 , TimeUnit.MILLISECONDS));
p.addLast(new IdleHandler());
p.addLast(new HttpServerCodec());
p.addLast("httpAggregator",new HttpObjectAggregator(512*1024));
p.addLast(new HttpHandler());
}
});
定义serverBootstrap的属性,BossGroup和workerGroup都是NioEventLoopGroup,所以执行流程一样,数据来了之后会找InboundHandler和OutBounderHandler来处理。两个是如何关联起来的?
在执行serverBootstrap.bind(9999)的bind方法是,会有init方法,此时会将workerGroup中注册收到的链接封装成一个InboundHandler加入NioServerSocketChannle的pipleline中去
代码见:
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
ServerBootstrapAcceptor的channelRead方法
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}