Tomcat源码浅析

以嵌入SpringBoot的Tomcat源码(版本:tomcat-embed-9.0.16)为基础浅析Tomcat的组成以及实现原理。

1. 核心组件

1.1 Lifecycle-生命周期

Tomcat中几乎所有的重要组件都直接或间接实现了此接口,也就是说Tomcat把几乎所有的组件都设计成了具有了生命周期的特征(初始化、启、停、销毁)。

Lifecycle的核心接口如下:

  • void addLifecycleListener(LifecycleListener listener)
  • LifecycleListener[] findLifecycleListeners()
  • LifecycleState getState()
  • void init() throws LifecycleException
  • void start() throws LifecycleException
  • void stop() throws LifecycleException
  • void destroy() throws LifecycleException

接口抽象了初始化、启、停、销毁四个核心操作。直接实现此接口的是抽象类LifecycleBase,此抽象类维护了一个类型为LifecycleState的生命周期状态字段state并且实现了初始化、启、停、销毁四个接口,但同时又定义了四个与之对应的抽象方法:

  • abstract void initInternal() throws LifecycleException
  • abstract void startInternal() throws LifecycleException
  • abstract void stopInternal() throws LifecycleException
  • abstract void destroyInternal() throws LifecycleException

LifecycleBase基本上是一个模版方法模式的实现,这一层除了调用对应的抽象方法外其主要承担了对生命周期状态的维护,比如在调用initInternal()方法之前把状态设置为LifecycleState.INITIALIZING,其他方法与之类似,但是start()方法逻辑有点儿不一样,它会根据生命周期的状态执行对应的方法,但最终还是会执行startInternal()方法,比如,如果当前生命周期状态为LifecycleState.NEW则会执行init()方法,如下为start()方法相关源码:

@Override
public final synchronized void start() throws LifecycleException {

    // 略
    
    if (state.equals(LifecycleState.NEW)) {
        init();
    } else if (state.equals(LifecycleState.FAILED)) {
        stop();
    } else if (!state.equals(LifecycleState.INITIALIZED) &&
            !state.equals(LifecycleState.STOPPED)) {
        invalidTransition(Lifecycle.BEFORE_START_EVENT);
    }

    try {
        setStateInternal(LifecycleState.STARTING_PREP, null, false);
        startInternal();
        if (state.equals(LifecycleState.FAILED)) {
            stop();
        } else if (!state.equals(LifecycleState.STARTING)) {
            invalidTransition(Lifecycle.AFTER_START_EVENT);
        } else {
            setStateInternal(LifecycleState.STARTED, null, false);
        }
    } catch (Throwable t) {
        handleSubClassException(t, "lifecycleBase.startFail", toString());
    }
}

LifecycleBase的一个直接子类为LifecycleMBeanBase,此类一个重要功能是把生命周期相关的组件纳入JMX的管理中,在其方法initInternal()中实现,如下:

@Override
protected void initInternal() throws LifecycleException {

    // If oname is not null then registration has already happened via
    // preRegister().
    if (oname == null) {
        mserver = Registry.getRegistry(null, null).getMBeanServer();
        // 把this注入JMB
        oname = register(this, getObjectNameKeyProperties());
    }
}

所以Tomcat中很多组件都可以通过JMX管理。LifecycleMBeanBase的重要子类有StandardServer、StandardService、Connector、StandardPipeline、ContainerBase(和另外一个核心组件有关,下面会详细介绍),下面的章节会详细介绍这些组件。如下为Lifecycle相关类图:

image

1.2 Container-容器

除了Lifecycle外另一个Tomcat的核心接口是Container,即容器。据此接口注视描述:容器是一个可以执行从客户端接收的请求并且基于这些请求给出响应的对象。其实之所以说是因为这些容器都维护了一个对象:Pipeline,容器对请求的处理最终都委托给了Pipeline,那么作为容器本身它更多的是使子类拥有维护和管理一个或多个子容器的能力。当然Container作为Tomcat中一个非常核心的组件也继承了Lifecycle。

Container核心方法有:

  • Log getLogger()
  • Pipeline getPipeline()
  • String getName()
  • void addChild(Container child)
  • Container findChild(String name)
  • void removeChild(Container child)
  • Container[] findChildren()
  • void logAccess(Request req, Response resp, long time, boolean useDefault)

Container的直接子类为ContainerBase,它提供了几乎每一个子类都需要的通用功能,如下为它的一些核心方法和属性:

核心属性:

  • HashMap<String, Container> children
    容器的自容器集合。
  • Container parent
    容器的父容器。
  • Pipeline pipeline = new StandardPipeline(this)
    容器的管道。
  • List<ContainerListener> listeners
    容器监听器集合。

核心方法:

  • void addChild(Container child)
    添加一个子容器。
  • void addContainerListener(ContainerListener listener)
    添加一个容器监听器。
  • Container[] findChildren()
    返回子容器集合。
  • synchronized void addValve(Valve valve)
    向管道中添加一个阀。
  • void fireContainerEvent(String type, Object data)
    发布容器事件。
  • 生命周期方法:startInternal()、stopInternal()、destroyInternal()
    这个三个方法的逻辑是相似的,例如startInternal()方法,首先如果Cluster和Realm实现了Lifecycle接口则启动他们,接下来多线程启动子容器,最后启动Pipeline并修改容器状态为LifecycleState.STARTING,这里所说的启动即调用start()方法。stopInternal()和destroyInternal()方法也是按照此逻辑执行的。

容器相关类图如下:

image

容器的重要子类有StandardEngine、StandardHost、StandardContext、StandardWrapper。

1.3 Pipeline-管道

Pipeline即管道的意思,其标准实现类为StandardPipeline。Pipeline管理一个或多个Valve,Valve即阀的意思,就像管道上的阀门或者开关一样,每一个流经管道的请求都会被它处理一下,它有一个invoke()方法,invoke()方法封装了请求的处理逻辑,所以一个Valve即是对请求的一个处理单元。Valve还有一个类型为Valve的next属性。这些Valve首尾相连组成一个调用链。所以Pipeline实际上是由多个Valve首尾相组成的,那么一个容器的执行其实是组成Pipeline的一系列Valve按顺序对请求的处理的过程,同时这里也是一个责任链模式。相关类图:

image

1.4 StandarWrapper

StandarWrapper是Wrapper的标准实现。一个Wrapper即表示一个Servlet(此处的Servlet即是我们开发和定义的Servlet),可以认为Wrapper是Servlet在Tomcat内部的表现形式,同时Wrapper也是一个Container。对应Web.xml中一个servlet元素。

1.4.1 核心属性

  • String name
    Servlet名称。
  • String servletClass
    字符串表示的Servlet类。
  • Servlet instance
    Servlet实例。
  • ArrayList<String>: mappings
    此servlet处理的url映射。
  • HashMap<String, String>: parameters
    servlet的配置参数。

1.4.2 核心方法

  • Servlet loadServlet()
    实例化并返回Servlet对象。
  • void initServlet(Servlet servlet)
    调用指定servlet的init()方法。
  • Servlet getServlet()
    返回Servlet实例。
  • String findInitParameter(String name)
    获取servlet配置参数。
  • Servlet allocate()
    分配一个当前Servlet的实例。

1.5 StandardContext

StandardContext是Context接口的标准实现。一个Context表示一个部署在Tomcat中的Web应用,它是一个Servlet上下文容器,管理和维护多个Wrapper作为其子容器(children),也就是说一个Context包含多个Servlet。appBase(默认webapps)指定目录下的一个子目录就对应一个Context,表示一个Web应用。一个Tomcat可以部署多个Web应用也就存在多个Context。

1.5.1 核心属性

  • String name
    Context名称。
  • String[] applicationListeners
    监听器集合。
  • ApplicationParameter[] applicationParameters
    此应用的配置参数集合。
  • ApplicationContext context
    此应用的Servlet上下文。
  • String path
    此应用的目录路径。
  • String docBase
    应用的文档根路径。
  • Map<String, String> mimeMappings
    此应用的MIME映射。
  • Map<String, String> parameters
    此应用的配置参数
  • int sessionTimeout = 30
    session超时时间,默认30分钟。
  • String[] welcomeFiles
    此应用的欢迎页面。
  • Map<String, String> servletMappings
    此应用中Servlet urlPattern和名称的映射集合。
  • HashMap<String, Container> children
    此应用中的Servlet集合。
  • String requestEncoding

1.5.2 核心方法

  • Wrapper createWrapper()
    创建Wrapper实例。
  • void reload()
    重启此应用。
  • void startInternal()
    启动此应用.
  • addChild(Container child)
    向此应用添加一个Servlet。
  • boolean loadOnStartup(Container children[])
    以Servlet的loadOnStartup属性按顺序实例化Servlet。
  • Container[] findChildren()
    获取所有Servlet。

1.6 StandardHost

StandardHost是Host接口的标准实现,一个Host表示一个虚拟主机容器。一个Tomcat可以配置多个不同域名的虚拟主机,每一个虚拟主机对应一个appBase。一个Host管理和维护多个StandardContext。此组件对应Tomcat配置文件server.xml中Host标签。

