Thrift RPC实战(二) Thrift 网络服务模型

限于篇幅关系,在观察源码的时候,只列举了部分源代码

TServer类层次体系

Paste_Image.png

TSimpleServer/TThreadPoolServer是阻塞服务模型
TNonblockingServer/THsHaServer/TThreadedSelectotServer是非阻塞服务模型(NIO)

1 TServer抽象类的定义

内部静态类Args的定义, 用于TServer类用于串联软件栈(传输层, 协议层, 处理层)


public abstract class TServer {

  public static class Args extends AbstractServerArgs<Args> {

    public Args(TServerTransport transport) {

      super(transport);

    }

  }

 

  public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {

    public AbstractServerArgs(TServerTransport transport);

    public T processorFactory(TProcessorFactory factory);

    public T processor(TProcessor processor);

    public T transportFactory(TTransportFactory factory);

    public T protocolFactory(TProtocolFactory factory);

  }

}

TServer类定义的抽象类


public abstract class TServer {

  public abstract void serve();

  public void stop();

 

  public boolean isServing();

  public void setServerEventHandler(TServerEventHandler eventHandler);

}

评注:

抽象函数serve由具体的TServer实例来实现, 而并非所有的服务都需要优雅的退出, 因此stop没有被定义为抽象

2 TSimpleServer

TSimpleServer的工作模式采用最简单的阻塞IO,实现方法简洁明了,便于理解,但是一次只能接收和处理一个socket连接,效率比较低,主要用于演示Thrift的工作过程,在实际开发过程中很少用到它。

工作方式如图:


Paste_Image.png

抽象的代码可简单描述如下:


// *) server socket进行监听

serverSocket.listen();

while ( isServing() ) {

  // *) 接受socket链接

  client = serverSocket.accept();

  // *) 封装处理器

  processor = factory.getProcess(client);

  while ( true ) {

    // *) 阻塞处理rpc的输入/输出

    if ( !processor.process(input, output) ) {

      break;   

    }  

  }

}

3 ThreadPoolServer

TThreadPoolServer模式采用阻塞socket方式工作,,主线程负责阻塞式监听“监听socket”中是否有新socket到来,业务处理交由一个线程池来处

工作模式图:


Paste_Image.png

ThreadPoolServer解决了TSimple不支持并发和多连接的问题, 引入了线程池. 实现的模型是One Thread Per Connection
线程池代码片段:


private static ExecutorService createDefaultExecutorService(Args args) {
  SynchronousQueue<Runnable> executorQueue =
    new SynchronousQueue<Runnable>();
  return new ThreadPoolExecutor(args.minWorkerThreads,
                                args.maxWorkerThreads,
                                args.stopTimeoutVal,
                                TimeUnit.SECONDS,
                                executorQueue);
}

评注:
  采用同步队列(SynchronousQueue), 线程池采用能线程数可伸缩的模式.
主线程循环简单描述代码:


setServing(true);

while (!stopped_) {

  try {

    TTransport client = serverTransport_.accept();

    WorkerProcess wp = new WorkerProcess(client);

    executorService_.execute(wp);

  } catch (TTransportException ttx) {

  }

}

TThreadPoolServer模式优点:
线程池模式中,拆分了监听线程(accept)和处理客户端连接的工作线程(worker),数据读取和业务处理都交由线程池完成,主线程只负责监听新连接,因此在并发量较大时新连接也能够被及时接受。线程池模式比较适合服务器端能预知最多有多少个客户端并发的情况,这时每个请求都能被业务线程池及时处理,性能也非常高。
TThreadPoolServer模式缺点:
线程池模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待

4 TNonblockingServer

TNonblockingServer该模式也是单线程工作,但是采用NIO的模式, 借助Channel/Selector机制, 采用IO事件模型来处理.

所有的socket都被注册到selector中,在一个线程中通过seletor循环监控所有的socket,每次selector结束时,处理所有的处于就绪状态的socket,对于有数据到来的socket进行数据读取操作,对于有数据发送的socket则进行数据发送,对于监听socket则产生一个新业务socket并将其注册到selector中。

工作原理图:

Paste_Image.png

nio部分关键代码如下:


private void select() {
  try {
    // wait for io events.
    selector.select();

    // process the io events we received
    Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
    while (!stopped_ && selectedKeys.hasNext()) {
      SelectionKey key = selectedKeys.next();
      selectedKeys.remove();

      // skip if not valid
      if (!key.isValid()) {
        cleanupSelectionKey(key);
        continue;
      }

      // if the key is marked Accept, then it has to be the server
      // transport.
      if (key.isAcceptable()) {
        handleAccept();
      } else if (key.isReadable()) {
        // deal with reads
        handleRead(key);
      } else if (key.isWritable()) {
        // deal with writes
        handleWrite(key);
      } else {
        LOGGER.warn("Unexpected state in select! " + key.interestOps());
      }
    }
  } catch (IOException e) {
    LOGGER.warn("Got an IOException while selecting!", e);
  }
}

