Blocking IO下线程模型
- 单线程模型
ServerSocket ss =new ServerSocket();
ss.bind(new InetSocketAddress("hostname",6379));
while (true) {
Socket socket = ss.accept();
handle(socket);
}
单线程模型将网络监听、网络IO处理和业务处理逻辑全部放在一个线程中,这种方式缺点显而易见:无法并发导致服务效率太低,系统吞吐量也太低。试想:如果某一时刻有1000个请求达到服务器,当前线程accept其中一个,并且在当前线程中执行handle()函数进行处理,那么剩余999个请求都要等待当前处理完成(阻塞),如果处理逻辑耗时较长,将有大量的请求超时。
- 多线程模型
ServerSocket ss =new ServerSocket();
ss.bind(new InetSocketAddress("hostname",6379));
while (true) {
final Socket socket = ss.accept();
new Thread(()->{
handle(socket);
}).start();
}
该模式使用一个线程监听网络端口,将业务逻辑放到新的独立线程中处理,这相比单线程模型服务效率和吞吐量都有很大提升。此时即便业务处理逻辑耗时很长,后续请求也不会阻塞。虽然使得服务效率和吞吐量有所提高,但仍有缺点:当并发量骤增时,服务线程数也必然大幅增加(一个服务请求对应一个业务处理线程),这将导致大量的cpu周期浪费在线程上线文切换的消耗中。此外,大量的线程创建和销毁动作,也将成为系统性能的瓶颈(线程创建和销毁都很耗时)。
- 线程池技术
ServerSocket ss =new ServerSocket();
ss.bind(new InetSocketAddress("hostname",6379));
while (true) {
final Socket socket = ss.accept();
ThreadPool.get().excute(()->{
handle(socket);
});
}
该模式是对多线程模型的改进。此模式在系统启动时创建一个线程池和一部分线程,使用线程池对线程的生命周期进行管理。服务接收到网络请求后,从池中获取线程进行处理,完成之后归还到池中,从而实现线程的复用。该模式只是避免了大量线程的创建和销毁动作,并没有解决线程上下文切换的消耗问题。
固有缺陷:
1. 服务更快的到达瓶颈,系统不能随着硬件的增加(更多更快的cpu,更大的内存,更多的网络带宽等)而带来更多的性能提升,线程上下文切换、数据同步及数据在各个CPU之间的复制移动等导致更多的开销。
2. 使用多线程编程意味着需要忍受其带来的变成复杂度,例如需要小心处理并发控制。
3. 服务的线程数应该由服务器的硬件资源决定,而不能是客户端的请求数量来决定(线程池技术已解决该问题)。
以上,是基于阻塞IO的线程模型,下面我们来看一看非阻塞IO下的线程模型应当如何设计。
Reactor模式
优化目标:
1. 可用性和性能目标:更低的延迟(short latency)、适应需求高峰(meeting peak demand)、保证服务质量(tunable quality of service)等。
2. 可扩展性:随着资源的增加,系统性能可获取持续性的提升(Continuous improvement with increasing resources (CPU, memory, disk, bandwidth))
3.随着负载的增加,系统可以自动降级(Graceful degradation under increasing load)。
途径:
Divide-and-conquer is usually the best approach for achieving any scalability goal. — Doug Lea
分而治之通常是实现可扩展性目标的最好途径。— 李道长
1. Divide processing into small tasks,each task perform an action without blocking.将处理过程划分为更小的任务,每一个小任务执行一个单一的功能或动作(必须是非阻塞的,否则将失去意义)。
2. Execute each task when it is enabled.Here, an IO event usually serves as trigger.IO事件触发后才执行相应的任务,此时IO时间通常用作触发器,这很容易让我们想到事件驱动模型。
我们先看一下Reactor模式的结构,看看他是如何做的:
Handle:
句柄,一般由操作系统提供,对资源在操作系统层面上的一种抽象.用于标志和识别事件源。它可以是打开的文件、一个连接(Socket)、Timer等。由于Reactor模式一般使用在网络编程中,因而这里一般指Socket Handle,即一个网络连接(Connection,在Java NIO中的Channel)。这个Channel注册到Synchronous Event Demultiplexer中,以监听Handle中发生的事件,对ServerSocketChannnel可以是CONNECT事件,对SocketChannel可以是READ、WRITE、CLOSE事件等。
同步事件分离器(Synchronous Event Demultiplexer)
可以理解为一个阻塞函数,用于监听句柄集合(Handle Set)中的事件。这个函数一旦调用,会一直阻塞,直到句柄集合中的句柄有一个是就绪(ready)状态,句柄的就绪状态意味着可以在以非阻塞的方式在该句柄上进行相关操作(meaning that an operation can be initiated on them without blocking)。select()方法是一个用于处理IO事件的基本同步事件分离器,在unix或者window上都可以用。
事件处理器(Event Handler)
事件处理器一般是由多个钩子函数(Hook Method)组成的接口,这些函数是用于处理特定句柄事件的操作集合,通常情况下特定的事件处理器中与特定的句柄有着对应关系. Each concrete event handler is associated with a handle that identifies this service within the application。
反应器(Reactor,also known as Initiation Dispatcher)
1. 用于管理Event Handler,即EventHandler的容器,用以注册、移除EventHandler等.
2. 作为Reactor模式的入口启动event loop。reactor使用同步事件分离器(Synchronous Event Demultiplexer)来获取句柄集合(Handle Set)中出现的事件。当他感兴趣的事件出现时,将其发送到handle对应的时间处理器,亦即调用EventHandler中的钩子方法。
Concrete Event Handler 特定的事件处理器,一般是EventHandler的实现类,处理特定的Handle 或者Handle中的特定事件。
交互流程:
1.主程序初始化Reactor,并将特定的handler注册到Reactor上
2. 由于handler需要指定它要处理的handle事件,因此需要获取handler对应的handle并形成对应关系
3.通过调用handle_events()来启动event loop,在eventloop中,通过调用select()来阻塞的等待Event.
4.一旦有handle进入ready状态,则select()方法返回,Reactor通过Handle事件类型获取相应的EventHandler,并调用handle_event()方法,handle_event()通过调用具体的service()方法实现处理。值得一提的是,handle_event()的过程中可以像Reactor中注册新的Handler.
Reactor模式的Java代码实现
单线程
public class Reactor {
//作为demultiplexer
final Selector selector;
//作为handle
final ServerSocketChannel ssc;
public Reactor( String hostname,int port) throws Exception {
this.ssc = ServerSocketChannel.open();
selector = Selector.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(hostname, port));
new Acceptor().start();
}
class Acceptor extends Thread {
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeySet.iterator();
while (iterator.hasNext()) {
SelectionKey sk = iterator.next();
if(sk.isValid()){
if(sk.isAcceptable()){
//handle
SocketChannel channel = ssc.accept();
channel.register(selector,SelectionKey.OP_READ);
}else {
dispatch(sk);
}
}
}
selectionKeySet.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("Thread:"+Thread.currentThread().getName()+" exit...");
}
}
void dispatch(SelectionKey selectionKey){
new ChannelHandler().handle(selectionKey);
}
}