1.6.1 核心属性

  • String name
    Host名称。
  • String[] aliases
    Host别名,一个Host可以配置多个别名
  • String appBase = "webapps"
    此Host对应的应用的基目录,即应用程序存放的目录。默认Tomcat home目录下的webapps目录。
  • boolean autoDeploy = true
    应用是否自动部署。
  • String contextClass =
    "org.apache.catalina.core.StandardContext"

    默认Servlet上下文容器类。
  • String workDir
    Host工作目录。
  • HashMap<String, Container> children
    此应用中的StandardContext集合。

1.6.1 核心方法

  • void addChild(Container child)
    添加一个StandardContext子容器。
  • void addAlias(String alias)
    添加一个别名。

1.7 Connector

Connector即连接器,是一个和通信协议管理相关的组件,负责处理和客户端的通信,Tomcat可以有多个Connector,其中最常用的是Http Connector,此外还包括AJP Connector。

Connector继承了LifecycleMBeanBase,通过源码可以看到Connector主要管理ProtocolHandler,其持有一个ProtocolHandler类型的属性,Connector的生命周期方法主要还是调用ProtocolHandler对应的相关方法。 ProtocolHandler是协议处理的抽象,其重要实现有Http11NioProtocol:用于处理Http1.1协议;Connector还包含了一些和Http协议相关的核心参数,比如:最大cookie数量、最大参数数量、post最大报文长度等。

1.7.1 核心属性

  • long asyncTimeout = 30000
    异步请求超时时间。
  • int redirectPort = 443
    重定向端口。
  • int maxCookieCount = 200
    最大cookie数量。
  • int maxParameterCount = 10000
    Tomcat可以自动解析的最大参数数量,包括GET和POST。超过这个数量的参数会被忽略。
  • int maxPostSize = 2 * 1024 * 1024
    最大post大小,2MB。
  • ProtocolHandler protocolHandler
    协议处理器。

1.7.2 核心方法

  • 生命周期方法:initInternal()、startInternal()、stopInternal()、destroyInternal()、pause()、resume()
  • Request createRequest()
    创建一个Servlet请求。
  • Response createResponse()
    创建一个Servlet响应。

1.8 StandardEngine

StandardEngine是Engine接口的标准实现,Engine表示一个Servlet引擎,代表处理请求的切入点,Engine接收和处理来自Connector的请求并处理和返回响应给Connector。一个Engine管理和维护多个虚拟Host。此外Engine还和Tomcat的集群技术有关。

1.8.1 核心属性

  • String name
    Engine名称。
  • String defaultHost
    默认Host名称

1.8.2 核心方法

  • void addChild(Container child)
    添加一个子容器虚拟Host。
  • void logAccess(Request request, Response response, long time, boolean useDefault)
    打印访问日志。
  • Container[] findChildren()
    获取所有Host。

1.9 StandardService

Service用于管理Connector和Engine,它会把一个Engine和多个Connector连接起来。其标准实现为StandardService,此类维护了一个Connector数组和一个Engine属性。

1.9.1 核心属性

  • Connector connectors[]
    此服务管理的连接器。
  • Engine engine
    此服务管理的Engine。
  • Mapper mapper = new Mapper()
    请求匹配器,此类根据请求的uri解析出处理此请求的Host、Context、Wrapper。

1.9.2 核心方法

  • Engine getContainer()
  • void setContainer(Engine engine)
    设置并启动Engine。
  • void addConnector(Connector connector)
    收集并启动Connector。
  • Connector[] findConnectors()
  • void removeConnector(Connector connector)
  • Mapper getMapper()
  • 生命周期方法:initInternal()、startInternal()、stopInternal()、destroyInternal()
    这三个方法还是对Engine和Connector管理。

1.10 StandardServer

Server表示整个Tomcat容器,管理和维护多个Service,同时也管理Tomcat容器本身。其标准实现为StandardServer,此类会维护一个Service数组。此外StandardServer还会启动一个用于接收关闭Tomcat命令的端口。

1.10.1 核心属性

  • int port = 8005
    用于接收关闭Tomcat命令的端口号。
  • String address = "localhost"
    用于接收关闭Tomcat命令的地址。
  • Service services[]
    此Server维护的Service集合。
  • String shutdown = "SHUTDOWN"
    关闭Tomcat的命令。

1.10.2 核心属性

  • void addService(Service service)
    添加并启动一个Service。
  • Service findService(String name)
  • Service[] findServices()
  • void removeService(Service service)

这些组件的呈环环相扣之状,看起来就像下面的样子:

image

类关系图:


image

2. Tomcat启动

Tomcat的启动大致可以分为三部分:Tomcat核心组件的创建、Tomcat核心组件的初始化和启动、Tomcat协议绑定以及线程资源初始化。

2.1 Tomcat核心组件的创建

ServletWebServerApplicationContext的onRefresh()方法在Spring上下文启动的过程中被调用,此方法调用createWebServer()方法创建WebServer对象,createWebServer()方法则调用ServletWebServerFactory#getWebServer()方法创建WebServer对象,源码如下:

private void createWebServer() {
    WebServer webServer = this.webServer;
    ServletContext servletContext = getServletContext();
    if (webServer == null && servletContext == null) {
        ServletWebServerFactory factory = getWebServerFactory();
        this.webServer = factory.getWebServer(getSelfInitializer());
    }
    else if (servletContext != null) {
        try {
            getSelfInitializer().onStartup(servletContext);
        }
        catch (ServletException ex) {
            throw new ApplicationContextException("Cannot initialize servlet context",
                    ex);
        }
    }
    initPropertySources();
}

当使用Spring Boot引入Tomcat相关依赖时ServletWebServerFactory的实现类为TomcatServletWebServerFactory,所以创建WebServer对象的逻辑都在TomcatServletWebServerFactory#getWebServer()方法中,如下为此方法的源码:

public WebServer getWebServer(ServletContextInitializer... initializers) {
    Tomcat tomcat = new Tomcat();
    File baseDir = (this.baseDirectory != null) ? this.baseDirectory
            : createTempDir("tomcat");
    tomcat.setBaseDir(baseDir.getAbsolutePath());
    Connector connector = new Connector(this.protocol);
    tomcat.getService().addConnector(connector);
    customizeConnector(connector);
    tomcat.setConnector(connector);
    tomcat.getHost().setAutoDeploy(false);
    configureEngine(tomcat.getEngine());
    for (Connector additionalConnector : this.additionalTomcatConnectors) {
        tomcat.getService().addConnector(additionalConnector);
    }
    prepareContext(tomcat.getHost(), initializers);
    return getTomcatWebServer(tomcat);
}

此方法:

  1. 创建Tomcat对象。
  2. 创建Connector对象,构造函数实例化ProtocolHandler,其实现类默认为Http11NioProtocol,Http11NioProtocol的构造函数创建了NioEndpoint对象。
  3. 调用Tomcat#getService()方法,此方法会创建StandardServerStandardService
  4. 调用Tomcat#getHost()方法,此方法会创建StandardHostStandardEngine
  5. 调用prepareContext()方法配置Host,创建TomcatEmbeddedContext、添加默认Servlet。

2.2 Tomcat核心组件的初始化和启动

可以看到TomcatServletWebServerFactory#getWebServer()方法几乎创建了所有的核心组件,但是此方法的最后一行“形迹可疑”,可以看到最后一行调用了getTomcatWebServer()方法,此方法的源码:

protected TomcatWebServer getTomcatWebServer(Tomcat tomcat) {
    return new TomcatWebServer(tomcat, getPort() >= 0);
}

貌似很简单,只是new了一个TomcatWebServer对象,但是当深入TomcatWebServer的构造函数就会发现此方法简直别有洞天,它不仅仅是简单地创建了一个TomcatWebServer对象,它还通过调用initialize()方法调用了Tomcat#start()方法,源码如下:

public TomcatWebServer(Tomcat tomcat, boolean autoStart) {
    Assert.notNull(tomcat, "Tomcat Server must not be null");
    this.tomcat = tomcat;
    this.autoStart = autoStart;
    initialize();
}

private void initialize() throws WebServerException {
    synchronized (this.monitor) {
        try {
            addInstanceIdToEngineName();

            Context context = findContext();
            context.addLifecycleListener((event) -> {
                if (context.equals(event.getSource())
                        && Lifecycle.START_EVENT.equals(event.getType())) {
                    
                    removeServiceConnectors();
                }
            });

            // 这里是重点
            this.tomcat.start();

            rethrowDeferredStartupExceptions();

            try {
                ContextBindings.bindClassLoader(context, context.getNamingToken(),
                        getClass().getClassLoader());
            }
            catch (NamingException ex) {
                // Naming is not enabled. Continue
            }

            startDaemonAwaitThread();
        }
        catch (Exception ex) {
            stopSilently();
            throw new WebServerException("Unable to start embedded Tomcat", ex);
        }
    }
}

上面Tomcat#start()方法的调用点燃了Tomcat核心组件初始化和启动的导火索,此方法会调用StandardServer#start()方法,因为Tomcat的核心组件呈环环相扣之状且StandardServer为最外层的组件,所以这一调用了引发自外而内的初始化连锁反应,拉开了Tomcat核心组件初始化和启动的序幕,如下为这些组件初始化和启动的调用时序图:

