1、reactor(反应器)模式
使用单线程模拟多线程,提高资源利用率和程序的效率,增加系统吞吐量。下面例子比较形象的说明了什么是反应器模式:
一个老板经营一个饭店,
传统模式 - 来一个客人安排一个服务员招呼,客人很满意;(相当于一个连接一个线程)
后来客人越来越多,需要的服务员越来越多,资源条件不足以再请更多的服务员了,传统模式已经不能满足需求。老板之所以为老板自然有过人之处,老板发现,服务员在为客人服务时,当客人点菜的时候,服务员基本处于等待状态,(阻塞线程,不做事)。
于是乎就让服务员在客人点菜的时候,去为其他客人服务,当客人菜点好后再招呼服务员即可。 --反应器(reactor)模式诞生了
饭店的生意红红火火,几个服务员就足以支撑大量的客流量,老板用有限的资源赚了更多的money~~~~^_^
通道:类似于流,但是可以异步读写数据(流只能同步读写),通道是双向的,(流是单向的),通道的数据总是要先读到一个buffer 或者 从一个buffer写入,即通道与buffer进行数据交互。
通道类型:
FileChannel:从文件中读写数据。
DatagramChannel:能通过UDP读写网络中的数据。
SocketChannel:能通过TCP读写网络中的数据。
ServerSocketChannel:可以监听新进来的TCP连接,像Web服务器那样。对每一个新进来的连接都会创建一个SocketChannel。
- FileChannel比较特殊,它可以与通道进行数据交互, 不能切换到非阻塞模式,套接字通道可以切换到非阻塞模式;
缓冲区 - 本质上是一块可以存储数据的内存,被封装成了buffer对象而已!
缓冲区类型:
ByteBuffer
MappedByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer
常用方法:
allocate() - 分配一块缓冲区
put() - 向缓冲区写数据
get() - 向缓冲区读数据
filp() - 将缓冲区从写模式切换到读模式
clear() - 从读模式切换到写模式,不会清空数据,但后续写数据会覆盖原来的数据,即使有部分数据没有读,也会被遗忘;
compact() - 从读数据切换到写模式,数据不会被清空,会将所有未读的数据copy到缓冲区头部,后续写数据不会覆盖,而是在这些数据之后写数据
mark() - 对position做出标记,配合reset使用
reset() - 将position置为标记值
缓冲区的一些属性:
capacity - 缓冲区大小,无论是读模式还是写模式,此属性值不会变;
position - 写数据时,position表示当前写的位置,每写一个数据,会向下移动一个数据单元,初始为0;最大为capacity - 1,切换到读模式时,position会被置为0,表示当前读的位置
limit - 写模式下,limit 相当于capacity 表示最多可以写多少数据,切换到读模式时,limit 等于原先的position,表示最多可以读多少数据。
非直接缓冲区:通过allocate() 方法 分配缓冲区,将缓冲区建立在JVM内存中
直接缓冲区:通过allocateDirect() 方法直接缓冲区 将缓冲区建立在物理内存中
2.1 关于缓冲区各个属性的测试
String str ="abcde";//1. 分配一个指定大小的缓冲区ByteBuffer buf = ByteBuffer.allocate(1024); System.out.println("--------------allocate()----------------"); System.out.println(buf.position());//0System.out.println(buf.limit());//1024System.out.println(buf.capacity());//1024//2. 利用put存入数据到缓冲区中去buf.put(str.getBytes()); System.out.println("----------------put()-------------------"); System.out.println(buf.position());//5System.out.println(buf.limit());//1024System.out.println(buf.capacity());//1024//3. 切换到读取模式buf.flip(); System.out.println("----------------flip()------------------"); System.out.println(buf.position());//0System.out.println(buf.limit());//5System.out.println(buf.capacity());//1024//4. 利用get() 读取缓冲区中的数据byte[] dst =newbyte[buf.limit()]; buf.get(dst); System.out.println(newString(dst,0,dst.length)); System.out.println("----------------get()------------------"); System.out.println(buf.position());//5System.out.println(buf.limit());//5System.out.println(buf.capacity());//1024//5.可重复读buf.rewind(); System.out.println("----------------rewind()------------------"); System.out.println(buf.position());//0System.out.println(buf.limit());//5System.out.println(buf.capacity());//1024//6.clear(): 清空缓冲区, 但是缓冲区的数据依然存在, 但是处于被遗忘的状态buf.clear(); System.out.println("----------------clear()-------------------"); System.out.println(buf.position());//0System.out.println(buf.limit());//1024System.out.println(buf.capacity());//1024byte[] newByte =newbyte[buf.limit()]; buf.get(newByte); System.out.println(newString(newByte,0,newByte.length));
2.2 关于通道的使用
1.利用通道进行 文件的复制 非直接缓冲区
FileInputStream fis =null; FileOutputStream fos =null; FileChannel inChannel =null; FileChannel outChannel =null;try{ fis =newFileInputStream("1.jpg"); fos =newFileOutputStream("2.jpg");// ①获取通道inChannel = fis.getChannel(); outChannel = fos.getChannel();// ②将通道中的数据存入缓冲区ByteBuffer byteBuffer = ByteBuffer.allocate(1024);// 将通道中的数据存入缓冲区while(inChannel.read(byteBuffer) != -1) { byteBuffer.flip();// 切换读取数据的模式outChannel.write(byteBuffer); byteBuffer.clear(); } }catch(IOException e) { e.printStackTrace(); }finally{if(inChannel !=null) {try{ inChannel.close(); }catch(IOException e) { e.printStackTrace(); } }if(outChannel !=null) {try{ outChannel.close(); }catch(IOException e) { e.printStackTrace(); } }if(fis !=null) {try{ fis.close(); }catch(IOException e) { e.printStackTrace(); } }if(fos !=null) {try{ fos.close(); }catch(IOException e) { e.printStackTrace(); } } }
2.通道之间的传输
CREATE_NEW:如果文件不存在就创建,存在就报错
CREATE:如果文件不存在就创建,存在创建(覆盖)
FileChannel inChannel =null; FileChannel outChannel =null;try{ inChannel = FileChannel.open(Paths.get("hello.txt"), StandardOpenOption.READ); outChannel = FileChannel.open(Paths.get("hello2.txt"), StandardOpenOption.READ,StandardOpenOption.WRITE,StandardOpenOption.CREATE_NEW); inChannel.transferTo(0, inChannel.size(), outChannel); }catch(Exception e) { e.printStackTrace(); }finally{if(inChannel !=null){try{ inChannel.close(); }catch(IOException e) { e.printStackTrace(); } }if(outChannel !=null){try{ outChannel.close(); }catch(IOException e) { e.printStackTrace(); } } }
3. 使用直接缓冲区完成内存文件的复制
FileChannel inChannel =null; FileChannel outChannel =null;try{ inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); outChannel = FileChannel.open(Paths.get("x.jpg"), StandardOpenOption.READ,StandardOpenOption.WRITE,StandardOpenOption.CREATE_NEW); MappedByteBuffer inMappedBuffer = inChannel.map(MapMode.READ_ONLY,0, inChannel.size()); MappedByteBuffer outMappedBuffer = outChannel.map(MapMode.READ_WRITE,0, inChannel.size()); System.out.println(inMappedBuffer.limit());byte[] b =newbyte[inMappedBuffer.limit()];; inMappedBuffer.get(b); outMappedBuffer.put(b); }catch(Exception e) { e.printStackTrace(); }finally{if(inChannel !=null){try{ inChannel.close(); }catch(IOException e) { e.printStackTrace(); } }if(outChannel !=null){try{ outChannel.close(); }catch(IOException e) { e.printStackTrace(); } } }
2.3 重点 NIO-非阻塞IO
个人认为 NIO 最难的两点 一个是对于选择器和选择键的理解 其次是对于网络通信模型的理解
本章内容以防过长 只讲解 NIO 的使用方法 上述两点参看下回分解
阻塞IO示例:
//客户端@Testpublicvoidclient()throwsIOException{ SocketChannel sChannel = SocketChannel.open(newInetSocketAddress("127.0.0.1",9898)); FileChannel inChannel = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); ByteBuffer buf = ByteBuffer.allocate(1024);while(inChannel.read(buf) != -1){ buf.flip(); sChannel.write(buf); buf.clear(); } sChannel.shutdownOutput();//接收服务端的反馈intlen =0;while((len = sChannel.read(buf)) != -1){ buf.flip(); System.out.println(newString(buf.array(),0, len)); buf.clear(); } inChannel.close(); sChannel.close(); }//服务端@Testpublicvoidserver()throwsIOException{ ServerSocketChannel ssChannel = ServerSocketChannel.open(); FileChannel outChannel = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); ssChannel.bind(newInetSocketAddress(9898)); SocketChannel sChannel = ssChannel.accept(); ByteBuffer buf = ByteBuffer.allocate(1024);while(sChannel.read(buf) != -1){ buf.flip(); outChannel.write(buf); buf.clear(); }//发送反馈给客户端buf.put("服务端接收数据成功".getBytes()); buf.flip(); sChannel.write(buf); sChannel.close(); outChannel.close(); ssChannel.close(); }
非阻塞IO示例-TCP:
//客户端@Testpublicvoidclient()throwsIOException{//1. 获取通道SocketChannel sChannel = SocketChannel.open(newInetSocketAddress("127.0.0.1",9898));//2. 切换非阻塞模式sChannel.configureBlocking(false);//3. 分配指定大小的缓冲区ByteBuffer buf = ByteBuffer.allocate(1024);//4. 发送数据给服务端Scanner scan =newScanner(System.in);while(scan.hasNext()){ String str = scan.next(); buf.put((newDate().toString() +"\n"+ str).getBytes()); buf.flip(); sChannel.write(buf); buf.clear(); }//5. 关闭通道sChannel.close(); }//服务端@Testpublicvoidserver()throwsIOException{//1. 获取通道ServerSocketChannel ssChannel = ServerSocketChannel.open();//2. 切换非阻塞模式ssChannel.configureBlocking(false);//3. 绑定连接ssChannel.bind(newInetSocketAddress(9898));//4. 获取选择器Selector selector = Selector.open();//5. 将通道注册到选择器上, 并且指定“监听接收事件”ssChannel.register(selector, SelectionKey.OP_ACCEPT);//6. 轮询式的获取选择器上已经“准备就绪”的事件while(selector.select() >0){//7. 获取当前选择器中所有注册的“选择键(已就绪的监听事件)”Iterator it = selector.selectedKeys().iterator();while(it.hasNext()){//8. 获取准备“就绪”的是事件SelectionKey sk = it.next();//9. 判断具体是什么事件准备就绪if(sk.isAcceptable()){//10. 若“接收就绪”,获取客户端连接SocketChannel sChannel = ssChannel.accept();//11. 切换非阻塞模式sChannel.configureBlocking(false);//12. 将该通道注册到选择器上sChannel.register(selector, SelectionKey.OP_READ); }elseif(sk.isReadable()){//13. 获取当前选择器上“读就绪”状态的通道SocketChannel sChannel = (SocketChannel) sk.channel();//14. 读取数据ByteBuffer buf = ByteBuffer.allocate(1024);intlen =0;while((len = sChannel.read(buf)) >0){ buf.flip(); System.out.println(newString(buf.array(),0, len)); buf.clear(); } }//15. 取消选择键 SelectionKeyit.remove(); } } }
非阻塞IO示例-UDP:
@Testpublicvoidsend()throwsIOException{ DatagramChannel dc = DatagramChannel.open(); dc.configureBlocking(false); ByteBuffer buf = ByteBuffer.allocate(1024); Scanner scan =newScanner(System.in);while(scan.hasNext()){ String str = scan.next(); buf.put((newDate().toString() +":\n"+ str).getBytes()); buf.flip(); dc.send(buf,newInetSocketAddress("127.0.0.1",9898)); buf.clear(); } dc.close(); }@Testpublicvoidreceive()throwsIOException{ DatagramChannel dc = DatagramChannel.open(); dc.configureBlocking(false); dc.bind(newInetSocketAddress(9898)); Selector selector = Selector.open(); dc.register(selector, SelectionKey.OP_READ);while(selector.select() >0){ Iterator it = selector.selectedKeys().iterator();while(it.hasNext()){ SelectionKey sk = it.next();if(sk.isReadable()){ ByteBuffer buf = ByteBuffer.allocate(1024); dc.receive(buf); buf.flip(); System.out.println(newString(buf.array(),0, buf.limit())); buf.clear(); } } it.remove(); 欢迎工作一到五年的Java工程师朋友们加入Java群: 891219277
群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用自己每一分每一秒的时间来学习提升自己,不要再用"没有时间“来掩饰自己思想上的懒惰!趁年轻,使劲拼,给未来的自己一个交代!