Server
package nio.demo1;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Server {
public static void main(String[] args) {
int port = 8001;
ServerSocketChannel ssc = null;
Selector selector = null;
try {
// 0:打开一个多路选择器
selector = Selector.open();
// 1:打开一个服务器端的socket用于监听某个端口
ssc = ServerSocketChannel.open();
// 2:配置是阻塞模式的
ssc.configureBlocking(false);
// 3:监听哪个端口,和最多的连接数量
ssc.bind(new InetSocketAddress(port),1024);
// 4:把server socket channel注册到selector里,key为accept
ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("正在监听" + port + "端口");
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
// 5:不间断的循环,等待客户端链接进入
while (true) {
try {
// 6:阻塞。有一个链接进来,或周期时间到就放行。
selector.select(1000);
// 7:找出激活的channel所对应的key
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
// 8:处理输入流
handleInput(selector,key);
} catch (Exception e) {
// 关闭
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private static void handleInput(Selector selector, SelectionKey key) throws Exception {
if (!key.isValid()){
return;
}
// 1:这个key对应的channel是Server socket channel and has new socket channel coming,
// 是否有新的通道可被接收(进入)
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
//1.1把新的通道注册到多路选择器中(Selector)
sc.register(selector,SelectionKey.OP_READ);
}
// 2:this key`s channel is Socket Channel that has new data coming
// 这个键对应的通道是Socket Channel,且有新的数据流进来
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
//2.1 定义一个缓存区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//2.2 把通道的数据读到缓存区里。再返回字节长度
int read = sc.read(buffer);
if (read > 0) {
//2.3 This method is often used in conjunction with the compact method
// when transferring data from one place to another
// 数据放入缓存区后,你要用时,调用一下这个方法。可以多次放入,再一次调用拿出来。
buffer.flip();
byte[] bytes = new byte[read];
//2.4 取出缓存区里的数据
buffer.get(bytes);
String s = new String(bytes, "utf-8");
System.out.println("client said: " + s);
// 2.5(非必须的) 用相同的通道返回数据
doWrite(sc,s + "6666");
}else if (read < 0) {
// 2.3 客户端关闭channel
key.cancel();
sc.close();
}
}
}
private static void doWrite(SocketChannel sc, String s) throws IOException {
byte[] bytes = s.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
sc.write(buffer);
}
}
客户端
package nio.demo1;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.UUID;
public class Client {
public static void main(String[] args) {
String host = "127.0.0.1";
int port = 8001;
SocketChannel sc = null;
Selector selector = null;
try {
//1. 打开一个channel
sc = SocketChannel.open();
//2. 配置非阻塞模式
sc.configureBlocking(false);
//3. 创建一个新的多路选择器
selector = Selector.open();
//4. 尝试去链接
//-.1 返回true: 阻塞模式:会像本地链接一下马上建立
//-.2 返回false: 非阻塞模式:在其它线程里建立链接,建好之后会调用finishConnect方法
if (sc.connect(new InetSocketAddress(host,port))) {
//4.1 已经链接的话,把通道以可读的key注册到选择器里。
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
}else {
//4.1 还没有链接的话,把通道用要链接的key注册到选择器里。
sc.register(selector,SelectionKey.OP_CONNECT);
}
} catch (IOException e) {
e.printStackTrace();
}
while (true) {
try {
// 6:阻塞。有一个链接进来,或周期时间到就放行。
selector.select(1000);
// 7:找出激活的channel所对应的key
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
// 8:处理输入流
handleInput(selector,key);
}catch (Exception e) {
// 关闭
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
Thread.sleep(1000);
doWrite(sc);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private static void handleInput(Selector selector, SelectionKey key) throws IOException {
if (!key.isValid()) {
return;
}
SocketChannel sc = (SocketChannel) key.channel();
// 1: key 是可以连接的 channel完成成了连接
if (key.isConnectable() && sc.finishConnect()) {
// 1.1 把channel以可读的key,再注册一下。
sc.register(selector,SelectionKey.OP_READ);
}
// 2: 这个键对应的通道是Socket Channel,且有新的数据流进来
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = sc.read(buffer);
if (read > 0) {
buffer.flip();
byte[] bytes = new byte[read];
buffer.get(bytes);
String s = new String(bytes, "utf-8");
System.out.println("server said: " + s);
}else if (read < 0) {
key.cancel();
sc.close();
}
}
}
private static void doWrite(SocketChannel sc) throws IOException {
byte[] bytes = UUID.randomUUID().toString().getBytes();
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
sc.write(buffer);
}
}