Thrift提供的服务类实现有:
- 阻塞服务模型: TSimpleServer,TThreadPoolServer
- 非阻塞服务模型:TNonblockingServer,THsHaServer,TThreadedSelectorServer
TSimpleServer
使用的是单线程阻塞模型,他会启动一个线程,循环监听server的端口,并接收请求来进行处理。每次只能接收和处理一个socket连接,这种模型一般都不会使用。
TThreadPoolServer
为了解决单线程Server的问题,TThreadPoolServer通过线程池的方式来处理请求,线程池使用了SynchronousQueue同步队列。
private static ExecutorService createDefaultExecutorService(Args args) {
SynchronousQueue<Runnable> executorQueue =
new SynchronousQueue<Runnable>();
return new ThreadPoolExecutor(args.minWorkerThreads,
args.maxWorkerThreads,
args.stopTimeoutVal,
TimeUnit.SECONDS,
executorQueue);
}
主线程一直循环接收请求,接收到请求时将请求封装为WorkerProcess抛给线程池来处理。这种仍然是多线程阻塞模式
TNonblockingServer
在并发的场景下,如果并发数大于上述线程池的大小时,同样会导致并发的问题,所以TNonblockingServer通过使用java NIO的方法来解决线程池的问题。TNonblockingServer采用的是单线程非阻塞的模式,借助Channel/Selector机制。所有的socket都会被注册到selector上,在一个线程循环监控所有的socket。每次selector完成一次select,就会将已就绪的socket取出来进行处理。
/**
启动一个线程
*/
@Override
protected boolean startThreads() {
// start the selector
try {
selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_);
selectAcceptThread_.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start selector thread!", e);
return false;
}
}
/**
选取并处理io事件,对于有数据到来的 socket 进行数据读取操作,
对于有数据发送的 socket 则进行数据发送,
对于监听 socket 则产生一个新业务 socket 并将其注册到 selector 中。
**/
private void select() {
try {
// wait for io events.
selector.select();
// process the io events we received
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
// if the key is marked Accept, then it has to be the server
// transport.
if (key.isAcceptable()) {
handleAccept();
} else if (key.isReadable()) {
// deal with reads
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
select方法的执行是在上边启动的线程中,那么很明显,如果接受到socket的任务后handle中有阻塞的操作,必然会导致整个服务的阻塞。而且TNonblockingServer是单线程的模型,对于并发调用的任务依然是按顺序一个一个的执行。所以,这种服务模型比较适应于并发高,业务逻辑简单的场景。对于并发低,接收到数据执行的任务比较繁重的场景,可能就需要用到别的服务模型了。
Java NIO 参考文章
THsHaServer
由于TNonBlockingServer的缺点,THsHaServer继承了TNonBlockingServer,并通过线程池来提高任务处理的并发能力。THsHaServer是半同步半异步的处理模式,Half-Aysnc用于IO事件处理(Accept/Read/Write),Half-Sync用于业务handler对rpc的同步处理上。
半同步:通过SelectAcceptThread线程同步轮询IO就绪事件,调用就绪的channel来accept,read,write事件。
半异步:将上述事件的调用封装成一个Runnale交给线程池执行,而同步轮询的SelectAcceptThread线程中直接返回进行下一轮轮询。
THsHaServer启动时创建线程池
THsHaServer通过重载Invoke,通过线程池来处理所有的事件:
显而易见,THsHaServer的多线程完美解决的了TNonBlocking的单线程的事件处理逻辑,但是线程池同样是有上限的,并发量较大时,并且单次发送的数据量比较大时,监听到的新的socket请求同样不会被及时处理。
TThreadedSelectorServer
TThreadedSelectorServer是当前Thrift中能提供的最高级的线程服务模型。
AcceptThread线程对象,专门负责处理监听socket上的新连接。
Set<SelectorThread>负责处理所有的网络I/O请求。
invoker负责处理SelectorThread接收到的请求。
源码分析
accept线程启动时通过循环一直进行select
protected class AcceptThread extends Thread {
// The listen socket to accept on
private final TNonblockingServerTransport serverTransport;
private final Selector acceptSelector;
private final SelectorThreadLoadBalancer threadChooser;
/**
* Set up the AcceptThead
*
* @throws IOException
*/
public AcceptThread(TNonblockingServerTransport serverTransport,
SelectorThreadLoadBalancer threadChooser) throws IOException {
this.serverTransport = serverTransport;
this.threadChooser = threadChooser;
this.acceptSelector = SelectorProvider.provider().openSelector();
this.serverTransport.registerSelector(acceptSelector);
}
/**
* The work loop. Selects on the server transport and accepts. If there was
* a server transport that had blocking accepts, and returned on blocking
* client transports, that should be used instead
*/
public void run() {
try {
if (eventHandler_ != null) {
eventHandler_.preServe();
}
while (!stopped_) {
select();
}
} catch (Throwable t) {
LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
} finally {
try {
acceptSelector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing accept selector!", e);
}
// This will wake up the selector threads
TThreadedSelectorServer.this.stop();
}
}
acceptSelector选择器等到connect事件。判断selectedKeys是不是accept事件,如果是则调用handleAccept();否则不进行处理
private void select() {
try {
// wait for connect events.
acceptSelector.select();
// process the io events we received
Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
handleAccept();
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
通过doAccept获取serverTransport中一个TNonblockingTransport,通过threadChooser线程负载均衡器来选取selector线程,通过doAddAccept将selector和TNonblockingTransport绑定,来完成接下来的I/O读写事件。
/**
* Accept a new connection.
*/
private void handleAccept() {
final TNonblockingTransport client = doAccept();
if (client != null) {
// Pass this connection to a selector thread
final SelectorThread targetThread = threadChooser.nextThread();
if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
doAddAccept(targetThread, client);
} else {
// FAIR_ACCEPT
try {
invoker.submit(new Runnable() {
public void run() {
doAddAccept(targetThread, client);
}
});
} catch (RejectedExecutionException rx) {
LOGGER.warn("ExecutorService rejected accept registration!", rx);
// close immediately
client.close();
}
}
}
}
调用SelectorThread的addAcceptedConnection,通过wakeup方法唤醒SelectorThread中的seletor。
private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
if (!thread.addAcceptedConnection(client)) {
client.close();
}
}
public boolean addAcceptedConnection(TNonblockingTransport accepted) {
try {
acceptedQueue.put(accepted);
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while adding accepted connection!", e);
return false;
}
selector.wakeup();
return true;
}
SelectorThread线程的run方法实现:
public void run() {
try {
while (!stopped_) {
select();
processAcceptedConnections();
processInterestChanges();
}
for (SelectionKey selectionKey : selector.keys()) {
cleanupSelectionKey(selectionKey);
}
} catch (Throwable t) {
LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
} finally {
try {
selector.close();
} catch (IOException e) {
LOGGER.error("Got an IOException while closing selector!", e);
}
// This will wake up the accept thread and the other selector threads
TThreadedSelectorServer.this.stop();
}
}
SelectorThread方法的select()监听IO事件,仅仅用于处理数据读取和数据写入。
当selector可读时,就会调用handleRead;当selector可写时,就会调用handlerWrite;
/*
* Select and process IO events appropriately: If there are existing
* connections with data waiting to be read, read it, buffering until a
* whole frame has been read. If there are any pending responses, buffer
* them until their target client is available, and then send the data.
*/
private void select() {
try {
// wait for io events.
selector.select();
// process the io events we received
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped_ && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
// skip if not valid
if (!key.isValid()) {
cleanupSelectionKey(key);
continue;
}
if (key.isReadable()) {
// deal with reads
handleRead(key);
} else if (key.isWritable()) {
// deal with writes
handleWrite(key);
} else {
LOGGER.warn("Unexpected state in select! " + key.interestOps());
}
}
} catch (IOException e) {
LOGGER.warn("Got an IOException while selecting!", e);
}
}
如果连接中有数据可读,读取完数据之后将会缓存在Frame中,如果Frame已经完整可读时,将会通过requestInvoke方法,将请求的数据封装为一个Runnable对象,提交到线程池进行执行。
protected void handleRead(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer) key.attachment();
if (!buffer.read()) {
cleanupSelectionKey(key);
return;
}
// if the buffer's frame read is complete, invoke the method.
if (buffer.isFrameFullyRead()) {
if (!requestInvoke(buffer)) {
cleanupSelectionKey(key);
}
}
}
如果连接中有写数据已经处理完了,整个Rpc过程也就结束了。
protected void handleWrite(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer) key.attachment();
if (!buffer.write()) {
cleanupSelectionKey(key);
}
}
select()方法完成后,线程继续运行processAcceptedConnections()方法处理下一个连接的IO事件
当前seletorThread线程会尝试从阻塞队列获取下一个accepted的TNonblockingTransport来处理读写I/O事件。也正是通过这种方式来复用seletorThread。
private void processAcceptedConnections() {
// Register accepted connections
while (!stopped_) {
TNonblockingTransport accepted = acceptedQueue.poll();
if (accepted == null) {
break;
}
registerAccepted(accepted);
}
}
protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
final SelectionKey selectionKey,
final AbstractSelectThread selectThread) {
return processorFactory_.isAsyncProcessor() ?
new AsyncFrameBuffer(trans, selectionKey, selectThread) :
new FrameBuffer(trans, selectionKey, selectThread);
}
private void registerAccepted(TNonblockingTransport accepted) {
SelectionKey clientKey = null;
try {
clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this);
clientKey.attach(frameBuffer);
} catch (IOException e) {
LOGGER.warn("Failed to register accepted connection to selector!", e);
if (clientKey != null) {
cleanupSelectionKey(clientKey);
}
accepted.close();
}
}
结合Java NIO,我们可以看到Thrift四种服务模式TSimpleServer,TThreadpoolServer,TNonBlockingServer,THsHaServer,TThreadedSelectorServer的不同特点。同样通过他们之间的对比,我们能够看到没有更好的服务模式,只有更合适的服务模式。