Tomcat NIO 线程模型分析

Tomcat7线程模型

tomcat 的nio 线程模型也是reactor 模型,由accept 线程负责接受连接请求,把请求转发给其中一个Poller
线程,去注册读事件,Poller 线程就负责该连接的读和写,交给后面的线程池去处理,从读报文,触发后面的servlet请求都由线程池的线程完成。

Accept线程

backlog = 100; 默认是100,也就是tcp的accept 队列为100,默认还是比较少的。

最大连接数

maxConnections = 10000; 如果连接数超过了maxConnections,则等待连接释放,其实这里底层TCP 链接是还可以建立的,只有内核的accept 队列没有满,假如tomcat的链接数达到了10000,accept线程就不从accept的队列取出链接,这样就很容易导致不能建立链接了。

核心代码Run 方法如下:

int errorDelay = 0;

            // Loop until we receive a shutdown command
            while (running) {

                // Loop if endpoint is paused
                while (paused && running) {
                    state = AcceptorState.PAUSED;
                    try {
                        Thread.sleep(50);
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }

                if (!running) {
                    break;
                }
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    countUpOrAwaitConnection();

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        // We didn't get a socket
                        countDownConnection();
                        if (running) {
                            // Introduce delay if necessary
                            errorDelay = handleExceptionWithDelay(errorDelay);
                            // re-throw
                            throw ioe;
                        } else {
                            break;
                        }
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (running && !paused) {
                        // setSocketOptions() will hand the socket off to
                        //这里把sock 分发到poller 线程
                        // an appropriate processor if successful
                        if (!setSocketOptions(socket)) {
                            closeSocket(socket);
                        }
                    } else {
                        closeSocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }

Poller 线程

Poller线程负责轮询注册在对应selector 上连接的读写请求事件。因为Accept接收到链接请求后,回封装成一个event,放到Poller的事件队列,poller 回从里面取出事件获取socket。

Poller 线程个数 pollerThreadCount默认2个

pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors());

Accept 选择poller是Round Robin,所以两个poller线程负责的socket 各占一半

同步读

poller io 线程还有点和reactor 模型不一样的是,poller 线程不负责具体的读http 消息,而是有可读事件时,分配给 SocketProcessor 来处理,SocketProcessor 是一个task,具体由tomcat的工作线程池来执行,所以一个连接上的http 请求数据报的读取和poller 的线程是异步的,正是因为这样,poller 在分配一个读事件给SocketProcessor 后,就取消了可读事件的监听,下面是poller worker线程的processKey 方法,用来分配读写事件。

protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
    
    try {
        if ( close ) {
            cancelledKey(sk);
        } else if ( sk.isValid() && attachment != null ) {
            if (sk.isReadable() || sk.isWritable() ) {
                if ( attachment.getSendfileData() != null ) {
                    processSendfile(sk,attachment, false);
                } else {
                    //先取消读事件,意思是防止读
                    unreg(sk, attachment, sk.readyOps());
                    boolean closeSocket = false;
                    // Read goes before write
                    if (sk.isReadable()) {
                        //创建socketprocessor来读http 请求包和业务逻辑的执行
                        if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                            closeSocket = true;
                        }
                    }
                    if (!closeSocket && sk.isWritable()) {
                        if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                            closeSocket = true;
                        }
                    }
                    if (closeSocket) {
                        cancelledKey(sk);
                    }
                }
            }
        } else {
            //invalid key
            cancelledKey(sk);
        }
    } catch ( CancelledKeyException ckx ) {
        cancelledKey(sk);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        log.error("",t);
    }
}

注意上面的unreg方法如下

protected void unreg(SelectionKey sk, NioSocketWrapper attachment, int readyOps) {
    //this is a must, so that we don't have multiple threads messing with the socket
    reg(sk,attachment,sk.interestOps()& (~readyOps));
}

官方解释是说防止多个线程同时读一个socket,也就是一个请求连接的数据。想象一种场景,如果一个hSocketProcess ttp请求的包只来了一部分,也就是SocketProcess 在等待后面一部分,后面部分来的时候,触发读事件,重新创建一个SocketProcessor,这样会导致两个processor 同时处理一个socket数据,会导致混乱。

何时重新注册读事件
  • 1 上次请求处理完成,会重新注册读事件,因为连接是持久keeplivve的
  • 2 处理半包的情况,需要重新注册读事件
