经典的网络编程
一般网络编程都具有以下几个步骤:
- 读取请求 Read request
- 解码请求 Decode request
- 处理服务 Process services
- 加密回应 Encode reply
- 发送回应 Send reply
但是每一步的处理的内容和成本都不一样。xml、json、file等等
每种类型的处理程序都需要在各自都线程中来进行,用代码表示就是如下
class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) { /* ... */ }
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { /* ... */ }
}
private byte[] process(byte[] cmd) { /* ... */ }
}
}
如果当前运行线程没有被中断就一直循环创建一个线程或者线程池用来处理ServerSocket里面的Socket请求。
注意:Thread.interrupted()和Thread.isInterrupted()
这样会造成我们需要为每一个socket请求创建一个线程来处理对应的数据。一旦用户过多或者处理程序时间较长就会造成各种各样的问题。无法并发,前面的活没干完后面的需要等着,负载等等
优化方向
增加负载
增加硬件 (CPU, memory, disk, bandwidth)
同时满足可用性和性能目标
减短延迟
满足高峰需求
提高服务质量
通常来说Divide-and-conquer(分而治之)是实现任何可扩展性目标的最佳方法
Divide-and-conquer(分而治之)
将整体任务切割成小任务。每个小任务只执行单一任务,并且不会阻塞其他小任务的运行
用IO事件来触发每个小任务的启动
java.nio 中支持的基本机制
非阻塞读取和写入
用感测到的IO事件来调度相关的任务
事件驱动设计中可能出现的无尽变化
Event-driven Designs 事件驱动设计
比较有效的方法
占用更少的资源,每个客户端不一定需要单独创建一个线程
减少开销。减少Context的切换可以相应的减少锁定
调度可能会更慢,所以必须手动将动作绑定到事件
更难的编程
将动作分解为简单非阻塞的
类似于GUI事件驱动的动作
无法消除所有阻塞。比如:GC,页面错误等
必须跟踪服务的逻辑状态
AWT事件机制
IO事件驱动使用相似的想法,但设计不同
java.awt是一个软件包,包含用于创建用户界面和绘制图形图像的所有分类
Reactor Pattern(反应堆模式)
- Reactor通过调度来响应IO事件。如:AWT thread
- Handler执行非阻塞动作。如:AWT ActionListeners
- Manager将Handler绑定到事件上。如:AWT addActionListener
预先使用Manager将Handler绑定到指定的事件上,如onClick
用户点击按钮的时候,Reactor获取到事件,并调度事先绑定好的处理程序
经典的Reactor设计
单线程版本
java.nio 支持
Channels
支持非阻塞的读取文件和socket连接
Buffers
Channels通过Buffers可以直接读取或者写入对象
Selectors
通知一组Channels触发了哪些IO事件
SelectionKeys
维护IO事件的状态和绑定
Reactor 实现
Setup
class Reactor implements Runnable {
//Selector选择器
final Selector selector;
//Socket服务通道
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
//创建一个Selector
selector = Selector.open();
//创建一个Socket Channel
serverSocket = ServerSocketChannel.open();
//将Socket Channel绑定到指定端口
serverSocket.socket().bind(
new InetSocketAddress(port));
//设置Socket Channel为非阻塞
serverSocket.configureBlocking(false);
//将Selector和Socket Channel注册到SelectionKey
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//将SelectionKey附加到接受者
sk.attach(new Acceptor());
}
/*
也可以使用SPI提供接口:
SelectorProvider p = SelectorProvider.provider();
selector = p.openSelector();
serverSocket = p.openServerSocketChannel();
*/
}
Dispatch Loop
// class Reactor continued
public void run() { //通常在新线程中执行
try {
//如果当前线程没有中断就循环执行
while (!Thread.interrupted()) {
//查询选择器中获取已经准备好的并且注册过的操作
selector.select();
//获取所有已经准备好的并且注册过的操作
Set selected = selector.selectedKeys();
//循环遍历
for (Object o : selected) {
//调度任务并处理事件操作
dispatch((SelectionKey) o);
}
//移除选择器
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
//处理事件操作
void dispatch(SelectionKey k) {
//获取SelectionKey中绑定的处理程序,如果不为空就执行
Runnable r = (Runnable) (k.attachment());
if (r != null)
r.run();
}
Acceptor
// class Reactor continued
// 创建接收器
class Acceptor implements Runnable {
public void run() {
try {
//获取连接成功到客户端连接
SocketChannel c = serverSocket.accept();
if (c != null) {
//如果不为空就处理客户端连接以及selector
new Handler(selector, c);
}
} catch (IOException ex) { /* ... */ }
}
}
Handler setup
//处理程序
final class Handler implements Runnable {
//指定最大输入bytes
private static final int MAXIN = 1024;
//指定最大输出bytes
private static final int MAXOUT = 1024;
//客户端连接
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
//配置非阻塞模式
c.configureBlocking(false);
//将客户端连接和读注册到SelectionKey
sk = socket.register(sel, SelectionKey.OP_READ);
//将SelectionKey附加到当前线程的run
sk.attach(this);
//将SelectionKey的操作设置为读取
sk.interestOps(SelectionKey.OP_READ);
//唤醒Selector
sel.wakeup();
}
}
Request handling
// class Handler continued
//输入处理完成
boolean inputIsComplete() { /* ... */
return true;
}
//输出处理完成
boolean outputIsComplete() { /* ... */
return true;
}
//处理过程中
void process() { /* ... */ }
public void run() {
try {
//根据不同的状态进行不同的处理程序
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
//读取数据
void read() throws IOException {
//从客户端获取数据
socket.read(input);
//如果读取完成
if (inputIsComplete()) {
//处理数据
process();
//标记为发送状态
state = SENDING;
// 将SelectionKey的操作设置为写入
sk.interestOps(SelectionKey.OP_WRITE);
}
}
//发送数据
void send() throws IOException {
//将数据写入客户端连接
socket.write(output);
//发送完成后将SelectionKey中的绑定取消
if (outputIsComplete()) sk.cancel();
}
}
Per-State Handlers
GoF State-Object pattern 状态模式,针对状态重新绑定对应的处理程序
//处理程序
class Handler {
// 初始化为读取状态
public void run() {
//客户端读取数据
socket.read(input);
//读取完成
if (inputIsComplete()) {
//处理数据
process();
//附加新的处理程序Sender
sk.attach(new Sender());
//标记状态为写入
sk.interest(SelectionKey.OP_WRITE);
//唤醒SelectionKey中绑定的Selector
sk.selector().wakeup();
}
}
//处理程序Sender
class Sender implements Runnable {
public void run(){ // ...
//写入数据
socket.write(output);
//写入完成之后将SelectionKey中的绑定取消
if (outputIsComplete()) sk.cancel();
}
}
}
Multithreaded Designs 多线程设计
战略性的为扩展性增加线程
主要适用于多处理器
工作线程
Reactor可以快速的触发处理程序
因为处理程序过多或者处理时间过程会减慢Reactor的速度
将非IO处理放到其他的线程
多个Reactor处理线程
Reactor线程任务过多会导致IO饱和
分配一些任务给其他Reactor线程
负载均衡以匹配CPU和IO速率
Worker Threads 工作线程设计
将非IO处理放到其他的线程来加快Reactor线程
比计算绑定处理重新处理为事件驱动的形式更简单
应该仍然是纯非阻塞计算
足够的处理胜过开销
很难与IO重叠处理
最好能先将所有输入读入缓冲区
使用线程池可以进行调优和控制
通常需要的线程数比客户端少得多
Handler with Thread Pool 多线程处理
class Handler implements Runnable {
// 创建一个线程池
static PooledExecutor pool = new PooledExecutor(...);
//设置处理状态
static final int PROCESSING = 3;
//读数据操作,设计到多线程读取需要加线程锁
synchronized void read() {
//读取数据
socket.read(input);
//读取完成
if (inputIsComplete()) {
//标记为处理状态
state = PROCESSING;
//将处理过程放到线程池中执行
pool.execute(new Processer());
}
}
//处理数据线程
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
//处理数据并关闭
synchronized void processAndHandOff() {
//处理数据
process();
//标记处理完成并标记发送状态
state = SENDING; // 或者绑定其他操作
//将SelectionKey的操作设置为写入
sk.interest(SelectionKey.OP_WRITE);
}
}
协调任务Coordinating Tasks
Handoffs 传递
循环任务的启用、触发或调用下一个任务
通常是最快的但同时也是脆弱的
给每个处理程序触发回调
设置状态、附加处理程序等等
状态模式
Queues 队列
比如跨阶段传递buffers
Futures
当每个任务产生结果时触发
协调层位于连接或等待/通知之上
Using PooledExecutor 使用线程池执行
一个可优化的工作线程池
主方法执行(Runnable r)
控制
任务队列的类型(任何通道)
最大线程数
最小线程数
"Warm" 与按需加载线程
保持活动间隔,直到空闲线程死亡
如有必要,稍后将其替换为新的
饱和策略
阻塞、下降、生产运行等
Multiple Reactor Threads 多个Reactor线程
使用Reactor线程池
用于匹配CPU和IO速率
静态或动态构造
每个Reactor都有自己的选择器,线程,调度循环
主接收器分配到专用的Reactor
Using other java.nio features 使用其他的java.nio特性
一个Reactor对应多个Selectors
将不同的处理程序绑定到不同的IO事件
调度需要仔细处理线程安全
文件传输
自动化的文件到网络或网络到文件的复制
内存映射文件
通过缓冲区访问文件
直接访问缓冲区
有可能实现零拷贝传输吗
但是有设置和完成的开销
最适合长时间连接的应用
Connection-Based Extensions 基础连接的扩展
不能使用单个服务请求
客户端连接
客户端发送一系列消息/请求
客户端断开连接
举例
数据库和事务监控器
多人游戏,聊天等
可以扩展基本的网络服务模式
处理许多相对长连接的客户
跟踪客户端和会话状态(包括丢弃)
分布式部署服务
原文:Doug Lea Scalable IO in Java