Thrift之TServer服务模型

Thrift提供的服务类实现有:

Tserver服务模型
  • 阻塞服务模型: TSimpleServer,TThreadPoolServer
  • 非阻塞服务模型:TNonblockingServer,THsHaServer,TThreadedSelectorServer

TSimpleServer

使用的是单线程阻塞模型,他会启动一个线程,循环监听server的端口,并接收请求来进行处理。每次只能接收和处理一个socket连接,这种模型一般都不会使用。

TThreadPoolServer

为了解决单线程Server的问题,TThreadPoolServer通过线程池的方式来处理请求,线程池使用了SynchronousQueue同步队列。

private static ExecutorService createDefaultExecutorService(Args args) {
    SynchronousQueue<Runnable> executorQueue =
      new SynchronousQueue<Runnable>();
    return new ThreadPoolExecutor(args.minWorkerThreads,
                                  args.maxWorkerThreads,
                                  args.stopTimeoutVal,
                                  TimeUnit.SECONDS,
                                  executorQueue);
  }

主线程一直循环接收请求,接收到请求时将请求封装为WorkerProcess抛给线程池来处理。这种仍然是多线程阻塞模式


TThreadPoolServer

TNonblockingServer

在并发的场景下,如果并发数大于上述线程池的大小时,同样会导致并发的问题,所以TNonblockingServer通过使用java NIO的方法来解决线程池的问题。TNonblockingServer采用的是单线程非阻塞的模式,借助Channel/Selector机制。所有的socket都会被注册到selector上,在一个线程循环监控所有的socket。每次selector完成一次select,就会将已就绪的socket取出来进行处理。

  /**
   启动一个线程
   */
  @Override
  protected boolean startThreads() {
    // start the selector
    try {
      selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_);
      selectAcceptThread_.start();
      return true;
    } catch (IOException e) {
      LOGGER.error("Failed to start selector thread!", e);
      return false;
    }
  }