TNonblockingServer模式优点:
相比于TSimpleServer效率提升主要体现在IO多路复用上,TNonblockingServer采用非阻塞IO,对accept/read/write等IO事件进行监控和处理,同时监控多个socket的状态变化;
TNonblockingServer模式缺点:
TNonblockingServer模式在业务处理上还是采用单线程顺序来完成,在业务处理比较复杂、耗时的时候,例如某些接口函数需要读取数据库执行时间较长,会导致整个服务被阻塞住,此时该模式效率也不高,因为多个调用请求任务依然是顺序一个接一个执行

5 THsHaServer

鉴于TNonblockingServer的缺点, THsHaServer继承TNonblockingServer,引入了线程池去处理, 其模型把读写任务放到线程池去处理.THsHaServer是Half-sync/Half-async的处理模式, Half-aysnc是在处理IO事件上(accept/read/write io), Half-sync用于handler对rpc的同步处理上.

工作模式图:

Paste_Image.png

/**
 * Helper to create an invoker pool
 */
protected static ExecutorService createInvokerPool(Args options) {
  int minWorkerThreads = options.minWorkerThreads;
  int maxWorkerThreads = options.maxWorkerThreads;
  int stopTimeoutVal = options.stopTimeoutVal;
  TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;

  LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
  ExecutorService invoker = new ThreadPoolExecutor(minWorkerThreads,
    maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);

  return invoker;
}

THsHaServer的优点:
与TNonblockingServer模式相比,THsHaServer在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升;
THsHaServer的缺点:
主线程需要完成对所有socket的监听以及数据读写的工作,当并发请求数较大时,且发送数据量较多时,监听socket上新连接请求不能被及时接受。

6. TThreadedSelectorServer

TThreadedSelectorServer是对以上NonblockingServer的扩充, 其分离了Accept和Read/Write的Selector线程, 同时引入Worker工作线程池. 它也是种Half-sync/Half-async的服务模型

TThreadedSelectorServer模式是目前Thrift提供的最高级的模式,它内部有如果几个部分构成:
(1) 一个AcceptThread线程对象,专门用于处理监听socket上的新连接;
(2) 若干个SelectorThread对象专门用于处理业务socket的网络I/O操作,所有网络数据的读写均是有这些线程来完成;
(3) 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。
(4) 一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求读取之后,交个ExecutorService线程池中的线程完成此次调用的具体执行;主要用于处理每个rpc请求的handler回调处理(这部分是同步的).
工作模式图:

Paste_Image.png

TThreadedSelectorServer模式中有一个专门的线程AcceptThread用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络I/O操作分散到多个SelectorThread线程中来完成,因此能够快速对网络I/O进行读写操作,能够很好地应对网络I/O较多的情况

从accpect线程到selectorThreads关键代码

protected boolean startThreads() {
  try {
    for (int i = 0; i < args.selectorThreads; ++i) {
      selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));//建立事件选择线程池
    }
    acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
      createSelectorThreadLoadBalancer(selectorThreads));//建立accept接受请求线程
    for (SelectorThread thread : selectorThreads) {
      thread.start();
    }
    acceptThread.start();
    return true;
  } catch (IOException e) {
    LOGGER.error("Failed to start threads!", e);
    return false;
  }
}

负载均衡器SelectorThreadLoadBalancer对象部分关键代码:

protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
    return new SelectorThreadLoadBalancer(threads);
}

/**
 * A round robin load balancer for choosing selector threads for new
 * connections.
 */
protected static class SelectorThreadLoadBalancer {
    private final Collection<? extends SelectorThread> threads;
    private Iterator<? extends SelectorThread> nextThreadIterator;

    public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
        if (threads.isEmpty()) {
            throw new IllegalArgumentException("At least one selector thread is required");
        }
        this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
        nextThreadIterator = this.threads.iterator();
    }
    //根据循环负载均衡策略获取一个SelectorThread
    public SelectorThread nextThread() {
        // Choose a selector thread (round robin)
        if (!nextThreadIterator.hasNext()) {
            nextThreadIterator = threads.iterator();
        }
        return nextThreadIterator.next();
    }
}

从SelectorThread线程中,监听到有业务socket中有调用请求,转到业务工作线程池关键代码

private void handleAccept() {
    final TNonblockingTransport client = doAccept();//取得客户端的连接
    if (client != null) {
        // Pass this connection to a selector thread
        final SelectorThread targetThread = threadChooser.nextThread();//获取目标SelectorThread

        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
            doAddAccept(targetThread, client);
        } else {
            // FAIR_ACCEPT
            try {
                invoker.submit(new Runnable() {// 提交client的业务给到工作线程
                    public void run() {
                        doAddAccept(targetThread, client);
                    }
                });
            } catch (RejectedExecutionException rx) {
                LOGGER.warn("ExecutorService rejected accept registration!", rx);
                // close immediately
                client.close();
            }
        }
    }
}

demo地址:
码云:http://git.oschina.net/shunyang/thrift-all/tree/master/thrift-demo
github:https://github.com/shunyang/thrift-all/tree/master/thrift-demo
本文参考文章:

http://www.cnblogs.com/mumuxinfei/p/3875165.html

http://blog.csdn.net/sunmenggmail/article/details/46818147

欢迎大家扫码关注我的微信公众号,与大家一起分享技术与成长中的故事。


我的微信公众号.jpg
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容