image

从上面时序图可以看出Tomcat核心组件初始化和启动是一系列init()、initInternal()、start()、startInternal()方法按顺序调用的过程。

2.3 Tomcat协议绑定以及线程资源初始化

接着,ServletWebServerApplicationContext的finishRefresh()方法被调用,finishRefresh()方法最终调用了TomcatWebServer#start()方法,start()方法调用了addPreviouslyRemovedConnectors()方法,相关源码如下:

@Override
public void start() throws WebServerException {
    synchronized (this.monitor) {
        if (this.started) {
            return;
        }
        try {
            addPreviouslyRemovedConnectors();
            Connector connector = this.tomcat.getConnector();
            if (connector != null && this.autoStart) {
                performDeferredLoadOnStartup();
            }
            checkThatConnectorsHaveStarted();
            this.started = true;
            logger.info("Tomcat started on port(s): " + getPortsDescription(true)
                    + " with context path '" + getContextPath() + "'");
        }
        catch (ConnectorStartFailedException ex) {
            stopSilently();
            throw ex;
        }
        catch (Exception ex) {
            throw new WebServerException("Unable to start embedded Tomcat server",
                    ex);
        }
        finally {
            Context context = findContext();
            ContextBindings.unbindClassLoader(context, context.getNamingToken(),
                    getClass().getClassLoader());
        }
    }
}

private void addPreviouslyRemovedConnectors() {
    Service[] services = this.tomcat.getServer().findServices();
    for (Service service : services) {
        Connector[] connectors = this.serviceConnectors.get(service);
        if (connectors != null) {
            for (Connector connector : connectors) {
                service.addConnector(connector);
                if (!this.autoStart) {
                    stopProtocolHandler(connector);
                }
            }
            this.serviceConnectors.remove(service);
        }
    }
}

addPreviouslyRemovedConnectors()方法的意思是添加前面删除的Connector,initialize()方法执行的时候向Context容器中添加了一个匿名LifecycleListener,在监听到Context的Lifecycle.START_EVENT事件时会把Service容器中的Connector删除并缓存至的TomcatWebServer的Map属性集合serviceConnectors中,等到TomcatWebServer#start()方法执行时再添加回来,据源码注释,此举是为了在Service启动之前先不要绑定协议,这。。。,如果此处有顺序方面强烈需求就不能换个别的方法么?感觉这种做法好牵强,这个不重要,这里关注的重点是是如何绑定协议的。答案就是Service的addConnector()方法,在上面介绍StandardService的时候提到了此方法会收集并启动Connector,也就是调用Connector#start()方法,从上面的时序图可以看到,此方法“兜兜绕绕”最终调用了NioEndpoint#start()方法。此方法在其父类AbstractEndpoint中定义,源码如下:

public final void start() throws Exception {
    if (bindState == BindState.UNBOUND) {
        bindWithCleanup();
        bindState = BindState.BOUND_ON_START;
    }
    startInternal();
}

private void bindWithCleanup() throws Exception {
    try {
        bind();
    } catch (Throwable t) {
        // Ensure open sockets etc. are cleaned up if something goes
        // wrong during bind
        ExceptionUtils.handleThrowable(t);
        unbind();
        throw t;
    }
}

从上面源码可以看到,在绑定状态为未绑定的时候进行绑定,具体绑定端口和地址的代码在bind()方法中,此方法实现在类NioEndpoint类中,如下:

@Override
public void bind() throws Exception {
    initServerSocket();

    // Initialize thread count defaults for acceptor, poller
    if (acceptorThreadCount == 0) {
        // FIXME: Doesn't seem to work that well with multiple accept threads
        acceptorThreadCount = 1;
    }
    if (pollerThreadCount <= 0) {
        //minimum one poller thread
        pollerThreadCount = 1;
    }
    setStopLatch(new CountDownLatch(pollerThreadCount));

    // Initialize SSL if needed
    initialiseSsl();

    selectorPool.open();
}

protected void initServerSocket() throws Exception {
    if (!getUseInheritedChannel()) {
        serverSock = ServerSocketChannel.open();
        socketProperties.setProperties(serverSock.socket());
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        serverSock.socket().bind(addr,getAcceptCount());
    } else {
        // Retrieve the channel provided by the OS
        Channel ic = System.inheritedChannel();
        if (ic instanceof ServerSocketChannel) {
            serverSock = (ServerSocketChannel) ic;
        }
        if (serverSock == null) {
            throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
        }
    }
    serverSock.configureBlocking(true); //mimic APR behavior
}

可以看到initServerSocket()方法:

  1. 创建了ServerSocketChannel对象。
  2. 绑定了地址和端口。
  3. 配置了ServerSocketChannel的阻塞模式为true。

这里有两个非常有意思的地方:

  1. ServerSocket绑定的时候设置了客户连接请求队列的最大长度,默认100。
  2. ServerSocketChannel的阻塞模式配置为了true,也就是阻塞模式,这好像颠覆了我们以往对NIO对认知,其实不管NIO还是BIO,accept()方法阻塞都不是问题,问题是BIO中的读写是阻塞的。Tomcat在接收到SocketChannel后会立刻将其配置为非阻塞。

NioEndpoint#start()方法不但通过调用bindWithCleanup()方法绑定了协议还调用了startInternal()方法,此方法初始化了用于处理请求的线程资源,源码如下:

@Override
public void startInternal() throws Exception {

    if (!running) {
        running = true;
        paused = false;

        processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getProcessorCache());
        eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                        socketProperties.getEventCache());
        nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                socketProperties.getBufferPool());

        
        if ( getExecutor() == null ) {
            // 创建线程池
            createExecutor();
        }

        initializeConnectionLatch();

        // 启动Poller线程
        pollers = new Poller[getPollerThreadCount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new Poller();
            Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();
        }

        // 启动Acceptor线程
        startAcceptorThreads();
    }
}

protected void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new ArrayList<>(count);

    for (int i = 0; i < count; i++) {
        Acceptor<U> acceptor = new Acceptor<>(this);
        String threadName = getName() + "-Acceptor-" + i;
        acceptor.setThreadName(threadName);
        acceptors.add(acceptor);
        Thread t = new Thread(acceptor, threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}

这个方法做了如下四件事:

  1. 初始化processorCache、eventCache、nioChannels三个缓存栈
    这个三个缓存分别用于缓存SocketProcessorBase对象、PollerEvent对象和NioChannel对象,栈初始化大小128,最大500,Tomcat为了不频繁地创建这个三个对象每次创建使用过后就将其属性置空并缓存起来,下次使用的时候直接从缓存中获取。

  2. 创建工作线程池
    此线程池即为Tomcat处理请求的工作线程池。createExecutor()方法负责创建此线程池:

    public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }
    

    通过源码可以看到,默认配置下线程池核心线程数为10最大线程数200,非核心线程最大空闲时间为60s;工作队列是一个无界的链表阻塞队列,而且在使用到工作队列前会把线程数量开到最大,也就是说在线程数量达到最大线程数量之前是使用不到工作对列的。拒绝策略直接抛出RejectedExecutionException异常。线程命名规则:http-nio-端口号-exec-编号,例如:http-nio-8080-exec-0。

  3. 启动Poller线程
    Tomcat会启动最多2个Poller线程,它会取2和服务器可用处理器数量最小值作为启动Poller的数量,Poller线程启动后会不停的判断是否有事件到达,一旦有事件到达则取出事件交由相应组件处理。Poller线程命名规则:http-nio-端口号-ClientPoller-编号,例如:http-nio-8080-ClientPoller-0

  4. 启动Acceptor线程
    Tomcat默认启动1个Acceptor线程,此线程等待客户端的连接请求,由上面可知因为ServerSocketChannel被配置了阻塞模式,所以此线程会阻塞在ServerSocketChannel#accept()方法,当客户端连接到达此线程获取SocketChannel并对其进行配置。Acceptor线程命名规则:http-nio-端口号-Acceptor-编号,例如:http-nio-8080-Acceptor-0。

3. Tomcat请求处理过程

3.1 Tomcat NIO处理

Acceptor线程会一直等待客户端的连接,在此期间会一直阻塞直到客户端的请求到来,当客户端请求到来,Acceptor线程会获取SocketChannel对象并把它交给NioEndpoint#setSocketOptions( SocketChannel socket)方法处理,如下为相关源码:

@Override
public void run() {

    int errorDelay = 0;

    // Loop until we receive a shutdown command
    while (endpoint.isRunning()) {

        // 略
        state = AcceptorState.RUNNING;

        try {
            // 略
            U socket = null;
            try {
                // 这里会调用ServerSocketChannel#accept()方法并阻塞
                socket = endpoint.serverSocketAccept();
            } catch (Exception ioe) {
                // 略
            }
            errorDelay = 0;

            // Configure the socket
            if (endpoint.isRunning() && !endpoint.isPaused()) {
                // 连接到达交由NioEndpoint的setSocketOptions()方法处理
                if (!endpoint.setSocketOptions(socket)) {
                    endpoint.closeSocket(socket);
                }
            } else {
                endpoint.destroySocket(socket);
            }
        } catch (Throwable t) {
            // 略
        }
    }
    state = AcceptorState.ENDED;
}