//状态为LONG时,代表半包的状态,没有读完,需要等待,并重新注册可读事件,
//而且socket 关联的process 不能从connectionsremove掉 
if (state == SocketState.LONG) {
    // In the middle of processing a request/response. Keep the
    // socket associated with the processor. Exact requirements
    // depend on type of long poll
    //longPoll 如果不是异步请求,会注册读事件
    longPoll(wrapper, processor);
    if (processor.isAsync()) {
        getProtocol().addWaitingProcessor(processor);
    }
} else if (state == SocketState.OPEN) {
    // In keep-alive but between requests. OK to recycle
    // processor. Continue to poll for the next request.
    //处理完成的请求,可以remove掉process,因为不知道下次请求什么时候来,
    // 同时也需要重新注册读事件
    connections.remove(socket);
    getLog().info("Tomcat process finish start to release process "+processor.getRequest().toString());
    release(processor);
    getLog().info("Tomcat release process "+processor.getRequest().toString()+ "start to register read event for next read!!!");
    wrapper.registerReadInterest();
}

所以从上面的分析可以得出结论,tomcat nio 模型读不同于netty的reactor 模型,io 读写由io 线程负责,读完了就交给业务线程支持,继续读后面的请求数据。但是tomcat是一个请求读完,处理完业务逻辑,再继续读下一个请求的数据,这对http 这种独占的协议无可厚非,如果想在http协议上实现类似rpc 自定义协议的连接复用时,即发请求可以不用等当前请求返回,就可以继续发,对发送多可以实现少量的连接发送大量的请求,但是由于服务端不能并发的读,必然会导致读缓冲区瞬间满了,不能被读走的请求,由于tcp 滑动窗口因子,也会导致发送方停止下来

工作线程池executor

执行请求的线程池

public void createExecutor() {
        internalExecutor = true;
        TaskQueue taskqueue = new TaskQueue();
        TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
        executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
        taskqueue.setParent( (ThreadPoolExecutor) executor);
    }
  • 队列:无界队列 TaskQueue,
  • 最小线 程minSpareThreads = 10
  • 最大线程 maxThreads = 200

TaskQueue

taskQueue 对 offer方法做了些手脚,就是让exeecutor的核心线程池达到最大值,如果按正常的逻辑,当线程超过CoreSize 时,任务回往offer到TaskQueue 中,而tomcat的TaskQueue 是无界的队列,所以默认的话tomcat都只有core size个线程在跑,这样估计吞吐量不够,所以tomcat的TaskQueue修改了offer方法,如下:

@Override
public boolean offer(Runnable o) {
  //we can't do any checks
    if (parent==null) return super.offer(o);
    //we are maxed out on threads, simply queue the object
    if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
    //we have idle threads, just add it to the queue
    if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
    //if we have less threads than maximum force creation of a new thread
    //关键点在这里,只要工作线程小于最大值,就返回false,这时线程池会去创建新的线程。
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
    //if we reached here, we need to add it to the queue
    return super.offer(o);
}

如果用来tomcat sever.xml 指定的 exector ,即把Executor 启用

<Executor name="tomcatThreadPool" namePrefix="catalina-exec-"
        maxThreads="150" minSpareThreads="4" maxQueueSize="1000"/>

则创建的是Tomcat 自己实现的StandardThreadExecutor,该线程池唯一不同的是,可以指定队列容量的大小,默认是Integer.MAX_VALUE,相当于无界l。
可以通过maxQueueSize 属性指定,代码如下:

@Override
protected void startInternal() throws LifecycleException {

    taskqueue = new TaskQueue(maxQueueSize);
    TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
    executor.setThreadRenewalDelay(threadRenewalDelay);
    if (prestartminSpareThreads) {
        executor.prestartAllCoreThreads();
    }
    taskqueue.setParent(executor);

    setState(LifecycleState.STARTING);
}

Tomcat 异步处理

Servlet3.0 支持了异步,tomcat7 对异步也要支持,在tomcat的工作线程处理完后,如果时异步的话,不能结束掉当前这个请求,要等待业务线程触发了asyncContext.complete() 方法,执行这个complete时,tomcat 会把该请求对于的socketprocess 获取到,再教给上面说的executor 去执行。所以我们在通过request.startAsynce()时,最好不要用asyncContext.start()方法去执行一些操作,这样的话,这个异步处理还是需要tomcat的线程,来执行,就没有意义了。

Tomcat 异步写

tomcat 的 response flush时,是阻塞的,如果写缓冲区不可用,则会阻塞住flush的线程,如果想要异步flush。则需要给response的outputStream 添加一个writerListener,有了writerListener tomcat就异步写,不会阻塞。但是需要注意的是,必须用tomcat的ServletOutputStream 才支持,默认的servlet api 下的ServletOutputStream是没有该方法的。

public abstract voidsetWriteListener(javax.servlet.WriteListener listener);
// If we know that the request is bad this early, add the
// Connection: close header.
if (keepAlive && statusDropsConnection(statusCode)) {
    keepAlive = false;
}
if (!keepAlive) {
    // Avoid adding the close header twice
    if (!connectionClosePresent) {
        headers.addValue(Constants.CONNECTION).setString(
                Constants.CLOSE);
    }
} else if (!http11 && !getErrorState().isError()) {
    headers.addValue(Constants.CONNECTION).setString(Constants.KEEPALIVE);
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容