一、本次目标
- 改造server,采用NIO读取client信息;
- 改造client,亦采用NIO发送消息,与之前不同的BIO形成对比;
二、编码
1、新建byte数组拼接公共类
主要用作在channel读取数据时,因为缓存大小的原因,很可能出现中文半包(一个汉字的byte处在两次读取的buffer缓冲区中)而导致乱码;
package com.cjt.io;
public class CommonUtil {
/**
* byte数组拼接
*/
public static byte[] concat(byte[] first, byte[] second) {
byte[] result = new byte[first.length + second.length];
System.arraycopy(first, 0, result, 0, first.length);
System.arraycopy(second, 0, result, first.length, second.length);
return result;
}
}
2、提取channel缓存读取byte数组
便于server和client公共调用相同的代码实现;
package com.cjt.io.nio;
import com.cjt.io.CommonUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
class ChannelHandler {
static byte[] readFromChannel(SocketChannel channel, int buffSize) throws IOException {
// 创建缓存区
ByteBuffer buffer = ByteBuffer.allocate(buffSize);
byte[] result = {};
while ((channel.read(buffer)) != -1) {
// 切换为写模式,buffer的position置为0
buffer.flip();
// 读取buffer有效数据
byte[] bytes = new byte[buffer.limit()];
buffer.get(bytes, 0, bytes.length);
// 数组拼接
result = CommonUtil.concat(result, bytes);
// 清空缓存区
buffer.clear();
}
return result;
}
}
3、修改server,采用NIO机制读取client消息
需要添加selector进行管理channel,无限轮询就绪的channel主要分为两部分,首先需要将自己的serverChannel注册到selector中,同时获取对应clientChannel,设置相关参数也要注册到selector中;
package com.cjt.io.nio;
import com.cjt.io.Config;
import com.cjt.io.DateUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
public class ServerNIO implements Config {
private Selector selector;
private ServerNIO(int port) {
try {
// 创建选择器
selector = Selector.open();
// 打开监听通道
ServerSocketChannel channel;
channel = ServerSocketChannel.open();
// 非阻塞模式
channel.configureBlocking(false);
// 绑定端口
channel.socket().bind(new InetSocketAddress(port));
// 将channel注册到selector中,并且“兴趣模式”为“accept”
channel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("[" + DateUtil.getCurTimeStr() + "]:服务端已经启动");
} catch (IOException e) {
e.printStackTrace();
}
}
private void start() {
while (true) {
try {
// select方法会阻塞当前线程
System.out.println("[" + DateUtil.getCurTimeStr() + "]:开始轮询获取就绪的channel");
int readyChannels = selector.select();
if (readyChannels == 0) continue;
System.out.println("[" + DateUtil.getCurTimeStr() + "]:有chanel就绪啦");
Set<SelectionKey> keySet = selector.selectedKeys();
for (SelectionKey key : keySet) {
// 删除正在处理的key
keySet.remove(key);
if (key.isValid()) {
// 可接受的(server注册的key)
if (key.isAcceptable()) {
// 获取注册的server服务端channel通道
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// 获取连接的client客户端channel通道
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("[" + DateUtil.getCurTimeStr() + "]:有新的客户端连接啦");
// 设置client的channel为非阻塞
socketChannel.configureBlocking(false);
// 注册当前clientChannel到selector中,并设置clientChannel为可读
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println();
}
// 可读的(client注册的key)
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
byte[] result = ChannelHandler.readFromChannel(socketChannel, 1024);
String resultStr = new String(result, DEFAULT_ENCODE);
System.out.println("[" + DateUtil.getCurTimeStr() + "]:client传来消息");
System.out.println(resultStr);
System.out.println();
socketChannel.shutdownInput();
System.out.println("[" + DateUtil.getCurTimeStr() + "]:server写入消息");
System.out.println(resultStr);
System.out.println();
ByteBuffer buffer = ByteBuffer.wrap(result);
socketChannel.write(buffer);
// 当前channel操作完毕后一定要执行清除操作
key.cancel();
socketChannel.close();
}
}
}
} catch (IOException e) {
e.printStackTrace();
break;
}
}
}
public static void main(String[] args) {
new ServerNIO(DEFAULT_PORT).start();
}
}
其实现在我们可以用上一篇中的ClientBIO来测试,照例先启动server,然后运行client的main方法;
client控制台:
[19:53:50]:连接server成功
[19:53:50]:连接server成功
[19:53:51]:client写入消息
我是第二个客户端
[19:53:53]:client写入消息
我是第一个客户端
[19:53:53]:server传来消息
我是第一个客户端
[19:53:53]:server传来消息
我是第二个客户端
server控制台:
[19:53:15]:服务端已经启动
[19:53:15]:开始轮询获取就绪的channel
[19:53:50]:有chanel就绪啦
[19:53:50]:有新的客户端连接啦
[19:53:50]:开始轮询获取就绪的channel
[19:53:50]:有chanel就绪啦
[19:53:50]:有新的客户端连接啦
[19:53:50]:开始轮询获取就绪的channel
[19:53:50]:有chanel就绪啦
[19:53:53]:client传来消息
我是第一个客户端
[19:53:53]:server写入消息
我是第一个客户端
[19:53:53]:开始轮询获取就绪的channel
[19:53:53]:有chanel就绪啦
[19:53:53]:client传来消息
我是第二个客户端
[19:53:53]:server写入消息
我是第二个客户端
[19:53:53]:开始轮询获取就绪的channel
这里说个细节,仅仅server实现NIO,client相当于发起连接在server中就一直处于就绪状态,因而会造成阻塞,并且
while
循环读取数据时会无限循环直到client关闭socket流。如果操作ByteBuffer读取全部很容易造成OOM。
结合起来看很能发现问题:
- client中两个连接server是同时的,但是server中可以看到一个是50秒,一个是53秒;
- server中第一个就绪的channel连接在50秒,然后在53秒才打印出client的消息;
- server中接受client还是阻塞的;
所以仅仅是server实现NIO是不行的。
4、修改client实现NIO编程
这里简单处理,client收发消息都是单线程,就不需要采用selector进行管理;
package com.cjt.io.nio;
import com.cjt.io.Config;
import com.cjt.io.DateUtil;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class ClientNIO implements Config, Runnable {
private String serverIp;
private int port;
private String msg;
private int second;
private ClientNIO(String serverIp, int port) {
this.serverIp = serverIp;
this.port = port;
}
private ClientNIO second(int second) {
this.second = second;
return this;
}
private ClientNIO msg(String msg) {
this.msg = msg;
return this;
}
private void sleep(int second) {
try {
Thread.sleep(second * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try (SocketChannel channel = SocketChannel.open()) {
if (channel.connect(new InetSocketAddress(serverIp, port))) {
System.out.println("[" + DateUtil.getCurTimeStr() + "]:连接server成功");
channel.configureBlocking(false);
byte[] bytes = msg.getBytes(DEFAULT_ENCODE);
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.flip();
sleep(second);
System.out.println("[" + DateUtil.getCurTimeStr() + "]:client写入消息");
System.out.println(msg);
channel.write(buffer);
channel.shutdownOutput();
byte[] result = ChannelHandler.readFromChannel(channel, 1024 * 1024);
String resultStr = new String(result, DEFAULT_ENCODE);
System.out.println("[" + DateUtil.getCurTimeStr() + "]:server传来消息");
System.out.println(resultStr);
System.out.println();
} else {
System.out.println("[" + DateUtil.getCurTimeStr() + "]:连接server失败");
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
ClientNIO client_1 = new ClientNIO(DEFAULT_ADDR, DEFAULT_PORT).second(3).msg("我是第一个客户端");
new Thread(client_1).start();
ClientNIO client_2 = new ClientNIO(DEFAULT_ADDR, DEFAULT_PORT).second(1).msg("我是第二个客户端");
new Thread(client_2).start();
}
}
基本步骤和server大同小异,那么我们再次测试看看;
client控制台:
[20:08:38]:连接server成功
[20:08:38]:连接server成功
[20:08:39]:client写入消息
我是第二个客户端
[20:08:39]:server传来消息
我是第二个客户端
[20:08:41]:client写入消息
我是第一个客户端
[20:08:41]:server传来消息
我是第一个客户端
server控制台:
[20:08:30]:服务端已经启动
[20:08:30]:开始轮询获取就绪的channel
[20:08:38]:有chanel就绪啦
[20:08:38]:有新的客户端连接啦
[20:08:38]:开始轮询获取就绪的channel
[20:08:38]:有chanel就绪啦
[20:08:38]:有新的客户端连接啦
[20:08:38]:开始轮询获取就绪的channel
[20:08:39]:有chanel就绪啦
[20:08:39]:client传来消息
我是第二个客户端
[20:08:39]:server写入消息
我是第二个客户端
[20:08:39]:开始轮询获取就绪的channel
[20:08:41]:有chanel就绪啦
[20:08:41]:client传来消息
我是第一个客户端
[20:08:41]:server写入消息
我是第一个客户端
[20:08:41]:开始轮询获取就绪的channel
client输出的没有任何变化,但是server就不一样了。server是在39秒也就是client在第一个写入消息时并不是连接server的时间,提示有就绪的channel,然后打印的也是“我是第二个客户端”。然后在41秒时再次提示有个就绪的channel,打印“我是第一个客户端”。Perfect!!我们要达到的不就是这种效果么(~ ̄▽ ̄)~
三、总结
相对传统的BIO,NIO的确要复杂不少,但是在高并发上效率要高得多。简单总结下server实现NIO的步骤:
- 创建selector选择器;
- 打开监听通道,设置为非阻塞,绑定好端口;
- 注册监听通道到之前的selector中,设置模式为“ACCEPT”;
- 编写主无限循环体,执行
selector.select()
阻塞线程获取就绪的channel; - 轮询就绪的SelectionKey,如果模式为“ACCEPT”说明是server的channel,获取连接server的client的channel,设置为非阻塞“READ”模式,并注册到selector中;如果模式为“READ”则说明是client的channel,直接创建ByteBuffer缓存区读取数据,一定要记得
key.cancel()
和socketChannel.close()
;
client实现NIO与server相差无几,只不过我这里简单处理做成单线程,实际上大多数的client还是需要加入selector监听通道,因为单独的读取和写入应该分为不同的线程处理。
从channel读取数据时,一定要注意读取的是有效数据,所以要好好掌握ByteBuffer的使用方法。