NioEndpoint的setSocketOptions()方法并不像这个方法的名称那样只是设置了Socket参数,它还创建了NioChannel对象并把它注册到了Poller线程,当然设置参数也很重要,尤其是这里把SocketChannel的阻塞模式配置成了非阻塞,下面是此方法的源码:

@Override
protected boolean setSocketOptions(SocketChannel socket) {
    try {
    
        // 配置阻塞模式为非阻塞
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        
        // 设置Socket参数
        socketProperties.setProperties(sock);

        // 从缓存中获取或创建NioChannel对象
        NioChannel channel = nioChannels.pop();
        if (channel == null) {
            // 创建Socket缓冲,读写缓冲默认大小8KB,默认不使用直接内存
            SocketBufferHandler bufhandler = new SocketBufferHandler(
                    socketProperties.getAppReadBufSize(),
                    socketProperties.getAppWriteBufSize(),
                    socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        // 把NioChannel对象注册到Poller
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error(sm.getString("endpoint.socketOptionsError"), t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}

getPoller0()方法会以轮训的方式从Poller[]中获取一个Poller线程。Poller线程的register()方法会创建NioSocketWrapper对象和PollerEvent对象并分别设置感兴趣的事件为SelectionKey.OP_READ和OP_REGISTER,而后通过调用addEvent()方法放入Poller维护的一个同步队列中,源码如下:

public void register(final NioChannel socket) {
    socket.setPoller(this);
    NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
    socket.setSocketWrapper(ka);
    ka.setPoller(this);
    ka.setReadTimeout(getConnectionTimeout());
    ka.setWriteTimeout(getConnectionTimeout());
    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
    ka.setSecure(isSSLEnabled());
    PollerEvent r = eventCache.pop();
    ka.interestOps(SelectionKey.OP_READ);
    if ( r==null ) 
        r = new PollerEvent(socket,ka,OP_REGISTER);
    else 
        r.reset(socket,ka,OP_REGISTER);
    // 放入队列
    addEvent(r);
}

private void addEvent(PollerEvent event) {
    // PollerEvent放入队列
    events.offer(event);
    if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}

Poller线程的run()方法会不停地通过调用events()方法判断同步队列中是否有PollerEvent,当同步队列中有PollerEvent时,events()方法会调用PollerEvent#run()方法, 此方法会把Poller中的Selector注册到SocketChannel上监听其读事件,同时Poller线程的run()方法会检测Selector上是否有事件到达,如果有则从Selector获取SelectionKey并做相应的处理,相关源码如下:

@Override
public void run() {
    // 死循环,不停的判断是否有PollerEvent到达
    while (true) {

        boolean hasEvents = false;

        try {
            if (!close) {
                // 检测是否有PollerEvent
                hasEvents = events();
                
                // 调用获取事件数量
                if (wakeupCounter.getAndSet(-1) > 0) {
                    keyCount = selector.selectNow();
                } else {
                    keyCount = selector.select(selectorTimeout);
                }
                wakeupCounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (IOException ioe) {
                    log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe);
                }
                break;
            }
        } catch (Throwable x) {
            ExceptionUtils.handleThrowable(x);
            log.error(sm.getString("endpoint.nio.selectorLoopError"), x);
            continue;
        }
        //either we timed out or we woke up, process events first
        if ( keyCount == 0 ) hasEvents = (hasEvents | events());

        Iterator<SelectionKey> iterator =
            keyCount > 0 ? selector.selectedKeys().iterator() : null;
        // 遍历SelectionKey
        while (iterator != null && iterator.hasNext()) {
            SelectionKey sk = iterator.next();
            NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
            
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
                // 处理事件
                processKey(sk, attachment);
            }
        }

        //process timeouts
        timeout(keyCount,hasEvents);
    }//while

    getStopLatch().countDown();
}

public boolean events() {
    boolean result = false;

    PollerEvent pe = null;
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
        result = true;
        try {
            // 调用PollerEvent#run()方法注册Selector
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventCache.push(pe);
            }
        } catch ( Throwable x ) {
            log.error(sm.getString("endpoint.nio.pollerEventError"), x);
        }
    }

    return result;
}

PollerEvent#run():
@Override
public void run() {
    if (interestOps == OP_REGISTER) {
        try {
            // 注册Selector,监听可读事件
            socket.getIOChannel().register(
                    socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
        } catch (Exception x) {
            log.error(sm.getString("endpoint.nio.registerFail"), x);
        }
    } else {
        final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
        try {
            if (key == null) {
                socket.socketWrapper.getEndpoint().countDownConnection();
                ((NioSocketWrapper) socket.socketWrapper).closed = true;
            } else {
                final NioSocketWrapper socketWrapper = (NioSocketWrapper) key.attachment();
                if (socketWrapper != null) {
                    //we are registering the key to start with, reset the fairness counter.
                    int ops = key.interestOps() | interestOps;
                    socketWrapper.interestOps(ops);
                    key.interestOps(ops);
                } else {
                    socket.getPoller().cancelledKey(key);
                }
            }
        } catch (CancelledKeyException ckx) {
            try {
                socket.getPoller().cancelledKey(key);
            } catch (Exception ignore) {}
        }
    }
}

从上面源码可以看到Poller线程的run()方法最终把NioSocketWrapper和SelectionKey一并交由NioEndpoint#processKey()方法处理,此方法又根据SelectionKey是否可读、可写做了一个分发,如下:

protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                if ( attachment.getSendfileData() != null ) {
                    processSendfile(sk,attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // 是否可读
                    if (sk.isReadable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    
                    // 是否可写
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            //invalid key
            cancelledKey(sk);
        }
    } catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error(sm.getString("endpoint.nio.keyProcessingError"), t);
    }
}

processKey()方法根据可读可写调用processSocket()方法,processSocket()方法实现在类AbstractEndpoint中,此方法创建SocketProcessorBase任务并将其交由线程池处理,如下:

public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        // 创建或从缓存中获取SocketProcessorBase
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            // SocketProcessorBase任务交由线程池处理
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}

如下是Tomcat处理NIO相关类的类图:

image

那么这里基本上可以看到Tomcat默认配置下的线程模型:
1个Acceptor线程+2个Poller线程+最多200个线程池线程

而且每级线程都均搭配一个缓冲队列,默认配置下:

  • Acceptor线程有一个最多容纳100个连接器请求的队列。
  • 每个Poller线程有一个不限大小的SynchronizedQueue队列。
  • 线程池拥有一个不限大小的工作队列。

3.2 Tomcat请求解析

Tomcat对请求对解析涉及到很多类,他们有NioEndpoint#SocketProcessor、AbstractProtocol.ConnectionHandler、Http11Processor、Http11InputBuffer、CoyoteAdapter等,下面跟随请求解析的过程一一介绍这些类。

3.2.1 NioEndpoint#SocketProcessor

SocketProcessorBase即是Tomcat工作线程池中运行的任务,它实现了Runnable接口,实现run()方法的同时又定义了抽象方法doRun(),其实现类NioEndpoint#SocketProcessor实现了doRun()方法,此方法会把SocketWrapperBase交由AbstractEndpoint.Handler#process()方法处理,并根据其返回状态是否为SocketState.CLOSED来决定是否需要关闭Socket,如下:

@Override
protected void doRun() {
    NioChannel socket = socketWrapper.getSocket();
    SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

    try {
        int handshake = -1;

        try {
            if (key != null) {
                if (socket.isHandshakeComplete()) {
                    handshake = 0;
                } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                        event == SocketEvent.ERROR) {
                    handshake = -1;
                } else {
                    handshake = socket.handshake(key.isReadable(), key.isWritable());
                    event = SocketEvent.OPEN_READ;
                }
            }
        } catch (IOException x) {
            handshake = -1;
            if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
        } catch (CancelledKeyException ckx) {
            handshake = -1;
        }
        if (handshake == 0) {
            SocketState state = SocketState.OPEN;
            // 交由AbstractEndpoint.Handler#process()方法处理
            if (event == null) {
                state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
            } else {
                state = getHandler().process(socketWrapper, event);
            }
            if (state == SocketState.CLOSED) {
                // 关闭Socket
                close(socket, key);
            }
        } else if (handshake == -1 ) {
            close(socket, key);
        } else if (handshake == SelectionKey.OP_READ){
            socketWrapper.registerReadInterest();
        } else if (handshake == SelectionKey.OP_WRITE){
            socketWrapper.registerWriteInterest();
        }
    } catch (CancelledKeyException cx) {
        socket.getPoller().cancelledKey(key);
    } catch (VirtualMachineError vme) {
        ExceptionUtils.handleThrowable(vme);
    } catch (Throwable t) {
        log.error(sm.getString("endpoint.processing.fail"), t);
        socket.getPoller().cancelledKey(key);
    } finally {
        socketWrapper = null;
        event = null;
        //return to cache
        if (running && !paused) {
            processorCache.push(this);
        }
    }
}

