netty前言
1.BIO
同步阻塞I/O,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制来改善。BIO方式适用于连接数目比较小且固定的架构,这种方式对服务端资源要求比较高,并发局限于应用中,在jdk1.4以前是唯一的io现在,但程序直观简单易理解。
/**
* BIO模型 每线程 每连接
* 优势:可以接收很多的连接
* 弊端:线程内存浪费 cpu调度消耗
* 根源:BLOCKING 阻塞 :accept recv
* 解决:NONBLOCKING 非阻塞
*/
public class BIOServerTest {
public static void main(String[] args) throws Exception {
ServerSocket server = new ServerSocket(8090);//1.socker -> fd 2.bind(fd,8090) 3.listen(fd)
System.out.println("create server and port is 8090");
while (true){
Socket client = server.accept();//accept(fd, 阻塞
System.out.println("client link and clint port is "+ client.getPort());
new Thread(new Runnable() {
Socket ss;
public Runnable setSS(Socket ss){
this.ss = ss;
return this;
}
@Override
public void run() {//多线程防止大文件读取阻塞
InputStream is = null;
try {
is = ss.getInputStream();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
while(true){
System.out.println(bufferedReader.readLine());//recv( 阻塞
}
} catch (IOException e) {
e.printStackTrace();
}
}
}.setSS(client)).start();
}
}
}
2.NIO
同步非阻塞I/O,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有IO请求时才启动一个线程进行处理。NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,jdk1,4开始支持。
/**
* NIO模型
* 优势:规避多线程
* 弊端:连接没有数据也会遍历。10万个连接,只有1个发来数据,遍历一次需要向内核发送10万次recv系统调用,99999次是浪费的 消耗时间和资源(用户空间向内核空间的循环遍历)
* 解决:多路复用器 select(1024) poll epoll 通过一次系统调用把fds,传给内核,内核遍历,这种遍历减小了系统调用的次数。
* select poll 弊端:重复传递fd 解决:内核开辟空间保留fd
* 每次select poll 都要重新遍历全量的fd 解决:计组深度知识,中断,callback,增强
*/
public class NIOServerTest {
public static void main(String[] args) throws Exception {
LinkedList<SocketChannel> clients = new LinkedList<>();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8090));
ssc.configureBlocking(false);//设置为非阻塞 NONBLOCKING
System.out.println("create server and port is 8090");
while (true){
//客服端连接
Thread.sleep(1000);
SocketChannel client = ssc.accept();//不会阻塞 没有连接返回null
if(client == null){
System.out.println("no client");
}else{
client.configureBlocking(false);//设置读取为非阻塞
System.out.println("client link and clint port is " + client.socket().getPort());
clients.add(client);
}
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
//遍历连接的客服端
for(SocketChannel sc : clients){
int num = sc.read(buffer);//不会阻塞
if(num > 0){
buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
// todo
String a = new String(data);
System.out.println("clint port is " + sc.socket().getPort() + ":" + a);
buffer.clear();
}
}
}
}
}
3.多路复用器
/**
* 多路复用器 解决io状态问题
* 优势:一次系统调用,循环所有io状态。减少用户到内核系统调用的过程
* epoll_create() = 7
* epoll_ctl(7,add,accept)
* epoll_wait 阻塞 timeout
*/
public class MultiplexerTest {
private ServerSocketChannel server = null;
private Selector selector = null;//多路复用器(select poll epoll kqueue)
int port = 8090;//端口
/**
* 初始化服务器
*/
public void init(){
try {
server = ServerSocketChannel.open();//socket -> 4
server.configureBlocking(false);//设置为非阻塞 NONBLOCKING
server.bind(new InetSocketAddress(port));//绑定端口 bind
selector = Selector.open();//linux 优先epoll -> epoll_create() = 7
server.register(selector, SelectionKey.OP_ACCEPT);//epoll_ctl(7,add,4,)
} catch (IOException e) {
e.printStackTrace();
}
}
public void run(){
init();
System.out.println("服务器启动完成.........");
try {
while(true){
Set<SelectionKey> keys = selector.keys();
System.out.println("keys size : " + keys.size());
while(selector.select(1000) > 0){ //select poll epoll-> select(fds) poll(fds) epoll_wait()
Set<SelectionKey> selectionKeys = selector.selectedKeys();//返回所有状态的fd集合
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if(key.isAcceptable()){
acceptHandler(key);
}else if (key.isReadable()){
readHandler(key);
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void readHandler(SelectionKey key) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.clear();
try {
int num = 0;//不会阻塞
num = sc.read(buffer);
if(num > 0){
buffer.flip();
byte[] data = new byte[buffer.limit()];
buffer.get(data);
// todo
String a = new String(data);
System.out.println("clint port is " + sc.socket().getPort() + ":" + a);
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void acceptHandler(SelectionKey key) {
try {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel client = ssc.accept();//调用accept接收客服端 fd7 有时间才触发
client.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
client.register(selector, SelectionKey.OP_READ, buffer);//select poll 在jvm中开辟数组存入fd7 epoll:epoll_ctl(7,ADD,8,)
System.out.println("new client : " + client.getRemoteAddress());
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
new MultiplexerTest().run();
}
}