BIO
同步并阻塞,服务器实现模式为一个连接一个线程,BIO通信模型实现通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成后,通过输出流返回应答给客户端,线程销毁。
服务端(线程池)
public class MultiThreadEchoServer {
//线程池,用于处理业务
private static ExecutorService tp = Executors.newCachedThreadPool();
//处理请求
static class HandleMsg implements Runnable {
Socket clientSocket;
public HandleMsg(Socket clientSocket) {
this.clientSocket = clientSocket;
}
public void run() {
BufferedReader is = null;
PrintWriter os = null;
try {
System.out.println(Thread.currentThread().getId() + ",线程开始处理。");
is = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
os = new PrintWriter(clientSocket.getOutputStream(), true);
//从InputStream中读取客户端锁发送的数据
String inputLine;
long b = System.currentTimeMillis();
while ((inputLine = is.readLine()) != null) {
System.out.println(Thread.currentThread().getId() + ",from client: " + inputLine + ",用时:" + (
System.currentTimeMillis() - b) + "ms");
os.println(inputLine);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (is != null)
is.close();
if (os != null)
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
ServerSocket echoServer = null;
Socket clientSocket;
try {
echoServer = new ServerSocket(8000);
} catch (IOException e) {
System.out.println(e);
}
while (true) {
try {
clientSocket = echoServer.accept();
System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");
tp.execute(new HandleMsg(clientSocket));
} catch (IOException e) {
System.out.println(e);
}
}
}
}
客户端(高并发)
public class HeavySocketClient {
private static ExecutorService tp = Executors.newCachedThreadPool();
private static final int sleep_time = 1000 * 1000 * 1000;
public static class EchoClient implements Runnable {
public void run() {
Socket client = null;
PrintWriter writer = null;
BufferedReader reader = null;
try {
client = new Socket();
client.connect(new InetSocketAddress("localhost", 8000));
long b = System.currentTimeMillis();
writer = new PrintWriter(client.getOutputStream(), true);
writer.println("He");
LockSupport.parkNanos(sleep_time);
writer.println("llo");
LockSupport.parkNanos(sleep_time);
writer.println("!");
writer.flush();
reader = new BufferedReader(new InputStreamReader(client.getInputStream()));
String readerLine;
while ((readerLine = reader.readLine()) != null) {
System.out.println("from server : " + readerLine);
}
long e = System.currentTimeMillis();
System.out.println("spend:" + (e - b) + "ms");
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (writer != null) writer.close();
if (reader != null) reader.close();
if (client != null) client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
EchoClient client = new EchoClient();
for (int i = 0; i < 3; i++) {
tp.execute(client);
}
}
}
服务端处理日志
/127.0.0.1:58421 connect!
/127.0.0.1:58422 connect!
10,线程开始处理。
/127.0.0.1:58423 connect!
11,线程开始处理。
12,线程开始处理。
12,from client: He,用时:2ms
11,from client: He,用时:2ms
10,from client: He,用时:3ms
11,from client: llo,用时:1002ms
12,from client: llo,用时:1002ms
10,from client: llo,用时:1002ms
11,from client: !,用时:2006ms
12,from client: !,用时:2006ms
10,from client: !,用时:2006ms
缺点
- 服务端的线程数和并发访问数成线性正比
- 服务端各个线程的处理时间完全取决于客服端的处理能力,以上例子中绝大部分时间用于IO等待而非服务端本身的业务处理,造成了资源的浪费
NIO
Reactor模型
responds to IO events by dispatching
the appropriate handler
通过调度合适的执行者来相应IO事件
- 分而治之
1、把处理过程拆分成明确的、职责单一的任务,使得每一个小的任务都可以采用非阻塞的方式才执行。
2、在任务状态是可执行时,才开始执行。
一个IO事件通常可以划分分为:read -> decode -> compute -> encode -> send
- 多路复用
NIO由原来的阻塞读写(占用线程)变成了单线程轮询事件,通过记录跟踪每个I/O流(sock)的状态,来同时管理多个I/O流 ,从而将IO阻塞操作从业务线程中抽取出来。
单线程版的Reactor 模型
可扩展性
为了实现可扩展性,我们可以战略性的新增线程来适应多处理器的计算机。
仔细分析一下我们需要的线程,其实主要包括以下几种:
- 事件分发器(Reactor),用于选择就绪的事件。
- I/O处理器,包括accept、read、write等
- 业务线程,在处理完I/O后,用于处理具体的业务。
- 单个Reactor-工作线程池
Reactor将非IO处理交给其他线程来处理。有的业务逻辑比较繁琐复杂且耗,有的还会有其他的阻塞I/O,如DB操作,RPC等。这些情况下我们就可以使用工作者线程池
- Reactor线程池
当单一的Reactor线程池模式达到饱和时,还能扩展成多个Reactor,使CPU、IO速率达到负载均衡。
单个Reactor-多线程模式
多Reactor模式
NIO 基本概念
缓冲区 Buffer
本质上是一块可以存储数据的内存,被封装成了buffer对象而已
通道 Channel
类似于流,但是可以异步读写数据(流只能同步读写),通道是双向的(流是单向的),通道的数据总是要先读到一个buffer 或者 从一个buffer写入,即通道与buffer进行数据交互
选择器 Selectors
相当于一个观察者,用来监听通道感兴趣的事件,一个选择器可以绑定多个通道。Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作
SelectionKeys
维护事件的状态,并与事件绑定,通过SelectionKey来完成IO的读写
- OP_ACCEPT
服务端收到一个连接请求
- OP_CONNECT
客户端发起连接
- OP_READ
当OS的读缓冲区中有数据可读
- OP_WRITE
当OS的写缓冲区中有空闲的空间
多线程-单Reactor模式
NIO服务端
/**
* 使用NIO实现多线程的Echo服务器
*
* @author guolinlin
* @version V1.0
* @since 2017-08-14 22:48
*/
public class EchoServer {
//用于处理所有网络连接
private Selector selector;
//用于统计服务器线程在一个客户端上花费的时间
public static Map<Socket, Long> time_stat = new HashMap<Socket, Long>(10240);
SimpleDateFormat df = new SimpleDateFormat("hh:mm:ss:SSS");
private void startServer() throws Exception {
//获得selector实例
selector = SelectorProvider.provider().openSelector();
//获得SocketChannel实例
ServerSocketChannel socketChannel = ServerSocketChannel.open();
//设置为非阻塞模式
socketChannel.configureBlocking(false);
//绑定端口
InetSocketAddress isa = new InetSocketAddress(8000);
socketChannel.socket().bind(isa);
/*
将ServerSocketChannel绑定到selector上,并注册它感兴趣的事件为accept,当selector发现
ServerSocketChannel有新的客户端连接时,就会通知ServerSocketChannel进行处理
*/
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
//循环,等待-分发网络信息
for (; ; ) {
//获取已经准备就绪的SelectionKey
selector.select();
Set readyKeys = selector.selectedKeys();
Iterator i = readyKeys.iterator();
long e = 0;
while (i.hasNext()) {
SelectionKey selectionKey = (SelectionKey) i.next();
i.remove();
if (selectionKey.isAcceptable()) {
doAccept(selectionKey);
} else if (selectionKey.isValid() && selectionKey.isReadable()) {
if (!time_stat.containsKey(((SocketChannel) selectionKey.channel()).socket())) {
time_stat.put(((SocketChannel) selectionKey.channel()).socket(), System.currentTimeMillis());
}
doRead(selectionKey);
} else if (selectionKey.isValid() && selectionKey.isWritable()) {
doWrite(selectionKey);
e = System.currentTimeMillis();
long b = time_stat.remove(((SocketChannel) selectionKey.channel()).socket());
System.out.println("spead:" + (e - b) + "ms");
} else if (selectionKey.isConnectable()) {
System.out.println("isConnectable = true");
}
}
}
}
private void doAccept(SelectionKey selectionKey) {
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
//域客户端通信的通道
SocketChannel clientChannel;
try {
clientChannel = server.accept();
//设置成非阻塞模式,要求系统在准备好IO后,再通知我们的线程来读取或者写入
clientChannel.configureBlocking(false);
//将这个channel注册到selector,并告诉Selector,我现在对读操作感兴趣
SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
EchoClient echoClient = new EchoClient();
clientKey.attach(echoClient);
System.out.println(
Thread.currentThread().getId() + ",do accept from port:" + clientChannel.socket().getPort() + "," + df
.format(new Date()));
} catch (Exception e) {
System.out.println("Failed to accept new client.");
e.printStackTrace();
}
}
private void doRead(SelectionKey selectionKey) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
System.out.println(
Thread.currentThread().getId() + ",do Read from port:" + channel.socket().getPort() + "," + df
.format(new Date()));
ByteBuffer bb = ByteBuffer.allocate(8192);
int len;
try {
len = channel.read(bb);
if (len < 0) {
channel.close();
selectionKey.cancel();
return;
}
} catch (Exception e) {
System.out.println("Failed to read from client.");
e.printStackTrace();
selectionKey.cancel();
return;
}
bb.flip();
//业务处理
new WorkHandle(selectionKey, bb);
}
public void doWrite(SelectionKey selectionKey) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
System.out.println(
Thread.currentThread().getId() + ",do Write from port:" + channel.socket().getPort() + "," + df
.format(new Date()));
EchoClient echoClient = (EchoClient) selectionKey.attachment();
LinkedList<ByteBuffer> outq = echoClient.getOutoutQueue();
ByteBuffer bb = outq.getLast();
try {
int len = channel.write(bb);
if (len == -1) {
channel.close();
selectionKey.cancel();
return;
}
if (bb.remaining() == 0) {
outq.remove();
}
} catch (Exception e) {
System.out.println("Failed to write from client.");
e.printStackTrace();
try {
channel.close();
} catch (IOException e1) {
}
selectionKey.cancel();
return;
}
if (outq.size() == 0) {
selectionKey.interestOps(SelectionKey.OP_READ);
}
}
public static void main(String[] args) {
EchoServer echoServer = new EchoServer();
try {
echoServer.startServer();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 工作线程
*
* @author guolinlin
* @version V1.0
* @since 2017-08-14 23:08
*/
public class WorkHandle {
//用于对每个客户端进行相应的处理
private static final ExecutorService service = Executors.newCachedThreadPool();
private SelectionKey selectionKey;
private ByteBuffer bb;
public WorkHandle(SelectionKey selectionKey, ByteBuffer bb) {
this.selectionKey = selectionKey;
this.bb = bb;
start();
}
public void start() {
service.submit(() -> {
System.out.println(Thread.currentThread().getId() + ",HandleMsg...");
EchoClient echoClient = (EchoClient) selectionKey.attachment();
echoClient.enqueue(bb);
selectionKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
//强迫selector强制返回
selectionKey.selector().wakeup();
});
}
}
NIO 服务端日志
1,do accept from port:62855,05:17:33:919
1,do accept from port:62856,05:17:33:920
1,do Read from port:62855,05:17:33:921
10,HandleMsg...
1,do accept from port:62857,05:17:33:925
1,do Read from port:62856,05:17:33:925
1,do Read from port:62857,05:17:33:926
11,HandleMsg...
1,do Write from port:62855,05:17:33:926
12,HandleMsg...
spead:6ms
1,do Write from port:62857,05:17:33:928
spead:2ms
1,do Write from port:62856,05:17:33:928
spead:3ms
1,do Read from port:62856,05:17:34:917
1,do Read from port:62855,05:17:34:918
12,HandleMsg...
1,do Read from port:62857,05:17:34:918
11,HandleMsg...
12,HandleMsg...
1,do Write from port:62856,05:17:34:918
spead:1ms
1,do Write from port:62855,05:17:34:918
spead:0ms
1,do Write from port:62857,05:17:34:918
spead:0ms
1,do Read from port:62857,05:17:35:922
1,do Read from port:62856,05:17:35:922
12,HandleMsg...
11,HandleMsg...
1,do Read from port:62855,05:17:35:922
11,HandleMsg...
1,do Write from port:62857,05:17:35:923
spead:1ms
1,do Write from port:62856,05:17:35:923
spead:1ms
1,do Write from port:62855,05:17:35:923
spead:1ms
多Reactor模式
多Reactor,也即一个主Reactor负责监控所有的连接请求,多个子Reactor负责监控并处理读/写请求,减轻了主Reactor的压力,降低了主Reactor压力太大而造成的延迟。
并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作由同一个线程处理。这样保证了同一请求的所有状态和上下文在同一个线程中,避免了不必要的上下文切换,同时也方便了监控请求响应状态。
/**
* 多Reactor
*
* @author guolinlin
* @version V1.0
* @since 2017-08-25 21:06
*/
public class MultiReactorService {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8000));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//获取系统CPU核数
int coreNum = Runtime.getRuntime().availableProcessors();
SubReactor[] subReactors = new SubReactor[coreNum];
for (int i = 0; i < subReactors.length; i++) {
subReactors[i] = new SubReactor();
}
int index = 0;
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
for (SelectionKey key : keys) {
keys.remove(key);
if (key.isAcceptable()) {
ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = acceptServerSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("Accept request from:" + socketChannel.getRemoteAddress());
SubReactor subReactor = subReactors[((index++) % coreNum)];
subReactor.addChannel(socketChannel);
subReactor.wakeup();
}
}
}
}
}
/**
* 用于读写的Reactor
*
* @author guolinlin
* @version V1.0
* @since 2017-08-25 21:10
*/
public class SubReactor {
private static final ExecutorService service = Executors
.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors());
private Selector selector;
public SubReactor() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
start();
}
public void addChannel(SocketChannel socketChannel) throws ClosedChannelException {
socketChannel.register(this.selector, SelectionKey.OP_READ);
}
public void wakeup() {
this.selector.wakeup();
}
public void start() {
service.submit(() -> {
while (true) {
if (selector.select(500) <= 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) key.channel();
int count = socketChannel.read(buffer);
if (count < 0) {
socketChannel.close();
key.cancel();
System.out.println(socketChannel + "Read end");
continue;
} else if (count == 0) {
System.out.println(socketChannel + "Message size is 0");
continue;
} else {
System.out.println(socketChannel + "Read message :" + new String(buffer.array()));
}
}
}
}
});
}
}