3.2.2 AbstractProtocol.ConnectionHandler

AbstractEndpoint.Handler的process()方法会调用AbstractHttp11Protocol的createProcessor()方法创建Http11Processor对象并调用其process()方法。

3.2.3 Http11Processor

当创建Http11Processor时,其父类AbstractProcessor的构造函数会创建org.apache.coyote.Request和org.apache.coyote.Response对象。Http11Processor的构造函数则创建了HttpParser
、Http11InputBuffer、Http11OutputBuffer对象。Http11Processor的process()方法在其父类AbstractProcessorLight中实现,此方法则调用Http11Processor的service()方法处理SocketWrapperBase,如下:

@Override
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
        throws IOException {

    SocketState state = SocketState.CLOSED;
    Iterator<DispatchType> dispatches = null;
    do {
        if (dispatches != null) {
            DispatchType nextDispatch = dispatches.next();
            state = dispatch(nextDispatch.getSocketStatus());
        } else if (status == SocketEvent.DISCONNECT) {
            // Do nothing here, just wait for it to get recycled
        } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
            state = dispatch(status);
            if (state == SocketState.OPEN) {
                // 调用service方法
                state = service(socketWrapper);
            }
        } else if (status == SocketEvent.OPEN_WRITE) {
            // Extra write event likely after async, ignore
            state = SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ){
            state = service(socketWrapper);
        } else {
            // Default to closing the socket if the SocketEvent passed in
            // is not consistent with the current state of the Processor
            state = SocketState.CLOSED;
        }


        if (state != SocketState.CLOSED && isAsync()) {
            state = asyncPostProcess();
        }

        if (dispatches == null || !dispatches.hasNext()) {
            dispatches = getIteratorAndClearDispatches();
        }
    } while (state == SocketState.ASYNC_END ||
            dispatches != null && state != SocketState.CLOSED);

    return state;
}

Http11Processor的service()方法是Tomcat解析Http协议的核心方法,其负责解析HTTP协议请求行和请求头。它会把通道里的字节读出来并按照Http协议报文进行解析,这里并没把所有Http报文的内容全部解析出来,只是解析出了请求行和请求头,body部分并没有在此处解析。 解析的结果设置到org.apache.coyote.Request对象中,解析完成后会把Request和Response对象交由CoyoteAdapter处理,如下为service()方法源码:

@Override
public SocketState service(SocketWrapperBase<?> socketWrapper)
    throws IOException {
    RequestInfo rp = request.getRequestProcessor();
    rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

    // Setting up the I/O
    setSocketWrapper(socketWrapper);
    inputBuffer.init(socketWrapper);
    outputBuffer.init(socketWrapper);

    // Flags
    keepAlive = true;
    openSocket = false;
    readComplete = true;
    boolean keptAlive = false;
    SendfileState sendfileState = SendfileState.DONE;

    while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
            sendfileState == SendfileState.DONE && !protocol.isPaused()) {

        try {
            // 解析请求行
            if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),
                    protocol.getKeepAliveTimeout())) {
                if (inputBuffer.getParsingRequestLinePhase() == -1) {
                    return SocketState.UPGRADING;
                } else if (handleIncompleteRequestLineRead()) {
                    break;
                }
            }

            if (protocol.isPaused()) {
                // 503 - Service unavailable
                response.setStatus(503);
                setErrorState(ErrorState.CLOSE_CLEAN, null);
            } else {
                keptAlive = true;
                request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
                // 解析请求头
                if (!inputBuffer.parseHeaders()) {
                    openSocket = true;
                    readComplete = false;
                    break;
                }
                if (!protocol.getDisableUploadTimeout()) {
                    socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
                }
            }
        } catch (IOException e) {
            if (log.isDebugEnabled()) {
                log.debug(sm.getString("http11processor.header.parse"), e);
            }
            setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
            break;
        } catch (Throwable t) {
            // 400 - Bad Request
            response.setStatus(400);
            setErrorState(ErrorState.CLOSE_CLEAN, t);
        }
        // 略
        if (getErrorState().isIoAllowed()) {
            try {
                rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                // 交由CoyoteAdapter处理
                getAdapter().service(request, response);
                
                if(keepAlive && !getErrorState().isError() && !isAsync() &&
                        statusDropsConnection(response.getStatus())) {
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                }
            } catch (InterruptedIOException e) {
                setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
            } catch (HeadersTooLargeException e) {
                log.error(sm.getString("http11processor.request.process"), e);
                
                if (response.isCommitted()) {
                    setErrorState(ErrorState.CLOSE_NOW, e);
                } else {
                    response.reset();
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, e);
                    response.setHeader("Connection", "close"); // TODO: Remove
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("http11processor.request.process"), t);
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
        }

        // Finish the handling of the request
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
        if (!isAsync()) {
            endRequest();
        }
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

        if (getErrorState().isError()) {
            response.setStatus(500);
        }

        if (!isAsync() || getErrorState().isError()) {
            request.updateCounters();
            if (getErrorState().isIoAllowed()) {
                inputBuffer.nextRequest();
                outputBuffer.nextRequest();
            }
        }

        if (!protocol.getDisableUploadTimeout()) {
            int connectionTimeout = protocol.getConnectionTimeout();
            if(connectionTimeout > 0) {
                socketWrapper.setReadTimeout(connectionTimeout);
            } else {
                socketWrapper.setReadTimeout(0);
            }
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

        sendfileState = processSendfile(socketWrapper);
    }

    rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

    if (getErrorState().isError() || protocol.isPaused()) {
        return SocketState.CLOSED;
    } else if (isAsync()) {
        return SocketState.LONG;
    } else if (isUpgrade()) {
        return SocketState.UPGRADING;
    } else {
        if (sendfileState == SendfileState.PENDING) {
            return SocketState.SENDFILE;
        } else {
            if (openSocket) {
                if (readComplete) {
                    return SocketState.OPEN;
                } else {
                    return SocketState.LONG;
                }
            } else {
                return SocketState.CLOSED;
            }
        }
    }
}

从上面源码可以看出,请求行和请求头的具体解析是在类Http11InputBuffer中完成的,Http11InputBuffer中定义了一系列解析Http请求行和请求头的方法,例如:parseRequestLine()、parseHeaders()、isChunking()、doRead()等,这些方法都调用到了一个叫fill()的方法,此方法最终会依赖到Http11InputBuffer中关联的SocketWrapperBase对象中的NioChannel对象,类NioChannel提供了对ByteBuffer读写能力。

3.2.4 CoyoteAdapter

CoyoteAdapter类的service()方法按顺序做了如下四个操作:

  1. 创建org.apache.catalina.connector包下的Request(ServletRequest)和Response(ServletResponse)对象并基于org.apache.coyote包下的Request和Response对象设置各种属性。
  2. 基于Request关联的Mapper类找到处理当前请求的Host、Context、Wrapper。
    Mapper类是找到处理请求的Servlet的关键。
  3. 执行容器,即执行Pipeline所维护的一系列Valve。
  4. 调用org.apache.catalina.connector.Response的finishResponse()方法结束响应

Tomcat中名字叫Request和Response的类其实有两对,他们之间的关系如下:

image

CoyoteAdapter的service()方法创建了org.apache.catalina.connector.Request对象和org.apache.catalina.connector.Response对象,并关联org.apache.coyote包下的Request和Response对象,接着调用postParseRequest()方法对这两对Request和Response对象进行各种属性设置,接下来调用了postParseRequest方法,此方法除了调用parsePathParameters()方法解析路径参数之外还,还执行了如下一行代码:

connector.getService().getMapper().map(serverName, decodedURI, version, request.getMappingData());

这行代码涉及到了:

  • 两个核心类:Mapper、MappingData
  • 两个重要的参数(version为null):serverName、decodedURI
    serverName用于找Host,decodedURI用于找Context和Wrapper。

先来看看Mapper和MappingData的结构体:

image

我们会好奇Tomcat是如何找到处理请求的Servlet的,Mapper会告诉我们答案。Mapper的成员变量是一个包含Host、Context、Wrapper的资源池,其方法则提供了根据serverName、decodedURI找寻这些资源的能力,这些方法封装了一系列寻找对应组件的规则。Mapper的相关方法找到对应的Host、Context、Wrapper然后把这些资源汇总到MappingData对象中,如下为MappingData的源码:

public class MappingData {

    public Host host = null;
    public Context context = null;
    public int contextSlashCount = 0;
    public Context[] contexts = null;
    public Wrapper wrapper = null;
    public boolean jspWildCard = false;

    public final MessageBytes contextPath = MessageBytes.newInstance();
    public final MessageBytes requestPath = MessageBytes.newInstance();
    public final MessageBytes wrapperPath = MessageBytes.newInstance();
    public final MessageBytes pathInfo = MessageBytes.newInstance();

    public final MessageBytes redirectPath = MessageBytes.newInstance();

    // Fields used by ApplicationMapping to implement javax.servlet.http.Mapping
    public MappingMatch matchType = null;