image.png
/**
     选取并处理io事件,对于有数据到来的 socket 进行数据读取操作,
对于有数据发送的 socket 则进行数据发送,
对于监听 socket 则产生一个新业务 socket 并将其注册到 selector 中。
**/
    private void select() {
      try {
        // wait for io events.
        selector.select();

        // process the io events we received
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();

          // skip if not valid
          if (!key.isValid()) {
            cleanupSelectionKey(key);
            continue;
          }

          // if the key is marked Accept, then it has to be the server
          // transport.
          if (key.isAcceptable()) {
            handleAccept();
          } else if (key.isReadable()) {
            // deal with reads
            handleRead(key);
          } else if (key.isWritable()) {
            // deal with writes
            handleWrite(key);
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }

select方法的执行是在上边启动的线程中,那么很明显,如果接受到socket的任务后handle中有阻塞的操作,必然会导致整个服务的阻塞。而且TNonblockingServer是单线程的模型,对于并发调用的任务依然是按顺序一个一个的执行。所以,这种服务模型比较适应于并发高,业务逻辑简单的场景。对于并发低,接收到数据执行的任务比较繁重的场景,可能就需要用到别的服务模型了。
Java NIO 参考文章

THsHaServer

由于TNonBlockingServer的缺点,THsHaServer继承了TNonBlockingServer,并通过线程池来提高任务处理的并发能力。THsHaServer是半同步半异步的处理模式,Half-Aysnc用于IO事件处理(Accept/Read/Write),Half-Sync用于业务handler对rpc的同步处理上。

半同步:通过SelectAcceptThread线程同步轮询IO就绪事件,调用就绪的channel来accept,read,write事件。
半异步:将上述事件的调用封装成一个Runnale交给线程池执行,而同步轮询的SelectAcceptThread线程中直接返回进行下一轮轮询。

THsHaServer启动时创建线程池


THsHaServer创建线程池

THsHaServer通过重载Invoke,通过线程池来处理所有的事件:


重载Invoke

显而易见,THsHaServer的多线程完美解决的了TNonBlocking的单线程的事件处理逻辑,但是线程池同样是有上限的,并发量较大时,并且单次发送的数据量比较大时,监听到的新的socket请求同样不会被及时处理。

TThreadedSelectorServer

TThreadedSelectorServer是当前Thrift中能提供的最高级的线程服务模型。

AcceptThread线程对象,专门负责处理监听socket上的新连接。
Set<SelectorThread>负责处理所有的网络I/O请求。
invoker负责处理SelectorThread接收到的请求。


TThreadedSelectorServer的三个参数
执行逻辑

源码分析
accept线程启动时通过循环一直进行select

protected class AcceptThread extends Thread {

    // The listen socket to accept on
    private final TNonblockingServerTransport serverTransport;
    private final Selector acceptSelector;

    private final SelectorThreadLoadBalancer threadChooser;

    /**
     * Set up the AcceptThead
     * 
     * @throws IOException
     */
    public AcceptThread(TNonblockingServerTransport serverTransport,
        SelectorThreadLoadBalancer threadChooser) throws IOException {
      this.serverTransport = serverTransport;
      this.threadChooser = threadChooser;
      this.acceptSelector = SelectorProvider.provider().openSelector();
      this.serverTransport.registerSelector(acceptSelector);
    }

    /**
     * The work loop. Selects on the server transport and accepts. If there was
     * a server transport that had blocking accepts, and returned on blocking
     * client transports, that should be used instead
     */
    public void run() {
      try {
        if (eventHandler_ != null) {
          eventHandler_.preServe();
        }

        while (!stopped_) {
          select();
        }
      } catch (Throwable t) {
        LOGGER.error("run() on AcceptThread exiting due to uncaught error", t);
      } finally {
        try {
          acceptSelector.close();
        } catch (IOException e) {
          LOGGER.error("Got an IOException while closing accept selector!", e);
        }
        // This will wake up the selector threads
        TThreadedSelectorServer.this.stop();
      }
    }

acceptSelector选择器等到connect事件。判断selectedKeys是不是accept事件,如果是则调用handleAccept();否则不进行处理

private void select() {
      try {
        // wait for connect events.
        acceptSelector.select();

        // process the io events we received
        Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();

          // skip if not valid
          if (!key.isValid()) {
            continue;
          }

          if (key.isAcceptable()) {
            handleAccept();
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }

通过doAccept获取serverTransport中一个TNonblockingTransport,通过threadChooser线程负载均衡器来选取selector线程,通过doAddAccept将selector和TNonblockingTransport绑定,来完成接下来的I/O读写事件。

/**
     * Accept a new connection.
     */
    private void handleAccept() {
      final TNonblockingTransport client = doAccept();
      if (client != null) {
        // Pass this connection to a selector thread
        final SelectorThread targetThread = threadChooser.nextThread();

        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
          doAddAccept(targetThread, client);
        } else {
          // FAIR_ACCEPT
          try {
            invoker.submit(new Runnable() {
              public void run() {
                doAddAccept(targetThread, client);
              }
            });
          } catch (RejectedExecutionException rx) {
            LOGGER.warn("ExecutorService rejected accept registration!", rx);
            // close immediately
            client.close();
          }
        }
      }
    }

调用SelectorThread的addAcceptedConnection,通过wakeup方法唤醒SelectorThread中的seletor。

private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
      if (!thread.addAcceptedConnection(client)) {
        client.close();
      }
    }

 public boolean addAcceptedConnection(TNonblockingTransport accepted) {
    try {
      acceptedQueue.put(accepted);
    } catch (InterruptedException e) {
      LOGGER.warn("Interrupted while adding accepted connection!", e);
      return false;
    }
      selector.wakeup();
      return true;
    }

SelectorThread线程的run方法实现:

    public void run() {
      try {
        while (!stopped_) {
          select();
          processAcceptedConnections();
          processInterestChanges();
        }
        for (SelectionKey selectionKey : selector.keys()) {
          cleanupSelectionKey(selectionKey);
        }
      } catch (Throwable t) {
        LOGGER.error("run() on SelectorThread exiting due to uncaught error", t);
      } finally {
        try {
          selector.close();
        } catch (IOException e) {
          LOGGER.error("Got an IOException while closing selector!", e);
        }
        // This will wake up the accept thread and the other selector threads
        TThreadedSelectorServer.this.stop();
      }
    }

SelectorThread方法的select()监听IO事件,仅仅用于处理数据读取和数据写入。
当selector可读时,就会调用handleRead;当selector可写时,就会调用handlerWrite;

    /*
     * Select and process IO events appropriately: If there are existing
     * connections with data waiting to be read, read it, buffering until a
     * whole frame has been read. If there are any pending responses, buffer
     * them until their target client is available, and then send the data.
     */
    private void select() {
      try {
        // wait for io events.
        selector.select();

        // process the io events we received
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();

          // skip if not valid
          if (!key.isValid()) {
            cleanupSelectionKey(key);
            continue;
          }

          if (key.isReadable()) {
            // deal with reads
            handleRead(key);
          } else if (key.isWritable()) {
            // deal with writes
            handleWrite(key);
          } else {
            LOGGER.warn("Unexpected state in select! " + key.interestOps());
          }
        }
      } catch (IOException e) {
        LOGGER.warn("Got an IOException while selecting!", e);
      }
    }

如果连接中有数据可读,读取完数据之后将会缓存在Frame中,如果Frame已经完整可读时,将会通过requestInvoke方法,将请求的数据封装为一个Runnable对象,提交到线程池进行执行。

protected void handleRead(SelectionKey key) {
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      if (!buffer.read()) {
        cleanupSelectionKey(key);
        return;
      }

      // if the buffer's frame read is complete, invoke the method.
      if (buffer.isFrameFullyRead()) {
        if (!requestInvoke(buffer)) {
          cleanupSelectionKey(key);
        }
      }
    }

如果连接中有写数据已经处理完了,整个Rpc过程也就结束了。

    protected void handleWrite(SelectionKey key) {
      FrameBuffer buffer = (FrameBuffer) key.attachment();
      if (!buffer.write()) {
        cleanupSelectionKey(key);
      }
    }

select()方法完成后,线程继续运行processAcceptedConnections()方法处理下一个连接的IO事件
当前seletorThread线程会尝试从阻塞队列获取下一个accepted的TNonblockingTransport来处理读写I/O事件。也正是通过这种方式来复用seletorThread。

private void processAcceptedConnections() {
      // Register accepted connections
      while (!stopped_) {
        TNonblockingTransport accepted = acceptedQueue.poll();
        if (accepted == null) {
          break;
        }
        registerAccepted(accepted);
      }
    }

    protected FrameBuffer createFrameBuffer(final TNonblockingTransport trans,
        final SelectionKey selectionKey,
        final AbstractSelectThread selectThread) {
        return processorFactory_.isAsyncProcessor() ?
                  new AsyncFrameBuffer(trans, selectionKey, selectThread) :
                  new FrameBuffer(trans, selectionKey, selectThread);
    }

    private void registerAccepted(TNonblockingTransport accepted) {
      SelectionKey clientKey = null;
      try {
        clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);

        FrameBuffer frameBuffer = createFrameBuffer(accepted, clientKey, SelectorThread.this);

        clientKey.attach(frameBuffer);
      } catch (IOException e) {
        LOGGER.warn("Failed to register accepted connection to selector!", e);
        if (clientKey != null) {
          cleanupSelectionKey(clientKey);
        }
        accepted.close();
      }
    }

结合Java NIO,我们可以看到Thrift四种服务模式TSimpleServer,TThreadpoolServer,TNonBlockingServer,THsHaServer,TThreadedSelectorServer的不同特点。同样通过他们之间的对比,我们能够看到没有更好的服务模式,只有更合适的服务模式。

参考文章:
Apache Thrift系列详解(二) - 网络服务模型
Thrift 服务模型和序列化机制深入学习

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容

  • 前言 Thrift提供的网络服务模型:单线程、多线程、事件驱动,从另一个角度划分为:阻塞服务模型、非阻塞服务模型。...
    零壹技术栈阅读 1,754评论 0 3
  • NIO(Non-blocking I/O,在Java领域,也称为New I/O),是一种同步非阻塞的I/O模型,也...
    Demon_code阅读 388评论 0 0
  • Thrift是什么? Thrift是Facebook于2007年开发的跨语言的rpc服框架,提供多语言的编译功能,...
    jiangmo阅读 9,403评论 0 6
  • 限于篇幅关系,在观察源码的时候,只列举了部分源代码 TServer类层次体系 TSimpleServer/TThr...
    shunyang阅读 2,869评论 0 9
  • 注:1)本人非科班出身,文章的来源主要是基于一些能找到的资料,在理解的基础上做一些总结归纳,以期对IO相关的知识体...
    Drew_Zhong阅读 1,003评论 0 2