作者信息:
Doug Lea State University of New York at Oswego
dl@cs.oswego.edu
http://gee.cs.oswego.edu
译者水平有限,如有错误和纰漏,欢迎指正
大纲
可伸缩网络服务
事件驱动处理
-
reactor模式
- 基础版本
- 多线程版本
- 其他变种
java nio 非阻塞IO API一览
网络服务
web服务, 分布式对象, 等大多数网络服务具有相同的基本结构:
- Read request, 读请求
- Decode request, 解码请求
- Process service, 处理服务
- Encode reply, 编码响应
- Send reply, 发送响应
但是每个步骤的性质和成本又有所不同: xml解析, 文件传输, web页生成以及计算服务等...
经典服务设计
每一个处理程序(上图中的handler)都拥有自己的线程
经典ServerSocket 循环
public class ClassicServerSocketLoop {
private static final int PORT = 1992;
private static final int MAX_INPUT = 1024;
class Server implements Runnable {
@Override
public void run() {
try (ServerSocket ss = new ServerSocket(PORT)) {
while (!Thread.interrupted()) {
// 这里可使用单线程处理,或者线程池管理多个线程
new Thread(new Handler(ss.accept())).start();
}
} catch (IOException ignored) {
// ignored
}
}
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ignored) {
// ignored
}
}
private byte[] process(byte[] input) {
// 业务处理逻辑
return new byte[0];
}
}
}
可伸缩目标
负载不断增加时能优雅降级 (更多客户端接入,负载增加)
资源增加时性能能够持续提升(CPU, 内存, IC盘, 带宽等资源)
-
同样满足可用性以及性能目标:
- 低延迟
- 满足高峰需求
- 可调节的服务质量
分而治之(Divide and Conquer)通常是实现可伸缩目标的最有效方式
分而治之
将处理程序划分为小的任务, 每个小任务以非阻塞的方式执行.
当小任务可以执行时, 再执行任务. 通常, IO事件充当触发器的角色:
-
java nio支持的基本机制:
- 非阻塞的读和写
- 分发与IO事件关联的任务
-
无尽的变化可能
- 一系列事件驱动设计
事件驱动设计
事件驱动设计通常比其他同类的可选设计更加有效:
- 所需资源更少: 无需为每个客户端分配一个线程
- 更少的开销: 更少的上下文切换, 更少的同步操作
- 但是分发会更慢: 必须手动将事件和处理程序绑定
事件驱动设计在编码上更加复杂:
-
必须将一个完整的任务切分为简单的非阻塞任务
- 与GUI事件启动动作相似
- 不能消除所有的阻塞, 比如: GC, 页错误等
必须持续跟踪服务的逻辑状态
背景资料: AWT中的事件
事件驱动IO使用相同的思想, 但是设计方面有所不同
reactor模式
- 响应IO事件时, reactor将事件分发给合适的处理器处理——与AWT线程相似
- 处理器执行非阻塞操作——与AWT的ActionListeners相似
- 将事件处理器绑定到具体事件——与AWT的 addActionListener操作相似
- 参考
Schmidt et al, Pattern-Oriented Software Architecture, Volume 2 (POSA2)
, 或者Richard Stevens's的网络编程书籍, 以及Matt Welsh's的SEDA架构等.
基本reactor设计
以上是单线程版本
java nio支持
- 渠道channel: channel是文件, socket等的连接, 支持非阻塞读
- 缓冲buffer: buffer是数组一样的对象, 可直接被channel读写
- 选择器selector: select可监控注册在其上的channel集合IO事件的发生
- selectionKey: selectionKey维护IO事件的状态以及事件和处理器的绑定关系(原本的selection并不支持维护绑定关系的功能, 只是reactor模式利用selectionKey的attachment特性实现了这一功能而已)
reactor 1: 初始设置
class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocketChannel;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
sk.attach(new Acceptor());
}
// 类未完
reactor 2: 分发循环
// 继续类Reactor
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> sks = selector.selectedKeys();
Iterator<SelectionKey> it = sks.iterator();
while (it.hasNext()) {
dispatcher(it.next());
}
// 也可以在while循环中使用iterator的remove方法
sks.clear();
}
} catch (IOException ignored) {
// ignored
}
}
private void dispatcher(SelectionKey sk) {
Runnable r = (Runnable) sk.attachment();
if (null != r) {
r.run();
}
}
// 类未完
reactor 3: Acceptor
Acceport也是处理器EventHandler的一种, 用于处理socket accept事件
// 继续类Reactor
class Acceptor implements Runnable {
@Override
public void run() {
try {
SocketChannel c = serverSocketChannel.accept();
if (null != c) {
new Handler(selector, c);
}
} catch (IOException ignored) {
// ignored
}
}
}
}
// 类完成
reactor 4: handler设置
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
static final int MAX_IN = 1024;
static final int MAX_OUT = 1024;
ByteBuffer input = ByteBuffer.allocate(MAX_IN);
ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
static final int READING = 0, SENDING = 1;
int state = READING;
public Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// 也可以注解注册SelectionKey.OP_READ; 这里先不关心任何事件, 后面注册读事件
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() {
// 加入实现
return true;
}
boolean outputIsComplete() {
// 加入实现
return true;
}
void process() {
// 加入实现
}
// 类未完
reactor 5: 请求处理
// 继续类Handler
@Override
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
write();
}
} catch (IOException ignored) {
// ignored.
}
}
void read() throws IOException {
socket.read(input);
process();
state = SENDING;
sk.interestOps(SelectionKey.OP_WRITE);
}
void write() throws IOException {
socket.write(output);
if (outputIsComplete()) {
sk.cancel();
}
}
}
// 类完成
另一种handler实现
使用GoF设计模式, 状态模式: 再绑定合适的处理器作为selectionKey的attchment
final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
static final int MAX_IN = 1024;
static final int MAX_OUT = 1024;
ByteBuffer input = ByteBuffer.allocate(MAX_IN);
ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
static final int READING = 0, SENDING = 1;
int state = READING;
public Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// 也可以注解注册SelectionKey.OP_READ; 这里先不关心任何事件, 后面注册读事件
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() {
// 加入实现
return true;
}
boolean outputIsComplete() {
// 加入实现
return true;
}
void process() {
// 加入实现
}
@Override
public void run() {
try {
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender());
sk.interestOps(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
} catch (IOException ignored) {
// ignored
}
}
class Sender implements Runnable {
@Override
public void run() {
try {
socket.write(output);
if (outputIsComplete()) {
sk.cancel();
}
} catch (IOException ignored) {
// ignored.
}
}
}
}
多线程设计
为可伸缩性考虑添加多线程: 主要适用于多处理器
-
工作线程
- Reactor应该快速触发处理器: 处理器的处理过程减慢了reactor的速度
- 将非IO处理放到其他线程中
-
多reactor线程
- reactor线程可以只做饱和IO, 将业务负载分发给其他线程: 采用负载均衡匹配CPU和IO速率
工作线程
卸载非IO处理, 以此加速reactor线程: 类似于 POSA2 Proactor设计
-
比将计算密集型处理重构为事件驱动形式更为简单
- 还应该是纯非阻塞计算, 足够的任务逻辑来抵消开销
-
但是与IO处理同时发生会更难
- 当可以首先将所有数据读入一个buffer, 最好不过了
-
使用线程池, 便于调整和控制
- 一般情况下所需的线程数少于客户端数量
工作线程池
使用线程池的handler
static class PoolHandler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
static ExecutorService pool = Executors.newFixedThreadPool(100);
static final int PROCESSING = 3;
static final int MAX_IN = 1024;
static final int MAX_OUT = 1024;
ByteBuffer input = ByteBuffer.allocate(MAX_IN);
ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
static final int READING = 0, SENDING = 1;
int state = READING;
public PoolHandler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// 也可以注解注册SelectionKey.OP_READ; 这里先不关心任何事件, 后面注册读事件
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() {
// 加入实现
return true;
}
boolean outputIsComplete() {
// 加入实现
return true;
}
void process() {
// 加入实现
}
synchronized void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
// 使用线程池处理业务
pool.execute(new Processor());
}
}
synchronized void write() throws IOException {
socket.write(output);
if (outputIsComplete()) {
sk.cancel();
}
}
synchronized void processAndHandOff() {
process();
// or rebind attachment
state = SENDING;
sk.interestOps(SelectionKey.OP_WRITE);
}
@Override
public void run() {
try {
if (SENDING == state) {
write();
} else if (READING == state) {
read();
}
} catch (IOException ignored) {
// ignored.
}
}
class Processor implements Runnable {
@Override
public void run() {
processAndHandOff();
}
}
}
协调任务
任务接力
每个任务触发或者调用下一个任务. 这种方式通常快速, 但是也很脆弱.回调
队列
Futures (java future ?)
使用 PooledExecutor
可调节的的工作线程池
main方法
execute(Runnable r)
-
控制:
- 任务队列的类型 (任何channel)
- 线程最大数量
- 线程最小数量
- "Warm" versus on-demand threads (不知道怎么翻译)
- 空闲线程消亡的keep-alive时间间隔: 如有需要,后面可以更换新的
- 饱和策略: 阻塞, drop, producer-runs 等
多个reactor线程
使用reactor线程池:
- 用来匹配CPU和IO速率
- 静态或者动态的构造方式: 每一个都拥有selector, 线程以及分发循环
- 主acceptor分发给其他reactor处理accept事件
Selector[] selectors; // also create threads
int next = 0;
class Acceptor { // ...
public synchronized void run() { ...
Socket connection = serverSocket.accept();
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length) next = 0;
}
}
使用多个reactor:
使用其他java nio特性
一个reactor多个selector
- 绑定不同的处理器到不同的IO事件
- 需要认真仔细使用同步来协调多线程
文件传输
- 文件到网络或网络到文件的自动复制
内存映射文件
- 使用buffer访问文件
直接buffer
- 有时可以实现零拷贝传输
- 但是有启动和回收垃圾开销
- 适用于长连接的应用
基于连接的扩展
非单个服务连接
- 客户端连接
- 客户端发送一系列请求/消息
- 客户端断开连接
例子
- 数据库和事务监控
- 多个参与者的游戏, 聊天服务等
可扩展基本的网络服务模式
- 处理许多相对长连接的客户端
- 跟踪客户端session状态
- 分发跨域主机服务
API一览
- Buffer
- ByteBuffer/CharBuffer/LongBuffer等
- Channel
- SelectableChannel
- SocketChannel
- ServerSocketChannel
- FileChannel
- Selector
- SelectionKey
Buffer
abstract class Buffer {
int capacity();
int position();
Buffer position(int newPosition);
int limit();
Buffer limit(int newLimit);
Buffer mark();
Buffer reset();
Buffer clear();
Buffer flip();
Buffer rewind();
int remaining();
boolean hasRemaining();
boolean isReadOnly();
}
ByteBuffer
abstract class ByteBuffer extends Buffer {
static ByteBuffer allocateDirect(int capacity);
static ByteBuffer allocate(int capacity);
static ByteBuffer wrap(byte[] src, int offset, int len);
static ByteBuffer wrap(byte[] src);
boolean isDirect();
ByteOrder order();
ByteBuffer order(ByteOrder bo);
ByteBuffer slice();
ByteBuffer duplicate();
ByteBuffer compact();
ByteBuffer asReadOnlyBuffer();
byte get();
byte get(int index);
ByteBuffer get(byte[] dst, int offset, int length);
ByteBuffer get(byte[] dst);
ByteBuffer put(byte b);
ByteBuffer put(int index, byte b);
ByteBuffer put(byte[] src, int offset, int length);
ByteBuffer put(ByteBuffer src);
ByteBuffer put(byte[] src);
char getChar();
char getChar(int index);
ByteBuffer putChar(char value);
ByteBuffer putChar(int index, char value);
CharBuffer asCharBuffer();
short getShort();
short getShort(int index);
ByteBuffer putShort(short value);
ByteBuffer putShort(int index, short value);
ShortBuffer asShortBuffer();
int getInt();
int getInt(int index);
ByteBuffer putInt(int value);
ByteBuffer putInt(int index, int value);
IntBuffer asIntBuffer();
long getLong();
long getLong(int index);
ByteBuffer putLong(long value);
ByteBuffer putLong(int index, long value);
LongBuffer asLongBuffer();
float getFloat();
float getFloat(int index);
ByteBuffer putFloat(float value);
ByteBuffer putFloat(int index, float value);
FloatBuffer asFloatBuffer();
double getDouble();
double getDouble(int index);
ByteBuffer putDouble(double value);
ByteBuffer putDouble(int index, double value);
DoubleBuffer asDoubleBuffer();
}
Channel
interface Channel {
boolean isOpen();
void close() throws IOException;
}
interface ReadableByteChannel extends Channel {
int read(ByteBuffer dst) throws IOException;
}
interface WritableByteChannel extends Channel {
int write(ByteBuffer src) throws IOException;
}
interface ScatteringByteChannel extends ReadableByteChannel {
int read(ByteBuffer[] dsts, int offset, int length) throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
}
interface GatheringByteChannel extends WritableByteChannel {
int write(ByteBuffer[] srcs, int offset, int length) throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
}
SelectableChannel
abstract class SelectableChannel implements Channel {
int validOps();
boolean isRegistered();
SelectionKey keyFor(Selector sel);
SelectionKey register(Selector sel, int ops)
throws ClosedChannelException;
void configureBlocking(boolean block)
throws IOException;
boolean isBlocking();
Object blockingLock();
}
SocketChannel
abstract class SocketChannel implements ByteChannel ... {
static SocketChannel open() throws IOException;
Socket socket();
int validOps();
boolean isConnected();
boolean isConnectionPending();
boolean isInputOpen();
boolean isOutputOpen();
boolean connect(SocketAddress remote) throws IOException;
boolean finishConnect() throws IOException;
void shutdownInput() throws IOException;
void shutdownOutput() throws IOException;
int read(ByteBuffer dst) throws IOException;
int read(ByteBuffer[] dsts, int offset, int length)
throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
int write(ByteBuffer src) throws IOException;
int write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
}
ServerSocketChannel
abstract class ServerSocketChannel extends ... {
static ServerSocketChannel open() throws IOException;
int validOps();
ServerSocket socket();
SocketChannel accept() throws IOException;
}
FileChannel
abstract class FileChannel implements ... {
int read(ByteBuffer dst);
int read(ByteBuffer dst, long position);
int read(ByteBuffer[] dsts, int offset, int length);
int read(ByteBuffer[] dsts);
int write(ByteBuffer src);
int write(ByteBuffer src, long position);
int write(ByteBuffer[] srcs, int offset, int length);
int write(ByteBuffer[] srcs);
long position();
void position(long newPosition);
long size();
void truncate(long size);
void force(boolean flushMetaDataToo);
int transferTo(long position, int count,
WritableByteChannel dst);
int transferFrom(ReadableByteChannel src,
long position, int count);
FileLock lock(long position, long size, boolean shared);
FileLock lock();
FileLock tryLock(long pos, long size, boolean shared);
FileLock tryLock();
static final int MAP_RO, MAP_RW, MAP_COW;
MappedByteBuffer map(int mode, long position, int size);
}
// NOTE: ALL methods throw IOException
Selector
abstract class Selector {
static Selector open() throws IOException;
Set keys();
Set selectedKeys();
int selectNow() throws IOException;
int select(long timeout) throws IOException;
int select() throws IOException;
void wakeup();
void close() throws IOException;
}
SelectionKey
abstract class SelectionKey {
static final int OP_READ, OP_WRITE,
OP_CONNECT, OP_ACCEPT;
SelectableChannel channel();
Selector selector();
boolean isValid();
void cancel();
int interestOps();
void interestOps(int ops);
int readyOps();
boolean isReadable();
boolean isWritable();
boolean isConnectable();
boolean isAcceptable();
Object attach(Object ob);
Object attachment();
}