    public void recycle() {
        host = null;
        context = null;
        contextSlashCount = 0;
        contexts = null;
        wrapper = null;
        jspWildCard = false;
        contextPath.recycle();
        requestPath.recycle();
        wrapperPath.recycle();
        pathInfo.recycle();
        redirectPath.recycle();
        matchType = null;
    }
}

Mapper的相应方法封装了一系列寻找处理请求的Host、Context、Wrapper的规则,由于相关源码太长这里就不贴了。

找到相应的容器后接下来就是执行容器的Pipeline,即Pipeline所关联的一系列Valve,这些Valve有:

  • StandardEngine的StandardEngineValve。
  • StandardHost的ErrorReportValve和StandardHostValve。
  • StandardContext的NonLoginAuthenticator和StandardContextValve。
  • StandardWrapper的StandardWrapperValve。

如下图:

image

StandardWrapperValve的invoke()方法首先从StandardWrapper处通过调用allocate()分配一个Servlet实例,其次创建ApplicationFilterChain
对象执行其doFilter()方法,ApplicationFilterChain维护了一个Servlet对象和一个ApplicationFilterConfig类型的数组filters,ApplicationFilterConfig持有一个Filter对象,那么ApplicationFilterChain的doFilter()方法实际上是先执行这些过滤器Filter,然后再执行Servlet,这里即是Spring的DispatcherServlet,如下:

@Override
public void doFilter(ServletRequest request, ServletResponse response)
    throws IOException, ServletException {

    if( Globals.IS_SECURITY_ENABLED ) {
        final ServletRequest req = request;
        final ServletResponse res = response;
        try {
            java.security.AccessController.doPrivileged(
                new java.security.PrivilegedExceptionAction<Void>() {
                    @Override
                    public Void run()
                        throws ServletException, IOException {
                        internalDoFilter(req,res);
                        return null;
                    }
                }
            );
        } catch( PrivilegedActionException pe) {}
    } else {
        internalDoFilter(request,response);
    }
}

private void internalDoFilter(ServletRequest request,
                              ServletResponse response)
    throws IOException, ServletException {

    // Call the next filter if there is one
    if (pos < n) {
        ApplicationFilterConfig filterConfig = filters[pos++];
        try {
            Filter filter = filterConfig.getFilter();

            if (request.isAsyncSupported() && "false".equalsIgnoreCase(
                    filterConfig.getFilterDef().getAsyncSupported())) {
                request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE);
            }
            if( Globals.IS_SECURITY_ENABLED ) {
                final ServletRequest req = request;
                final ServletResponse res = response;
                Principal principal =
                    ((HttpServletRequest) req).getUserPrincipal();

                Object[] args = new Object[]{req, res, this};
                SecurityUtil.doAsPrivilege ("doFilter", filter, classType, args, principal);
            } else {
                // 执行过滤器
                filter.doFilter(request, response, this);
            }
        } catch (IOException | ServletException | RuntimeException e) {
            throw e;
        } catch (Throwable e) {
            e = ExceptionUtils.unwrapInvocationTargetException(e);
            ExceptionUtils.handleThrowable(e);
            throw new ServletException(sm.getString("filterChain.filter"), e);
        }
        return;
    }

    // We fell off the end of the chain -- call the servlet instance
    try {
        if (ApplicationDispatcher.WRAP_SAME_OBJECT) {
            lastServicedRequest.set(request);
            lastServicedResponse.set(response);
        }

        if (request.isAsyncSupported() && !servletSupportsAsync) {
            request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR,
                    Boolean.FALSE);
        }
        // Use potentially wrapped request from this point
        if ((request instanceof HttpServletRequest) &&
                (response instanceof HttpServletResponse) &&
                Globals.IS_SECURITY_ENABLED ) {
            final ServletRequest req = request;
            final ServletResponse res = response;
            Principal principal =
                ((HttpServletRequest) req).getUserPrincipal();
            Object[] args = new Object[]{req, res};
            SecurityUtil.doAsPrivilege("service",
                                       servlet,
                                       classTypeUsedInService,
                                       args,
                                       principal);
        } else {
            // 执行Servlet
            servlet.service(request, response);
        }
    } catch (IOException | ServletException | RuntimeException e) {
        throw e;
    } catch (Throwable e) {
        e = ExceptionUtils.unwrapInvocationTargetException(e);
        ExceptionUtils.handleThrowable(e);
        throw new ServletException(sm.getString("filterChain.servlet"), e);
    } finally {
        if (ApplicationDispatcher.WRAP_SAME_OBJECT) {
            lastServicedRequest.set(null);
            lastServicedResponse.set(null);
        }
    }
}

下面是请求处理的相关类图:

image

3.3 Tomcat响应处理

3.3.1 CoyoteWriter和CoyoteOutputStream

Servlet的Response对象维护两个组件用于响应客户端:ServletOutputStream(实现类为CoyoteOutputStream)和PrintWriter(实现类为CoyoteWriter),这两个组件分别面向字节和字符,它们提供了write()和print()系列方法用于Servlet向客户端发送信息。值得注意的一点是ServletOutputStream和PrintWriter只能选其一进行响应内容的写入,否则会抛出异常。它们之间都关系如下:

image

3.3.2 OutputBuffer和SocketBufferHandler

OutputBuffer和SocketBufferHandler是Servlet响应客户端过程中的两个缓存。

OutputBuffer维护了两个缓存:ByteBuffer和CharBuffer,分别用于字节缓存和字符缓存,默认大小为8KB,虽说有两种但是当刷新时,CharBuffer中的内容还是会转换成字节存储至ByteBuffer中,而且ServletOutputStream也是直接使用ByteBuffer。

SocketBufferHandler是Socket的读写缓存,其维护了两个类型为ByteBuffer的成员变量:readBuffer和writeBuffer,大小为8KB,分别用于读写缓存。SocketBufferHandler在创建的时候可以指定是否使用直接内存,如下:

private volatile ByteBuffer readBuffer;
private volatile ByteBuffer writeBuffer;

private final boolean direct;

public SocketBufferHandler(int readBufferSize, int writeBufferSize,
        boolean direct) {
    this.direct = direct;
    if (direct) {
        readBuffer = ByteBuffer.allocateDirect(readBufferSize);
        writeBuffer = ByteBuffer.allocateDirect(writeBufferSize);
    } else {
        readBuffer = ByteBuffer.allocate(readBufferSize);
        writeBuffer = ByteBuffer.allocate(writeBufferSize);
    }
}

当调用CoyoteWriter和CoyoteOutputStream的write()和print()系列方法时响应内容首先会被暂时缓存至OutputBuffer中,write()和print()系列方都可以多次调用,当OutputBuffer缓存满时响应内容会被刷新至SocketBufferHandler的writeBuffer中,如果SocketBufferHandler的writeBuffer也满了则会把响应内容直接写入SocketChannel中,当然在第一次写入SocketChannel之前Http11Processor的prepareResponse()方法会被调用用于向SocketChannel写入响应状态以及响应头。

当调用CoyoteWriter和CoyoteOutputStream的flush()方法时,会把OutputBuffer中缓存的响应内容刷至SocketBufferHandler的writeBuffer中,而后把SocketBufferHandler的writeBuffer缓存写入SocketChannel中。

当调用CoyoteWriter和CoyoteOutputStream的close()方法时,“0\r\n\r\n”会被写入SocketChannel,这个是是分块传输的结束符,表示服务器响应完毕。

下面以CoyoteWriter的write(String s)方法为例分析其源码的调用轨迹,揭开其响应客户端的神秘面纱。
CoyoteWriter的write(String s)方法为入口,其源码如下:

@Override
public void write(String s) {
    write(s, 0, s.length());
}
    
@Override
public void write(String s, int off, int len) {

    if (error) {
        return;
    }

    try {
        // 调用OutputBuffer的write()方法
        ob.write(s, off, len);
    } catch (IOException e) {
        error = true;
    }
}

OutputBuffer相关源码:

@Override
public void write(String s, int off, int len) throws IOException {

    if (suspended) {
        return;
    }

    if (s == null) {
        throw new NullPointerException(sm.getString("outputBuffer.writeNull"));
    }

    int sOff = off;
    int sEnd = off + len;
    while (sOff < sEnd) {
        // 把字符缓存转换至字节缓存
        int n = transfer(s, sOff, sEnd - sOff, cb);
        sOff += n;
        // 判断是否写满了
        if (isFull(cb)) {
            flushCharBuffer();
        }
    }

    charsWritten += len;
}

private void flushCharBuffer() throws IOException {
    realWriteChars(cb.slice());
    clear(cb);
}

public void realWriteChars(CharBuffer from) throws IOException {

    while (from.remaining() > 0) {
        // 把CharBuffer中缓存的字符转换成字节并缓存至ByteBuffer中
        conv.convert(from, bb);
        if (bb.remaining() == 0) {
            // Break out of the loop if more chars are needed to produce any output
            break;
        }
        if (from.remaining() > 0) {
            // 如果from还有就把字节缓存刷新出去
            flushByteBuffer();
        } else if (conv.isUndeflow() && bb.limit() > bb.capacity() - 4) {
            // See TestOutputBuffer#testUtf8SurrogateBody()
            flushByteBuffer();
        }
    }

}

