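Having covered Tomcat's startup sequence and its individual components, this article turns to how Tomcat handles a request. Recall the end of Tomcat Startup Analysis (6): in the processSocket method of AbstractEndpoint, the task submitted to the worker thread pool is a SocketProcessorBase. The analysis starts from that class.
The SocketProcessorBase Class
SocketProcessorBase is an abstract class that implements Runnable. Its final run method locks the socketWrapper, returns early if the socket has already been closed, and otherwise calls the abstract doRun method: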
public abstract class SocketProcessorBase<S> implements Runnable {
protected SocketWrapperBase<S> socketWrapper;
protected SocketEvent event;
public SocketProcessorBase(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
reset(socketWrapper, event);
}
public void reset(SocketWrapperBase<S> socketWrapper, SocketEvent event) {
Objects.requireNonNull(event);
this.socketWrapper = socketWrapper;
this.event = event;
}
@Override
public final void run() {
synchronized (socketWrapper) {
// It is possible that processing may be triggered for read and
// write at the same time. The sync above makes sure that processing
// does not occur in parallel. The test below ensures that if the
// first event to be processed results in the socket being closed,
// the subsequent events are not processed.
if (socketWrapper.isClosed()) {
return;
}
doRun();
}
}
protected abstract void doRun();
}
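The effect of the lock and the isClosed check can be illustrated with a small, self-contained sketch. FakeWrapper and EventTask are hypothetical stand-ins for SocketWrapperBase and SocketProcessorBase, not Tomcat classes; the sketch only shows that once the first event closes the socket, a later event for the same wrapper is skipped.

```java
import java.util.ArrayList;
import java.util.List;

public class SerializedEventDemo {
    // Hypothetical stand-in for SocketWrapperBase: it only tracks a closed flag.
    static class FakeWrapper {
        private boolean closed;
        boolean isClosed() { return closed; }
        void close() { closed = true; }
    }

    // Same shape as SocketProcessorBase: run() is final, doRun() is the hook.
    static abstract class EventTask implements Runnable {
        protected final FakeWrapper wrapper;
        EventTask(FakeWrapper wrapper) { this.wrapper = wrapper; }

        @Override
        public final void run() {
            synchronized (wrapper) {
                if (wrapper.isClosed()) {
                    return; // an earlier event already closed the socket
                }
                doRun();
            }
        }

        protected abstract void doRun();
    }

    // Runs a "read" task that closes the wrapper, then a "write" task, and
    // returns the names of the tasks whose doRun() actually executed.
    static List<String> demo() {
        FakeWrapper wrapper = new FakeWrapper();
        List<String> executed = new ArrayList<>();
        Runnable read = new EventTask(wrapper) {
            @Override protected void doRun() { executed.add("read"); wrapper.close(); }
        };
        Runnable write = new EventTask(wrapper) {
            @Override protected void doRun() { executed.add("write"); }
        };
        read.run();
        write.run();
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [read]
    }
}
```

Making run() final keeps the locking and the closed check in one place, so subclasses such as NioEndpoint's SocketProcessor only have to supply doRun().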
NioEndpoint's createSocketProcessor method returns a SocketProcessor, an inner class of NioEndpoint that extends SocketProcessorBase and implements doRun. According to its class comment, SocketProcessor plays the same role as a Worker.
/**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.
*/
protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
public SocketProcessor(SocketWrapperBase<NioChannel> socketWrapper, SocketEvent event) {
super(socketWrapper, event);
}
@Override
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
int handshake = -1;
try {
if (key != null) {
if (socket.isHandshakeComplete()) {
// No TLS handshaking required. Let the handler
// process this socket / event combination.
handshake = 0;
} else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
event == SocketEvent.ERROR) {
// Unable to complete the TLS handshake. Treat it as
// if the handshake failed.
handshake = -1;
} else {
handshake = socket.handshake(key.isReadable(), key.isWritable());
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
event = SocketEvent.OPEN_READ;
}
}
} catch (IOException x) {
handshake = -1;
if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
} catch (CancelledKeyException ckx) {
handshake = -1;
}
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
close(socket, key);
}
} else if (handshake == -1 ) {
close(socket, key);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
} catch (CancelledKeyException cx) {
socket.getPoller().cancelledKey(key);
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
} catch (Throwable t) {
log.error("", t);
socket.getPoller().cancelledKey(key);
} finally {
socketWrapper = null;
event = null;
//return to cache
if (running && !paused) {
processorCache.push(this);
}
}
}
}
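- The handshake variable and the calls around it deal with the SSL/TLS handshake and can be ignored for now; NioChannel's handshake method simply returns 0;
- The real processing happens in the getHandler().process call.
getHandler is defined in AbstractEndpoint. The relevant code is: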
/**
* Handling of accepted sockets.
*/
private Handler<S> handler = null;
public void setHandler(Handler<S> handler ) { this.handler = handler; }
public Handler<S> getHandler() { return handler; }
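When is the handler assigned? The answer lies in the construction of the Http11NioProtocol object: the parent-class constructor invoked during construction sets the endpoint's Handler: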
public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) {
super(endpoint);
setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
setHandler(cHandler);
getEndpoint().setHandler(cHandler);
}
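The ConnectionHandler Class
ConnectionHandler is a static nested class of AbstractProtocol, and its process method is the one called above. The method is long, so it is not reproduced here; this article only walks through the code for the most common scenario: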
- processor = getProtocol().createProcessor();
createProcessor is an abstract method defined in AbstractProtocol; AbstractHttp11Protocol implements it:
@Override
protected Processor createProcessor() {
Http11Processor processor = new Http11Processor(getMaxHttpHeaderSize(),
getAllowHostHeaderMismatch(), getRejectIllegalHeaderName(), getEndpoint(),
getMaxTrailerSize(), allowedTrailerHeaders, getMaxExtensionSize(),
getMaxSwallowSize(), httpUpgradeProtocols, getSendReasonPhrase(),
relaxedPathChars, relaxedQueryChars);
processor.setAdapter(getAdapter());
processor.setMaxKeepAliveRequests(getMaxKeepAliveRequests());
processor.setConnectionUploadTimeout(getConnectionUploadTimeout());
processor.setDisableUploadTimeout(getDisableUploadTimeout());
processor.setCompressionMinSize(getCompressionMinSize());
processor.setCompression(getCompression());
processor.setNoCompressionUserAgents(getNoCompressionUserAgents());
processor.setCompressibleMimeTypes(getCompressibleMimeTypes());
processor.setRestrictedUserAgents(getRestrictedUserAgents());
processor.setMaxSavePostSize(getMaxSavePostSize());
processor.setServer(getServer());
processor.setServerRemoveAppProvidedValues(getServerRemoveAppProvidedValues());
return processor;
}
- state = processor.process(wrapper, status); This line delegates to the Processor's process method to continue handling the request.
The Processor Interface
The Processor interface handles requests for a given protocol. Its class hierarchy is shown in the figure below.
Http11Processor inherits its process method from AbstractProcessorLight (through AbstractProcessor). The code is shown below; it goes on to call the abstract service method to handle the request.
@Override
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {
// Do nothing here, just wait for it to get recycled
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
// There may be pipe-lined data to read. If the data isn't
// processed now, execution will exit this loop and call
// release() which will recycle the processor (and input
// buffer) deleting any pipe-lined data. To avoid this,
// process it now.
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ){
state = service(socketWrapper);
} else {
// Default to closing the socket if the SocketEvent passed in
// is not consistent with the current state of the Processor
state = SocketState.CLOSED;
}
// Some code omitted
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);
return state;
}
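For the common synchronous case, the branching above reduces to a small decision on the incoming event. The following is a simplified sketch and not Tomcat code: it assumes a processor that is neither async nor upgraded, models only the value of state right after the if/else chain on the first loop iteration (the omitted code is not modeled), and uses a hypothetical Supplier in place of the real service(socketWrapper) call.

```java
import java.util.function.Supplier;

public class ProcessDispatchSketch {
    // Reduced copies of the enum constants that appear in the listing above.
    enum SocketEvent { OPEN_READ, OPEN_WRITE, DISCONNECT, ERROR }
    enum SocketState { OPEN, CLOSED, LONG }

    // Value of "state" after the if/else chain for a processor that is
    // neither async nor upgraded. "service" stands in for service(socketWrapper).
    static SocketState firstPass(SocketEvent status, Supplier<SocketState> service) {
        SocketState state = SocketState.CLOSED;
        if (status == SocketEvent.DISCONNECT) {
            // Nothing to do: state keeps its initial value.
        } else if (status == SocketEvent.OPEN_WRITE) {
            // Extra write event, ignored.
            state = SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ) {
            // The normal path: parse and handle the request.
            state = service.get();
        } else {
            // Event is inconsistent with the processor state: close.
            state = SocketState.CLOSED;
        }
        return state;
    }

    public static void main(String[] args) {
        Supplier<SocketState> keepAlive = () -> SocketState.OPEN;
        for (SocketEvent event : SocketEvent.values()) {
            System.out.println(event + " -> " + firstPass(event, keepAlive));
        }
    }
}
```

Only OPEN_READ reaches service in this reduced model, which is why the rest of the article follows that path.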
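Http11Processor provides its own implementation of service. The method is long and is not reproduced here; the key step is the line getAdapter().service(request, response);.
The adapter is set on the Http11Processor after the object is created, using the value returned by AbstractProtocol's getAdapter method. AbstractProtocol's adapter is in turn assigned when the Connector is initialized: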
@Override
protected void initInternal() throws LifecycleException {
super.initInternal();
// Initialize adapter
adapter = new CoyoteAdapter(this);
protocolHandler.setAdapter(adapter);
// Some code omitted
}
The CoyoteAdapter Class
The Adapter set on the ProtocolHandler is a CoyoteAdapter. Its service method is:
@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
if (request == null) {
// Create objects
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);
// Link objects
request.setResponse(response);
response.setRequest(request);
// Set as notes
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);
// Set query string encoding
req.getParameters().setQueryStringCharset(connector.getURICharset());
}
if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}
boolean async = false;
boolean postParseSuccess = false;
req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
try {
// Parse and set Catalina and configuration specific
// request parameters
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
// Calling the container
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
if (request.isAsync()) {
async = true;
ReadListener readListener = req.getReadListener();
if (readListener != null && request.isFinished()) {
// Possible the all data may have been read during service()
// method so this needs to be checked here
ClassLoader oldCL = null;
try {
oldCL = request.getContext().bind(false, null);
if (req.sendAllDataReadEvent()) {
req.getReadListener().onAllDataRead();
}
} finally {
request.getContext().unbind(false, oldCL);
}
}
Throwable throwable =
(Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
// If an async request was started, is not going to end once
// this container thread finishes and an error occurred, trigger
// the async error process
if (!request.isAsyncCompleting() && throwable != null) {
request.getAsyncContextInternal().setErrorState(throwable, true);
}
} else {
request.finishRequest();
response.finishResponse();
}
} catch (IOException e) {
// Ignore
} finally {
// Some code omitted
}
}
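The first part of service uses the request's notes as a per-connection cache: the servlet-level Request is created once, stored under ADAPTER_NOTES, and found again on later calls for the same Coyote request. A minimal sketch of that idea follows. LowLevelRequest and ServletLevelRequest are hypothetical stand-ins for org.apache.coyote.Request and org.apache.catalina.connector.Request, and the array size and slot index are illustrative assumptions.

```java
public class AdapterNotesSketch {
    // Illustrative slot index; treat the concrete value as an assumption.
    static final int ADAPTER_NOTES = 1;

    // Hypothetical stand-in for the low-level Coyote request: a fixed array
    // of "notes" that higher layers can use to attach their own objects.
    static class LowLevelRequest {
        private final Object[] notes = new Object[32];
        Object getNote(int pos) { return notes[pos]; }
        void setNote(int pos, Object value) { notes[pos] = value; }
    }

    // Hypothetical stand-in for the servlet-level request wrapper.
    static class ServletLevelRequest {
        final LowLevelRequest coyoteRequest;
        ServletLevelRequest(LowLevelRequest coyoteRequest) {
            this.coyoteRequest = coyoteRequest;
        }
    }

    // Same lookup-then-create pattern as the start of the service method.
    static ServletLevelRequest adapt(LowLevelRequest req) {
        ServletLevelRequest request = (ServletLevelRequest) req.getNote(ADAPTER_NOTES);
        if (request == null) {
            request = new ServletLevelRequest(req);   // create the wrapper
            req.setNote(ADAPTER_NOTES, request);      // remember it as a note
        }
        return request;
    }

    public static void main(String[] args) {
        LowLevelRequest req = new LowLevelRequest();
        ServletLevelRequest first = adapt(req);
        ServletLevelRequest second = adapt(req);
        System.out.println(first == second); // prints true
    }
}
```

Because the low-level request object is reused across requests, caching the wrapper this way avoids allocating a new pair of Request and Response objects for every request.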
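Request Preprocessing
The postParseRequest method preprocesses the request: it strips path parameters (introduced by semicolons) from the path, percent-decodes the URI, and normalizes it (resolving "." and ".." segments).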
protected boolean postParseRequest(org.apache.coyote.Request req, Request request,
org.apache.coyote.Response res, Response response) throws IOException, ServletException {
// Some code omitted
MessageBytes decodedURI = req.decodedURI();
if (undecodedURI.getType() == MessageBytes.T_BYTES) {
// Copy the raw URI to the decodedURI
decodedURI.duplicate(undecodedURI);
// Parse the path parameters. This will:
// - strip out the path parameters
// - convert the decodedURI to bytes
parsePathParameters(req, request);
// URI decoding
// %xx decoding of the URL
try {
req.getURLDecoder().convert(decodedURI, false);
} catch (IOException ioe) {
res.setStatus(400);
res.setMessage("Invalid URI: " + ioe.getMessage());
connector.getService().getContainer().logAccess(
request, response, 0, true);
return false;
}
// Normalization
if (!normalize(req.decodedURI())) {
res.setStatus(400);
res.setMessage("Invalid URI");
connector.getService().getContainer().logAccess(
request, response, 0, true);
return false;
}
// Character decoding
convertURI(decodedURI, request);
// Check that the URI is still normalized
if (!checkNormalize(req.decodedURI())) {
res.setStatus(400);
res.setMessage("Invalid URI character encoding");
connector.getService().getContainer().logAccess(
request, response, 0, true);
return false;
}
} else {
/* The URI is chars or String, and has been sent using an in-memory
* protocol handler. The following assumptions are made:
* - req.requestURI() has been set to the 'original' non-decoded,
* non-normalized URI
* - req.decodedURI() has been set to the decoded, normalized form
* of req.requestURI()
*/
decodedURI.toChars();
// Remove all path parameters; any needed path parameter should be set
// using the request object rather than passing it in the URL
CharChunk uriCC = decodedURI.getCharChunk();
int semicolon = uriCC.indexOf(';');
if (semicolon > 0) {
decodedURI.setChars
(uriCC.getBuffer(), uriCC.getStart(), semicolon);
}
}
// Request mapping.
MessageBytes serverName;
if (connector.getUseIPVHosts()) {
serverName = req.localName();
if (serverName.isNull()) {
// well, they did ask for it
res.action(ActionCode.REQ_LOCAL_NAME_ATTRIBUTE, null);
}
} else {
serverName = req.serverName();
}
// Version for the second mapping loop and
// Context that we expect to get for that version
String version = null;
Context versionContext = null;
boolean mapRequired = true;
while (mapRequired) {
// This will map the latest version by default
connector.getService().getMapper().map(serverName, decodedURI,
version, request.getMappingData());
// Some code omitted
}
// Some code omitted
}
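Taking the case where the MessageBytes type is T_BYTES:
- parsePathParameters strips the path parameters (introduced by semicolons) from the URI;
- req.getURLDecoder() returns a UDecoder instance whose convert method decodes the URI. Decoding here simply replaces each three-character %xx escape with the single byte whose value is given by the two hexadecimal digits;
- normalize normalizes the URI by resolving the "." and ".." segments in the path;
- convertURI uses the Connector's uriEncoding attribute to convert the URI bytes into characters;
- Note the line connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData()). When the Service started, MapperListener registered every Host and Context in that Service. To select a Context for a URI, the Mapper's map method compares the URI produced by convertURI against the path of each Context; see the Mapper class and the Context configuration documentation.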
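The first three steps of the list above can be sketched on plain strings. This is an illustration under stated assumptions, not Tomcat's implementation: Tomcat works on byte buffers and converts bytes to characters only after normalization, while this sketch folds the charset conversion into percent-decoding. The class and method names are hypothetical, and the raw URI is assumed to be ASCII.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

public class UriPreprocessSketch {
    // Step 1: drop ";name=value" path parameters from every segment.
    static String stripPathParameters(String uri) {
        StringBuilder out = new StringBuilder();
        boolean inParameter = false;
        for (int i = 0; i < uri.length(); i++) {
            char c = uri.charAt(i);
            if (c == ';') {
                inParameter = true;
            } else if (c == '/') {
                inParameter = false;
            }
            if (!inParameter) {
                out.append(c);
            }
        }
        return out.toString();
    }

    // Step 2: replace each %xx escape with one byte, then decode the bytes.
    static String percentDecode(String uri, Charset charset) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        for (int i = 0; i < uri.length(); i++) {
            char c = uri.charAt(i);
            if (c != '%') {
                bytes.write(c); // assumes the raw URI is ASCII
                continue;
            }
            if (i + 2 >= uri.length()) {
                throw new IllegalArgumentException("Truncated escape at index " + i);
            }
            int hi = Character.digit(uri.charAt(i + 1), 16);
            int lo = Character.digit(uri.charAt(i + 2), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("Invalid escape at index " + i);
            }
            bytes.write(hi * 16 + lo);
            i += 2;
        }
        return new String(bytes.toByteArray(), charset);
    }

    // Step 3: resolve "." and ".." and collapse empty segments.
    // Returns null when ".." would climb above the root.
    static String normalize(String path) {
        Deque<String> segments = new ArrayDeque<>();
        for (String segment : path.split("/")) {
            if (segment.isEmpty() || segment.equals(".")) {
                continue;
            }
            if (segment.equals("..")) {
                if (segments.isEmpty()) {
                    return null;
                }
                segments.removeLast();
            } else {
                segments.addLast(segment);
            }
        }
        return "/" + String.join("/", segments);
    }

    // Applies the three steps in order; null means the URI is rejected.
    static String preprocess(String rawUri) {
        String stripped = stripPathParameters(rawUri);
        String decoded = percentDecode(stripped, StandardCharsets.UTF_8);
        return normalize(decoded);
    }

    public static void main(String[] args) {
        System.out.println(preprocess("/app;jsessionid=1/%61/./b/../c")); // prints /app/a/c
    }
}
```

A null result from normalize corresponds to the branches in postParseRequest that set status 400 and return false. Unlike Tomcat, this sketch does not preserve a trailing slash.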
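Container Processing
If the request can be handed to the container's Pipeline, that is, when postParseRequest returns true, the container takes over. The service method contains the line connector.getService().getContainer().getPipeline().getFirst().invoke(request, response):
- Calling getService on the Connector returns the StandardService;
- Calling getContainer on the StandardService returns the StandardEngine;
- Calling getPipeline on the StandardEngine returns its associated StandardPipeline;
- As noted in Tomcat Startup Analysis (7): Container Components and the Engine Component, the StandardEngine constructor adds the basic valve StandardEngineValve to its own Pipeline. Calling getFirst on the StandardPipeline returns the first valve, which starts processing the request (if no other valve has been added, getFirst returns the basic valve itself). The basic valve is always the last one in the chain, so the request ultimately reaches it. The rest of the flow is covered in the next article.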
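The relationship between getFirst and the basic valve can be sketched with a tiny linked chain. The classes below are hypothetical simplifications, not Tomcat's Pipeline and Valve interfaces; the sketch assumes that added valves are linked in front of the basic valve and that each ordinary valve forwards to its successor.

```java
import java.util.ArrayList;
import java.util.List;

public class PipelineSketch {
    // Hypothetical, simplified valve: records its name and may call the next one.
    static abstract class Valve {
        final String name;
        Valve next;
        Valve(String name) { this.name = name; }
        abstract void invoke(List<String> trace);
    }

    // An ordinary valve does its work, then passes the request on.
    static class PassThroughValve extends Valve {
        PassThroughValve(String name) { super(name); }
        @Override void invoke(List<String> trace) {
            trace.add(name);
            next.invoke(trace);
        }
    }

    // The basic valve ends the chain at this container level.
    static class BasicValve extends Valve {
        BasicValve(String name) { super(name); }
        @Override void invoke(List<String> trace) {
            trace.add(name);
        }
    }

    static class Pipeline {
        private Valve first;
        private final Valve basic;
        Pipeline(Valve basic) { this.basic = basic; }

        // New valves are linked in just before the basic valve.
        void addValve(Valve valve) {
            if (first == null) {
                first = valve;
            } else {
                Valve current = first;
                while (current.next != basic) {
                    current = current.next;
                }
                current.next = valve;
            }
            valve.next = basic;
        }

        // Falls back to the basic valve when nothing else was added.
        Valve getFirst() {
            return first != null ? first : basic;
        }
    }

    // Builds a pipeline with the given extra valves and returns the call order.
    static List<String> trace(String... extraValves) {
        Pipeline pipeline = new Pipeline(new BasicValve("StandardEngineValve"));
        for (String name : extraValves) {
            pipeline.addValve(new PassThroughValve(name));
        }
        List<String> trace = new ArrayList<>();
        pipeline.getFirst().invoke(trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(trace());            // prints [StandardEngineValve]
        System.out.println(trace("AccessLog")); // prints [AccessLog, StandardEngineValve]
    }
}
```

In both cases the basic valve runs last, which matches the observation above that the request always ends up at StandardEngineValve.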