因为Netty是NIO的二次开发,所以先了解IO相关知识,比如阻塞,同步异步,NIO等还是必要的
1.阻塞和非阻塞
阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.
阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞或者挂起当前线程。
2.同步和异步
同步和异步关注的是消息通信机制
同步:就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。换句话说,就是由调用者主动等待这个调用的结果。
异步则是相反:调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。
3.IO模型
IO模型更多的偏向于用户空间和内核空间的理解。主要有如下四种数据模型
3.1 同步阻塞
同步阻塞:即传统的IO模型,基于字节,字符流的方式传递数据
一直等待数据到达,然后从内核拷贝数据到用户空间,再然后返回数据
例子:站在我们平时用的到c/s架构中,比如我们客户端发起一个请求,需要一直等待服务端的结果返回,数据结果没有返回当前线程处于一直等待中,也不能做其他事。
3.2 同步非阻塞
默认创建的socket都是阻塞的,非阻塞IO要求socket被设置为NONBLOCK。需要当前线程自己去从内核获取结果,不过中途可以干自己的事情。
需要用户不停向内核轮询去获取结果,如果有结果了就从将数据返回。
同样的例子:站在我们平时用的到c/s架构中,比如我们客户端发起一个请求后我们的线程不会被阻塞,可以自己做其他的事情了,然后隔一段时间再请求服务器查看我们刚才的请求是否有结果了。重复这样直到拿到数据。
3.3 异步阻塞
异步阻塞IO:当前是阻塞的(当前线程挂起),不能做其他事,但是结果由内核控件把IO结果返回;不需要用户空间主动获取结果。
同样的例子:站在我们平时用的到c/s架构中,比如我们客户端发起一个请求后我们的线程会被阻塞,什么也不能干。不过等服务端数据处理好后,主动把数据发送给客户端。
3.4 异步非阻塞
当前线程执行内核调用后就返回,当内核数据获取到后,自己把当前的数据拷贝到内核空间通知用户空间直接获取。
同样的例子:站在我们平时用的到c/s架构中,比如我们客户端发起一个请求后我们的线程就自己处理自己的事情了,等服务器结果处理好了然后就把结果发送给客户端就不用客户端主动来拉数据了。
4.IO多路复用
多路复用IO模型(JAVA的 NIO就是采用此模式)
在多路复用IO模型中,会有一个线程(Java中的Selector)不断去轮询多个socket的状态,只有当socket真正有读写事件时,才真正调用实际的IO读写操作。因为在多路复用IO模型中,只需要使用一个线程就可以管理多个socket,系统不需要建立新的进程或者线程,也不必维护这些线程和进程,并且只有在真正有socket读写事件进行时,才会使用IO资源,所以它大大减少了资源占用。
这里我们的Selector是阻塞的,它需要去轮训socket状态,同时如果有相关事件需要阻塞处理相关的IO事件。
5. JAVA NIO
相比原来的BIO一个字节,或者字符传送数据,NIO在传送和接受两端都添加了一个缓存池来保存数据。并且采用channel通道来收发数据。
NIO的通道和流的区别如下:
- 通道可以同时进行读写,而流只能读或者只能写
- 通道可以实现异步读写数据
- 通道可以从缓冲读数据,也可以写数据到缓冲
通道和缓存类似这种,数据直接从buffer中获取。
Java NIO有三大法器:Channel,Buffer,Selector;
了解下NIO的数据收发源码:
客户端:
package com.visonwu;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class NIOClient {
public static void main(String[] args){
// 创建远程连接地址
InetSocketAddress remote = new InetSocketAddress("localhost",9999);
SocketChannel channel = null;
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
channel = SocketChannel.open();
channel.connect(remote);
Scanner reader = new Scanner(System.in);
while (true){
System.out.println("请输入需要发送的数据>");
String line = reader.nextLine();
if(line.equals("exit")){ // 表示停止输入了
break;
}
//buffer不细说,
buffer.put(line.getBytes("UTF-8"));
buffer.flip();
channel.write(buffer);
buffer.clear();
int readLength = channel.read(buffer);
if(readLength == -1){
break;
}
// 重置缓存游标
buffer.flip();
byte[] datas = new byte[buffer.remaining()];
// 读取数据到数组
buffer.get(datas);
System.out.println("from server : " + new String(datas, "UTF-8"));
// 清空缓存
buffer.clear();
}
}catch (Exception e){
e.printStackTrace();
}finally {
if(channel != null){
try {
channel.close();
}catch (IOException e){
e.printStackTrace();
}
}
}
}
}
服务端:
package www.visonwu;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.AbstractSelector;
import java.util.Iterator;
import java.util.Scanner;
public class NIOServer implements Runnable {
// 多路复用器,选择器。用于注册通道
private Selector selector;
// 定义两个缓存 分别用于读和写;初始化空间大小为1024字节
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
public static void main(String[] args){
new Thread(new NIOServer(9999)).start();
}
public NIOServer (int port){
init(port);
}
private void init(int port) {
try {
System.out.println("server starting at port " + port + "...");
this.selector = Selector.open();
// 开启服务通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 非阻塞,传递参数为true为阻塞模式
serverSocketChannel.configureBlocking(false);
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(port));
// 注册,并标记当前服务通道状态
/**
* register(Selector,int)
* int - 状态编码
* OP_ACCEPT :连接成功的标记
* OP_READ :可以读取数据的标记
* OP_WRITE :可以写入数据的标记
* OP_CONNECT :建立连接后的标记
*/
serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
System.out.println("server start");
}catch (IOException e){
e.printStackTrace();
}
}
@Override
public void run() {
//循环获取channel事件,有事件在处理,否则就阻塞。
while (true){
try {
// 阻塞方法,当至少一个通道被选中,此方法返回。
this.selector.select();
// 返回以选中的通道标记集合,集合保存的是通道的标记,相当于是通道的ID
Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
while (keys.hasNext()){
SelectionKey key = keys.next();
// 将本次要处理的通道冲集合中删除,下次删除根据新的通道列表再次执行必要的业务逻辑
keys.remove();
// 通道是否有效
if(key.isValid()){
try {
if (key.isAcceptable()){
accept(key);
}
}catch (CancelledKeyException e){
key.cancel();
}
try{
if(key.isReadable()){
read(key);
}
}catch (CancelledKeyException cke){
key.cancel();
}
try{
if(key.isWritable()){
write(key);
}
}catch (CancelledKeyException cke){
key.cancel();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void write(SelectionKey key){
this.writeBuffer.clear();
SocketChannel channel = (SocketChannel)key.channel();
Scanner reader = new Scanner(System.in);
try {
System.out.println("put message for send to client > ");
String line = reader.nextLine();
writeBuffer.put(line.getBytes("UTF-8"));
writeBuffer.flip();
channel.write(writeBuffer);
channel.register(this.selector,SelectionKey.OP_READ);
}catch (IOException e){
e.printStackTrace();
}
}
private void read(SelectionKey key){
try {
// 清空读缓存
this.readBuffer.clear();
// 获取通道
SocketChannel channel = (SocketChannel)key.channel();
// 将通道中的数据读到缓存中。通道中的数据,就是客户端发送给服务器的数据。
int readLength = channel.read(readBuffer);
// 检查客户端是否写入数据
if(readLength == -1){
// 通道关闭
key.channel().close();
// 关闭连接
key.cancel();
return;
}
// flip,NIO中最复杂的操作就是Buffer的控制
/** Buffer中有一个游标。游标的信息在操作后不会归零,如果直接访问Buffer的话,数据有可能不一致。
* flip是重置游标的方法.NIO编程中,flip方法是常用的方法
*
*/
this.readBuffer.flip();
// 字节数据,保存具体数据。Buffer.remaining() ->获取Buffer中有效数据长度的方法。
byte[] datas = new byte[readBuffer.remaining()];
// 是将Buffer中的有效数据保存到有效数组中。
readBuffer.get(datas);
System.out.println("from" + channel.getRemoteAddress() + " client : " + new String(datas,"UTF-8"));
channel.register(this.selector,SelectionKey.OP_WRITE);
}catch (IOException e){
e.printStackTrace();
}
}
private void accept(SelectionKey key){
try {
// 此通道为init方法中注册到Seleor上的ServerSocketChannel
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// 阻塞方法,当客户端发起请求后返回.此通道和客户端一一对应
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
// 设置对用客户端的通道标记状态,此通道为读取数据使用的。
channel.register(this.selector,SelectionKey.OP_READ);
}catch (IOException e){
e.printStackTrace();
}
}
}
这里的NIO是属于同步非阻塞;Java的NIO2.0实现了异步非阻塞AIO(类:AsynchronousSocketChannel,AsynchronousServerSocketChannel)
参考书籍:《Netty权威指南》
参考网络:https://www.zhihu.com/question/19732473/answer/20851256
更加详细的介绍