private void flushByteBuffer() throws IOException {
    // 真的要写字节啦
    realWriteBytes(bb.slice());
    clear(bb);
}
    
public void realWriteBytes(ByteBuffer buf) throws IOException {
    if (closed) {
        return;
    }
    if (coyoteResponse == null) {
        return;
    }

    // If we really have something to write
    if (buf.remaining() > 0) {
        // real write to the adapter
        try {
            coyoteResponse.doWrite(buf);
        } catch (CloseNowException e) {
            closed = true;
            throw e;
        } catch (IOException e) {
            throw new ClientAbortException(e);
        }
    }

}

可以看到数据先是被写入了OutputBuffer的缓存中,当OutputBuffer的缓存ByteBuffer满则会调用coyote.Response的doWrite()把字节数组写出去,看下此方法 的源码:

public void doWrite(ByteBuffer chunk) throws IOException {
    int len = chunk.remaining();
    outputBuffer.doWrite(chunk);
    contentWritten += len - chunk.remaining();
}

发现,此方法直接调用outputBuffer的doWrite()方法,此处的outputBuffer即是Http11OutputBuffer,此类的doWrite()方法如下:

@Override
public int doWrite(ByteBuffer chunk) throws IOException {

    if (!response.isCommitted()) {
        // 如果response还没有提交过则调用Http11Processor的prepareResponse()方法
        // 把响应状态以及响应头写入SocketChannel。
        response.action(ActionCode.COMMIT, null);
    }

    if (lastActiveFilter == -1) {
        return outputStreamOutputBuffer.doWrite(chunk);
    } else {
        return activeFilters[lastActiveFilter].doWrite(chunk);
    }
}

如果这里是第一次写SocketChannel则response.isCommitted()为false,那么response.action(ActionCode.COMMIT, null)则被执行,此方法会调用Http11Processor的prepareResponse()方法把响应状态以及响应头写入SocketChannel。因为没有添加contentLength并且是Http1.1协议所以会添加Transfer-Encoding: chunked这个请求头,即以分块的模式发送响应内容。接下来如果lastActiveFilter为-1说明activeFilters数组中是空的,实际运行中此处数组有一个元素ChunkedOutputFilter,ChunkedOutputFilter是一个把数据组成数据块格式的过滤器,接下来看下它的doWrite()方法:

@Override
public int doWrite(ByteBuffer chunk) throws IOException {

    int result = chunk.remaining();

    if (result <= 0) {
        return 0;
    }
    // chunkHeader设置以16进制表示的chunk的字节长度
    int pos = calculateChunkHeader(result);

    chunkHeader.position(pos + 1).limit(chunkHeader.position() + 9 - pos);
    
    // 写数据块长度
    buffer.doWrite(chunkHeader);

    // 写数据块
    buffer.doWrite(chunk);

    chunkHeader.position(8).limit(10);
    // 写数据块结束字符
    buffer.doWrite(chunkHeader);

    return result;
}

可以看到此方法调用了三次buffer的doWrite()方法写入了一个数据块,下面是一个以数据块响应的小例子:

HTTP/1.1 200 OK
Content-Type: text/plain
Transfer-Encoding: chunked

25\r\n
This is the data in the first chunk\r\n

1C\r\n
and this is the second one\r\n

0\r\n
\r\n

以数据块传输的格式:

  • 第一行:数据块长度十六进制表示+\r\n
  • 第二行:数据块内容+\r\n
  • 结束:0\r\n\r\n

那么此处的buffer又是什么的呢?它是Http11OutputBuffer的内部类SocketOutputBuffer:

protected class SocketOutputBuffer implements HttpOutputBuffer {

    @Override
    public int doWrite(ByteBuffer chunk) throws IOException {
        try {
            int len = chunk.remaining();
            socketWrapper.write(isBlocking(), chunk);
            len -= chunk.remaining();
            byteCount += len;
            return len;
        } catch (IOException ioe) {
            response.action(ActionCode.CLOSE_NOW, ioe);
            // Re-throw
            throw ioe;
        }
    }

    @Override
    public long getBytesWritten() {
        return byteCount;
    }

    @Override
    public void end() throws IOException {
        socketWrapper.flush(true);
    }

    @Override
    public void flush() throws IOException {
        socketWrapper.flush(isBlocking());
    }
}

因为是Http11OutputBuffer的内部类,所以共享了Http11OutputBuffe的属性socketWrapper,即NioSocketWrapper,而且调用了其write()方法,此方法引出了第二个缓存SocketBufferHandler,如下:

public final void write(boolean block, ByteBuffer from) throws IOException {
    if (from == null || from.remaining() == 0) {
        return;
    }

    if (block) {
        writeBlocking(from);
    } else {
        writeNonBlocking(from);
    }
}

protected void writeBlocking(ByteBuffer from) throws IOException {
    if (socketBufferHandler.isWriteBufferEmpty()) {
        // Socket write buffer is empty. Write the provided buffer directly
        // to the network.
        // TODO Shouldn't smaller writes be buffered anyway?
        writeBlockingDirect(from);
    } else {
        socketBufferHandler.configureWriteBufferForWrite();
        transfer(from, socketBufferHandler.getWriteBuffer());
        if (!socketBufferHandler.isWriteBufferWritable()) {
            doWrite(true);
            writeBlockingDirect(from);
        }
    }
}

write()方法的注解写的很有意思,它是这样写的:把给定的数据写到socket写缓存中,这里的写缓存是指SocketBufferHandler的writeBuffer,如果在写的过程中writeBuffer满了,writeBuffer中缓存的内容被写入网络并且此方法又开始填充数据,根据数据的大小,有可能会分多次写入网络。也就是说数据在写入网络(SocketChannel)之前会先缓存至SocketBufferHandler的writeBuffe,当writeBuffe满了才会写入网络。那么接下来就看下数据是如何被写入SocketChannel的,可以看到writeBlocking()方法调用了doWrite()方法,此方法又调用了实现在NioEndpoint.NioSocketWrapper类中的doWrite()方法,其源码如下:

@Override
protected void doWrite(boolean block, ByteBuffer from) throws IOException {
    long writeTimeout = getWriteTimeout();
    Selector selector = null;
    try {
        selector = pool.get();
    } catch (IOException x) {}
    try {
        // 调用NioSelectorPool的write()方法
        pool.write(from, getSocket(), selector, writeTimeout, block);
        if (block) {
            // Make sure we are flushed
            do {
                if (getSocket().flush(true, selector, writeTimeout)) {
                    break;
                }
            } while (true);
        }
        updateLastWrite();
    } finally {
        if (selector != null) {
            pool.put(selector);
        }
    }
}

可以看到在此方法中调用了pool.write()方法并把NioChanncel传递进去,此处的pool即为NioSelectorPool,NioSelectorPool#write()根据是否阻塞走了两个不同的流程,如下:

public int write(ByteBuffer buf, NioChannel socket, Selector selector,
                 long writeTimeout, boolean block) throws IOException {
    if ( SHARED && block ) {
        return blockingSelector.write(buf,socket,writeTimeout);
    }
    SelectionKey key = null;
    int written = 0;
    boolean timedout = false;
    int keycount = 1; //assume we can write
    long time = System.currentTimeMillis(); //start the timeout timer
    try {
        while ( (!timedout) && buf.hasRemaining() ) {
            int cnt = 0;
            if ( keycount > 0 ) { //only write if we were registered for a write
                cnt = socket.write(buf); //write the data
                if (cnt == -1) throw new EOFException();

                written += cnt;
                if (cnt > 0) {
                    time = System.currentTimeMillis(); //reset our timeout timer
                    continue; //we successfully wrote, try again without a selector
                }
                if (cnt==0 && (!block)) break; //don't block
            }
            if ( selector != null ) {
                //register OP_WRITE to the selector
                if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
                else key.interestOps(SelectionKey.OP_WRITE);
                if (writeTimeout==0) {
                    timedout = buf.hasRemaining();
                } else if (writeTimeout<0) {
                    keycount = selector.select();
                } else {
                    keycount = selector.select(writeTimeout);
                }
            }
            if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
        }//while
        if ( timedout ) throw new SocketTimeoutException();
    } finally {
        if (key != null) {
            key.cancel();
            if (selector != null) selector.selectNow();//removes the key from this selector
        }
    }
    return written;
}

可以看到当非阻塞模式时直接调用NioChannel的write()方法,而阻塞模式时则委托给了NioBlockingSelector的write()方法,深入NioBlockingSelector的write()方法发现其也调用了NioChannel的write()方法,那么看下NioChannel的write()方法做了什么:

@Override
public int write(ByteBuffer src) throws IOException {
    checkInterruptStatus();
    return sc.write(src);
}

可以看到这里直接调用了SocketChannel的write()方法,至此,一次网络写入调用完毕,flush()方法和close()方法的调用过程与之大致相似,只是调用过程中方法不同,但最终都会调用SocketChannel的write()方法,close()方法只是写入了一个分块传输的结束符。下面是Servlet响应过程的时序图:

