上一篇说到了Tomcat的启动流程,那么请求处理的流程是如何完成的呢?
Connector接收请求
Connector是请求接收器,通过设置的协议处理器处理请求,以默认的Http11NioProtocol协议为例,Http11NioProtocol构造时创建了NioEndpoint,用来处理网络请求:
public Http11NioProtocol() {
super(new NioEndpoint());
}
Connector启动时,启动了protocolHandler(Http11NioProtocol), protocolHandler启动的时候又对endpoint进行了启动,NioEndpoint启动过程中绑定了本地端口并在Acceptor线程中监听网络连接(详情可见Tomcat启动流程),所以Tomcat接收网络请求的起点是从Accetpor线程开始的。
Acceptor中会循环阻塞调用serverSock.accept()来监听请求:
......
try {
// Accept the next incoming connection from the server
// socket
socket = endpoint.serverSocketAccept();
} catch (Exception ioe) {
// We didn't get a socket
endpoint.countDownConnection();
if (endpoint.isRunning()) {
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
} else {
break;
}
}
// Successful accept, reset the error delay
errorDelay = 0;
// Configure the socket
if (endpoint.isRunning() && !endpoint.isPaused()) {
// setSocketOptions() will hand the socket off to
// an appropriate processor if successful
if (!endpoint.setSocketOptions(socket)) {
endpoint.closeSocket(socket);
}
} else {
endpoint.destroySocket(socket);
}
......
接收到请求后,通过setSocketOptions来处理请求:
protected boolean setSocketOptions(SocketChannel socket) {
NioSocketWrapper socketWrapper = null;
try {
// Allocate channel and wrapper
NioChannel channel = null;
if (nioChannels != null) {
channel = nioChannels.pop();
}
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(bufhandler, selectorPool, this);
} else {
channel = new NioChannel(bufhandler);
}
}
NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this);
channel.reset(socket, newWrapper);
connections.put(socket, newWrapper);
socketWrapper = newWrapper;
// Set socket properties
// Disable blocking, polling will be used
socket.configureBlocking(false);
socketProperties.setProperties(socket.socket());
socketWrapper.setReadTimeout(getConnectionTimeout());
socketWrapper.setWriteTimeout(getConnectionTimeout());
socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
socketWrapper.setSecure(isSSLEnabled());
poller.register(channel, socketWrapper);
return true;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error(sm.getString("endpoint.socketOptionsError"), t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
if (socketWrapper == null) {
destroySocket(socket);
}
}
// Tell to close the socket if needed
return false;
}
代码比较长,最关键的代码是poller.register(channel, socketWrapper),将请求进来的socket包装后注册到poller中等待读写事件,这个地方的socket是非阻塞的,即可以用一个poller线程处理大量的请求连接,提高系统吞吐量。
接下来来到Poller类:
public void register(final NioChannel socket, final NioSocketWrapper socketWrapper) {
socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
PollerEvent r = null;
if (eventCache != null) {
r = eventCache.pop();
}
if (r == null) {
r = new PollerEvent(socket, OP_REGISTER);
} else {
r.reset(socket, OP_REGISTER);
}
addEvent(r);
}
Poller线程run方法:
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
hasEvents = events();
...
keyCount = selector.select(selectorTimeout);
...
wakeupCounter.set(0);
}
...
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
continue;
}
// Either we timed out or we woke up, process events first
if (keyCount == 0) {
hasEvents = (hasEvents | events());
}
Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
processKey(sk, socketWrapper);
}
代码进行了一些删减,只保留了主要流程:
1.Poller注册socket
Poller将连接上的socket设置interestOps(SelectionKey.OP_READ),包装成PollerEvent,interestOps为OP_REGISTER,放入队列中。
2.Poller线程从队列中取事件,执行run方法:
if (interestOps == OP_REGISTER) {
try {
socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(), SelectionKey.OP_READ, socket.getSocketWrapper());
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
}
将socket注册到Poller的Selector中,ops为SelectionKey.OP_READ,等待对端发送数据。
3.然后在socket有数据可读时,通过processKey(sk, socketWrapper)来进行处理。继续跟进processKey,最终将socket封装成socketProcessor提交到Executor处理。
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
......
SocketProcessor实现了Runnable接口,在线程池中执行SocketProcessor.doRun,忽略可能的TLS握手到流程,将通过连接处理器ConnectionHandler来处理请求:
getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
ConnectHandler又通过Http11Processor来处理
if (processor == null) {
processor = getProtocol().createProcessor();
register(processor);
}
...
// Associate the processor with the connection
wrapper.setCurrentProcessor(processor);
SocketState state = SocketState.CLOSED;
do {
state = processor.process(wrapper, status);
Http11Processor中会根据Http协议来解析数据,封装成request、response模型,并通过适配器转给Container,然后Connector的任务就完成了,接下来的操作由Container来进行。
getAdapter().service(request, response);
Adapter的处理
Connect的主要任务是接收请求,并在有数据读写时在线程池中根据使用的协议来解析数据并封装成request、response,然后交给Adapter来处理。
Adapter的实现类是CoyoteAdapter,顾名思义是一个适配器,将Connector连接器读取的数据适配到Container容器来处理。
Adapter主要有两个任务map和转发。
map在postParseRequest方法中进行:
connector.getService().getMapper().map(serverName, decodedURI,
version, request.getMappingData());
根据请求到uri找到目标Host、Context、Wrapper。
Mapper对象存在service中,通过MapperListener来监听Container到生命周期,动态调整Mapper中的Container。
通过Mapper找到目标Container后,就可以转发给Container来处理了:
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
Container的处理流程
Adapter传给Container的操作是将request、response参数交给engine的pipeline处理。每一个级别的Container都维护了一个pipeline,用来处理请求,这是典型的流水线模式,pipeline中维护了一个Valve链表,请求在pipeline中处理的过程就是从第一个Valve处理到最后一个,Valve可以方便的进行配置来实现各层容器的扩展功能,每个pipeline的最后一个Valve是"basic",完成基本的逻辑处理,如Engine的basic将调用传给下一级的Host的pipeline,Host的basic将调用传给下一级的Context的pipeline,Context的basic将调用传给下一级的Wrapper的pipeline,如Engine的basic StandardEngineValve:
public final void invoke(Request request, Response response)
throws IOException, ServletException {
// Select the Host to be used for this Request
Host host = request.getHost();
if (host == null) {
// HTTP 0.9 or HTTP 1.0 request without a host when no default host
// is defined. This is handled by the CoyoteAdapter.
return;
}
if (request.isAsyncSupported()) {
request.setAsyncSupported(host.getPipeline().isAsyncSupported());
}
// Ask this Host to process this request
host.getPipeline().getFirst().invoke(request, response);
}
而Wrapper作为最底层的Container,basic完成最终Servlet的加载、初始化,并组装filterChain进行调用:
standardWrapperValve.invoke部分代码
...
if (!unavailable) {
servlet = wrapper.allocate();
}
...
// Create the filter chain for this request
ApplicationFilterChain filterChain =
ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);
...
filterChain.doFilter(request.getRequest(), response.getResponse());
...
那么什么时候调用了Servlet的service来处理请求呢?在filter链的调用过程中,链中所有的filter处理完成后,后执行Servlet.service来处理请求:
private void internalDoFilter(ServletRequest request,
ServletResponse response)
throws IOException, ServletException {
// Call the next filter if there is one
if (pos < n) {
...
return;
}
// We fell off the end of the chain -- call the servlet instance
//chain处理完成后,执行servlet.service
try {
...
// Use potentially wrapped request from this point
if ((request instanceof HttpServletRequest) &&
(response instanceof HttpServletResponse) &&
Globals.IS_SECURITY_ENABLED ) {
...
} else {
servlet.service(request, response);
}
至此,tomcat已经完成了从接受连接请求到找到目标servlet来处理请求的流程,实现了作为Servlet容器的功能。