1.简介
Netty是为了快速开发可维护的高性能、高可扩展、网络服务器和客户端程序而提供的异步事件驱动基础框架和工具。换句话说,Netty是一个Java NIO客户端/服务器框架
2.Netty目标
- 使开发可以做到“快速和轻松”
- 做到高性能和高扩展
3.创建第一个Netty项目
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
public class NettyDiscardServer {
private final int serverPort;
ServerBootstrap b = new ServerBootstrap();
public NettyDiscardServer(int port) {
this.serverPort = port;
}
public void runServer() {
//创建reactor 线程组
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//1 设置reactor 线程组
b.group(bossLoopGroup, workerLoopGroup);
//2 设置nio类型的channel
b.channel(NioServerSocketChannel.class);
//3 设置监听端口
b.localAddress(serverPort);
//4 设置通道的参数
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配子通道流水线
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有连接到达时会创建一个channel
protected void initChannel(SocketChannel ch) throws Exception {
// pipeline管理子通道channel中的Handler
// 向子channel流水线添加一个handler处理器
ch.pipeline().addLast(new NettyDiscardHandler());
}
});
// 6 开始绑定server
// 通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = b.bind().sync();
Logger.info(" 服务器启动成功,监听端口: " +
channelFuture.channel().localAddress());
// 7 等待通道关闭的异步任务结束
// 服务监听通道会一直等待通道关闭的异步任务结束
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 8 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
int port = NettyDemoConfig.SOCKET_SERVER_PORT;
new NettyDiscardServer(port).runServer();
}
}
- 客户端代码:
public class EchoClient {
public void start() throws IOException {
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
// 1、获取通道(channel)
SocketChannel socketChannel = SocketChannel.open(address);
// 2、切换成非阻塞模式
socketChannel.configureBlocking(false);
//不断的自旋、等待连接完成,或者做一些其他的事情
while (!socketChannel.finishConnect()) {
}
Print.tcfo("客户端启动成功!");
//启动接受线程
Processer processer = new Processer(socketChannel);
new Thread(processer).start();
}
static class Processer implements Runnable {
final Selector selector;
final SocketChannel channel;
Processer(SocketChannel channel) throws IOException {
//Reactor初始化
selector = Selector.open();
this.channel = channel;
channel.register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
if (sk.isWritable()) {
ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
Scanner scanner = new Scanner(System.in);
Print.tcfo("请输入发送内容:");
if (scanner.hasNext()) {
SocketChannel socketChannel = (SocketChannel) sk.channel();
String next = scanner.next();
buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
buffer.flip();
// 操作三:通过DatagramChannel数据报通道发送数据
socketChannel.write(buffer);
buffer.clear();
}
}
if (sk.isReadable()) {
// 若选择键的IO事件是“可读”事件,读取数据
SocketChannel socketChannel = (SocketChannel) sk.channel();
//读取数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
while ((length = socketChannel.read(byteBuffer)) > 0) {
byteBuffer.flip();
Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));
byteBuffer.clear();
}
}
//处理结束了, 这里不能关闭select key,需要重复使用
//selectionKey.cancel();
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new EchoClient().start();
}
}
4.Reactor反应器模式
4.1Reactor反应器模式中IO事件的处理流程
- 1.通道注册
- 2.查询选择
- 3.事件分发
- 4.io操作和业务处理
4.2Netty中的Channel通道组件
- 反应器模式和通道紧密相关,反应器的查询和分发的IO事件都来自于Channel通道组件。
- Netty中不直接使用Java NIO的Channel通道组件,对Channel通道组件进行了自己的封装。
- Netty还能处理Java的面向流的OIO(Old-IO,即传统的阻塞式IO)
-
Netty中的每一种协议的通道,都有NIO(异步IO)和OIO(阻塞式IO)两个版本
4.3Netty中的Reactor反应器
- Netty中的反应器有多个实现类,与Channel通道类有关系。对应于NioSocketChannel通道,Netty的反应器类为:NioEventLoop。
- NioEventLoop类绑定了两个重要的Java成员属性:一个是Thread线程类的成员,一个是Java NIO选择器的成员属性。
4.4 Netty中的Handler处理器
Netty的Handler处理器分为两大类:第一类是ChannelInboundHandler通道入站处理器;第二类是ChannelOutboundHandler通道出站处理器。二者都继承了ChannelHandler处理器接口
4.5Netty的流水性模式
- (1)反应器(或者SubReactor子反应器)和通道之间是一对多的关系:一个反应器可以查询很多个通道的IO事件。
- (2)通道和Handler处理器实例之间,是多对多的关系:一个通道的IO事件被多个的Handler实例处理;一个Handler处理器实例也能绑定到很多的通道,处理多个通道的IO事件。
- Netty设计了一个特殊的组件,叫作ChannelPipeline(通道流水线),它像一条管道,将绑定到一个通道的多个Handler处理器实例,串在一起,形成一条流水线。