image

当使用Servlet时,我一直纠结在使用CoyoteOutputStream或CoyoteWriter写完响应内容后要不要调用它们的close()方法?经过测试发现好像不调用close()方法响应内容也能正确写回客户端。经过查看Tomcat的相关源码发现,其实是不用调用close()方法的,即使调用了也没有关系,实际上不管我们是否调用了close()方法Tomcat都会帮我们结束响应,Tomcat在执行完Servlet后会判断我们是否通过调用close()方法结束了响应,如果没有则调用Response#finishResponse()方法,此番操作在CoyoteAdapter#service()方法中实现。看下Response#finishResponse()方法的源码:

public void finishResponse() throws IOException {
    // Writing leftover bytes
    outputBuffer.close();
}

这里的outputBuffer即为时序图中OutputBuffer类,从上面的时序图中可以看到此类的close()方法最终结束了响应,OutputBuffer的close()方法第一次被调用后其持有的boolean属性closed被置为true,第二次被调用时如果closed为true就直接return,源码如下:

@Override
public void close() throws IOException {
    // 如果已经调用过则直接return
    if (closed) {
        return;
    }
    if (suspended) {
        return;
    }

    // 略
    
    // 第一次调用标记已经调用过了
    closed = true;

    Request req = (Request) coyoteResponse.getRequest().getNote(CoyoteAdapter.ADAPTER_NOTES);
    req.inputBuffer.close();

    // 结束响应的操作
    coyoteResponse.action(ActionCode.CLOSE, null);
}

从上面的时序图可以看到实际上Response#finishResponse()方法的调用就等同于调用了CoyoteOutputStream或CoyoteWriter的close()方法。

3.3.3 连接关闭

CoyoteWriter和CoyoteOutputStream的close()方法只是结束本次响应,并没有关闭连接,那么连接是什么时候关闭的呢?

正常情况下(没有其他异常发生)有四种情况会导致Tomcat关闭连接:

  1. 客户端设置Connection: close
    这种情况明确表明客户端已不需要此连接。
  2. Tomcat持有的连接数量大于maxKeepAliveRequests(默认100)
    Tomcat默认最大连接数是10000,为了自我保护,当其保持的未关闭连的接数量大于指定值时即使新来的连接指定了Connection: keep-alive Tomcat也不再保持此连接,而是响应完毕后直接关闭。
  3. 客户端主动关闭了连接
    客户端关闭连接会导致Tomcat直接关闭连接。
  4. Tomcat持有连接时长超过了keepAliveTimeout(默认60s,可通过server.connection-timeout参数指定)
    在没有以上三种情况出现的时候,客户端没有指定Connection或者指定了Connection: keep-alive都会使Tomcat维持此连接不关闭,但是Tomcat不会一直这样持有此连接不关闭,当持有这些连接的时长大于keepAliveTimeout时Tomcat就会关闭连接。

由于Http1.1默认Connection: keep-alive所以下面看下Tomcat是如何实现第四种情况的。

把视线拉回NioEndpoint的内部类Poller,Poller的run()方法除了不停的检测是否有PollerEvent到达还在不停的通过调用timeout()方法检测其持有的Selector上的通道是否读超时、写超时,timeout()方法负责检测这些通道是否读超时、写超时,源码如下:

protected void timeout(int keyCount, boolean hasEvents) {
    long now = System.currentTimeMillis();
    if (nextExpiration > 0 && (keyCount > 0 || hasEvents) && (now < nextExpiration) && !close) {
        return;
    }
    //timeout
    int keycount = 0;
    try {
        // 遍历Poller的Selector所持有的连接
        for (SelectionKey key : selector.keys()) {
            keycount++;
            try {
                NioSocketWrapper ka = (NioSocketWrapper) key.attachment();
                if ( ka == null ) {
                    cancelledKey(key); //we don't support any keys without attachments
                } else if (close) {
                    key.interestOps(0);
                    ka.interestOps(0); //avoid duplicate stop calls
                    processKey(key,ka);
                } else if ((ka.interestOps()&SelectionKey.OP_READ) == SelectionKey.OP_READ ||
                          (ka.interestOps()&SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                    boolean isTimedOut = false;
                    // 检测是否读超时
                    if ((ka.interestOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
                        long delta = now - ka.getLastRead();
                        long timeout = ka.getReadTimeout();
                        isTimedOut = timeout > 0 && delta > timeout;
                    }
                    // 检查是否写超时
                    if (!isTimedOut && (ka.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
                        long delta = now - ka.getLastWrite();
                        long timeout = ka.getWriteTimeout();
                        isTimedOut = timeout > 0 && delta > timeout;
                    }
                    if (isTimedOut) {
                        key.interestOps(0);
                        ka.interestOps(0); //avoid duplicate timeout calls
                        ka.setError(new SocketTimeoutException());
                        // 如果超时当成一个ERROR处理
                        if (!processSocket(ka, SocketEvent.ERROR, true)) {
                            cancelledKey(key);
                        }
                    }
                }
            }catch ( CancelledKeyException ckx ) {
                cancelledKey(key);
            }
        }//for
    } catch (ConcurrentModificationException cme) {}
    long prevExp = nextExpiration;
    nextExpiration = System.currentTimeMillis() +
            socketProperties.getTimeoutInterval();
    
}

可以看到在判断连接是否读写超时时类NioSocketWrapper扮演类一个非常重要的角色,其维护了四个有关超时的属性:

  • lastWrite: 最后一次写的时间戳
  • writeTimeout: 写超时时间
  • lastRead: 最后一次读的时间戳
  • readTimeout: 读超时时间

每当从连接中读写字节时都会更新最后一次读写时间戳,读写超时时间默认60s,在使用Spring Boot时可以通过server.connection-timeout参数设置。

有了以上四个属性后那么读写超时的判断就非常简单了,最后一次读写时间与当前时间差如果大于读写超时时间那么就是读写超时了,此时可以看到这里当成了ERROR事件通过调用AbstractEndpoint#processSocket()方法处理,看下此方法:

public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        return false;
    }
    return true;
}

可以看到此方法把关闭连接的操作当成一个任务放入了线程持中,接下来看下NioEndpoint#SocketProcessor任务是如何处理ERROR事件的,其doRun()方法如下:

@Override
protected void doRun() {
    NioChannel socket = socketWrapper.getSocket();
    SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());

    try {
        int handshake = -1;

        try {
            if (key != null) {
                if (socket.isHandshakeComplete()) {
                    handshake = 0;
                } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT ||
                        event == SocketEvent.ERROR) {
                    // 1. handshake设置为-1
                    handshake = -1;
                } 
                // 略
            }
        } catch (IOException x) {
            handshake = -1;
            if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
        } catch (CancelledKeyException ckx) {
            handshake = -1;
        }
        if (handshake == 0) {
            // 略
        } else if (handshake == -1 ) {
            // 2. 调用close()方法
            close(socket, key);
        } else if (handshake == SelectionKey.OP_READ){
            socketWrapper.registerReadInterest();
        } else if (handshake == SelectionKey.OP_WRITE){
            socketWrapper.registerWriteInterest();
        }
    } catch (CancelledKeyException cx) {
        socket.getPoller().cancelledKey(key);
    } // 略
}

从上面的源码可以看到当event为SocketEvent.ERROR时,handshake被赋值了-1,下面当handshake为-1时,直接调用了close()方法,这里的close()方法是NioEndpoint类中的方法,源码如下:

private void close(NioChannel socket, SelectionKey key) {
    try {
        // 调用Poller#cancelledKey()方法
        if (socket.getPoller().cancelledKey(key) != null) {
            if (running && !paused) {
                if (!nioChannels.push(socket)) {
                    socket.free();
                }
            }
        }
    } catch (Exception x) {
        log.error(sm.getString("endpoint.err.close"), x);
    }
}

可以看到此方法又调用了Poller#cancelledKey()方法,回到Poller#cancelledKey()方法:

public NioSocketWrapper cancelledKey(SelectionKey key) {
    NioSocketWrapper ka = null;
    try {
        if ( key == null ) return null;//nothing to do
        ka = (NioSocketWrapper) key.attach(null);
        if (ka != null) {
            getHandler().release(ka);
        }
        if (key.isValid()) key.cancel();
        if (ka != null) {
            try {
                ka.getSocket().close(true);
            } catch (Exception e){}
        }
        // 略
    } catch (Throwable e) {
        ExceptionUtils.handleThrowable(e);
    }
    return ka;
}

可以看到你这里执行了ka.getSocket().close(true);这么一行代码,这里的ka.getSocket()返回的是NioChannel对象,那么就看下NioChannel#close()方法做了什么操作:

public void close(boolean force) throws IOException {
    if (isOpen() || force ) close();
}

如果连接开启就调用无参的close()方法,相关源码:

@Override
public void close() throws IOException {
    getIOChannel().socket().close();
    getIOChannel().close();
}

public SocketChannel getIOChannel() {
    return sc;
}

protected SocketChannel sc;

这里先关闭SocketChannel的Socket再关闭SocketChannel,至此连接关闭完成。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容