Reactor模式

什么是Reactor模式

Reactor模式是一种设计模式,它是基于事件驱动的,可以并发的处理多个服务请求,当请求抵达后,依据多路复用策略,同步的派发这些请求至相关的请求处理程序。

Reactor模式角色构成

在早先的论文An Object Behavioral Pattern for
Demultiplexing and Dispatching Handles for Synchronous Events
中Reactor模式主要有五大角色组成,分别如下:

Handle:操作系统提供的一种资源,用于表示一个个的事件,在网络编程中可以是一个连接事件,一个读取事件,一个写入事件,Handle是事件产生的发源地
Synchronous Event Demultiplexer:本质上是一个系统调用,用于等待事件的发生,调用方在调用它的时候会被阻塞,一直阻塞到同步事件分离器上有事件产生为止
Initiation Dispatcher:定义了一些用于控制事件的调度方式的规范,提供对事件管理。它本身是整个事件处理器的核心所在,Initiation Dispatcher会通过Synchronous Event Demultiplexer来等待事件的发生。一旦事件发生,Initiation Dispatcher首先会分离出每一个事件,然后调用事件处理器,最后调用相关的回调方法来处理这些事件
Event Handler:定义事件处理方法以供InitiationDispatcher回调使用
Concrete Event Handler:是事件处理器的实现。它本身实现了事件处理器所提供的各种回调方法,从而实现了特定于业务的逻辑。它本质上就是我们所编写的一个个的处理器实现。

img

Reactor模式实现流程

  1. 初始化 Initiation Dispatcher,然后将若干个Concrete Event Handler注册到 Initiation Dispatcher中,应用会标识出该事件处理器希望Initiation Dispatcher在某些事件发生时向其发出通知
  2. Initiation Dispatcher 会要求每个事件处理器向其传递内部的Handle,该Handle向操作系统标识了事件处理器
  3. 当所有的Concrete Event Handler都注册完毕后,就会启动 Initiation Dispatcher的事件循环,使用Synchronous Event Demultiplexer同步阻塞的等待事件的发生
  4. 当与某个事件源对应的Handle变为ready状态时,Synchronous Event Demultiplexer就会通知 Initiation Dispatcher
  5. Initiation Dispatcher会触发事件处理器的回调方法响应这个事件
img

Java NIO对Reactor的实现

在Java的NIO中,对Reactor模式有无缝的支持,即使用Selector类封装了操作系统提供的Synchronous Event Demultiplexer功能。Doug Lea(Java concurrent包的作者)在Scalable IO in Java中对此有非常详细的描述。概况来说其主要流程如下:

  1. 服务器端的Reactor线程对象会启动事件循环,并使用Selector来实现IO的多路复用
  2. 注册Acceptor事件处理器到Reactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样Reactor会监听客户端向服务器端发起的连接请求事件
  3. 客户端向服务器端发起连接请求,Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将该连接所关注的READ/WRITE事件以及对应的READ/WRITE事件处理器注册到Reactor中,这样一来Reactor就会监听该连接的READ/WRITE事件了。
  4. 当Reactor监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理
  5. 每当处理完所有就绪的感兴趣的I/O事件后,Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理

Doug Lea 在Scalable IO in Java中分别描述了单线程的Reactor,多线程模式的Reactor以及多Reactor线程模式。

单线程的Reactor,主要依赖Java NIO中的Channel,Buffer,Selector,SelectionKey。在单线程Reactor模式中,不仅I/O操作在该Reactor线程上,连非I/O的业务操作也在该线程上进行处理了,这可能会大大延迟I/O请求的响应

img

在多线程Reactor中添加了一个工作线程池,将非I/O操作从Reactor线程中移出转交给工作者线程池来执行。这样能够提高Reactor线程的I/O响应,不至于因为一些耗时的业务逻辑而延迟对后面I/O请求的处理,但是所有的I/O操作依旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操作

img

多Reactor线程模式将“接受客户端的连接请求”和“与该客户端的通信”分在了两个Reactor线程来完成。mainReactor完成接收客户端连接请求的操作,它不负责与客户端的通信,而是将建立好的连接转交给subReactor线程来完成与客户端的通信,这样一来就不会因为read()数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多Reactor线程模式在海量的客户端并发请求的情况下,还可以通过实现subReactor线程池来将海量的连接分发给多个subReactor线程,在多核的操作系统中这能大大提升应用的负载和吞吐量

img

代码示例:

// NIO selector 多路复用reactor线程模型
public class NIOReactor {

  // 处理业务操作的线程池
  private static ExecutorService workPool = Executors.newCachedThreadPool();

  // 封装了selector.select()等事件轮询的代码
  abstract class ReactorThread extends Thread {

    Selector selector;
    LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();

    volatile boolean running = false;

    private ReactorThread() throws IOException {
      selector = Selector.open();
    }

    // Selector监听到有事件后,调用这个方法
    public abstract void handler(SelectableChannel channel) throws Exception;

    @Override
    public void run() {
      // 轮询Selector事件
      while (running) {
        try {
          // 执行队列中的任务
          Runnable task;
          while ((task = taskQueue.poll()) != null) {
            task.run();
          }
          selector.select(1000);
          // 获取查询结果
          Set<SelectionKey> selectionKeys = selector.selectedKeys();
          // 遍历查询结果
          Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
          while (keyIterator.hasNext()) {
            // 被封装的查询结果
            SelectionKey selectionKey = keyIterator.next();
            keyIterator.remove();
            int readyOps = selectionKey.readyOps();
            // 关注 Read 和 Accept两个事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0) {
              try {
                SelectableChannel channel = (SelectableChannel) selectionKey.attachment();
                channel.configureBlocking(false);
                handler(channel);
                // 如果关闭了,就取消这个KEY的订阅
                if (!channel.isOpen()) {
                  selectionKey.cancel();
                }

              } catch (Exception e) {
                // 如果有异常,就取消这个KEY的订阅
                selectionKey.cancel();
                e.printStackTrace();
              }
            }
          }

        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }

    private SelectionKey register(SelectableChannel channel) throws Exception {
      // 为什么register要以任务提交的形式,让reactor线程去处理?
      // 因为线程在执行channel注册到selector的过程中,会和调用selector.select()方法的线程争用同一把锁
      // 而select()方法实在eventLoop中通过while循环调用的,争抢的可能性很高,
      // 为了让register能更快的执行,就放到同一个线程来处理
      FutureTask<SelectionKey> futureTask =
          new FutureTask<>(() -> channel.register(selector, 0, channel));
      taskQueue.add(futureTask);
      return futureTask.get();
    }

    private void doStart() {
      if (!running) {
        running = true;
        start();
      }
    }
  }

  private ServerSocketChannel serverSocketChannel;

  // 1、创建多个线程 - accept处理reactor线程 (accept线程)
  private ReactorThread[] mainReactorThreads = new ReactorThread[1];

  // 2、创建多个线程 - io处理reactor线程  (I/O线程)
  private ReactorThread[] subReactorThreads = new ReactorThread[8];

  // 初始化线程组
  private void newGroup() throws IOException {
    // 创建mainReactor线程, 只负责处理serverSocketChannel
    for (int i = 0; i < mainReactorThreads.length; i++) {
      mainReactorThreads[i] =
          new ReactorThread() {
            AtomicInteger incr = new AtomicInteger(0);

            @Override
            public void handler(SelectableChannel channel) throws Exception {
              // 只做请求分发,不做具体的数据读取
              ServerSocketChannel ch = (ServerSocketChannel) channel;
              SocketChannel socketChannel = ch.accept();
              socketChannel.configureBlocking(false);
              // 收到连接建立的通知之后,分发给I/O线程继续去读取数据
              int index = incr.getAndIncrement() % subReactorThreads.length;
              ReactorThread workEventLoop = subReactorThreads[index];
              workEventLoop.doStart();
              SelectionKey selectionKey = workEventLoop.register(socketChannel);
              selectionKey.interestOps(SelectionKey.OP_READ);
              System.out.println(
                  Thread.currentThread().getName() + "收到新连接 : " + socketChannel.getRemoteAddress());
            }
          };
    }

    // 创建IO线程,负责处理客户端连接以后socketChannel的IO读写
    for (int i = 0; i < subReactorThreads.length; i++) {
      subReactorThreads[i] =
          new ReactorThread() {

            @Override
            public void handler(SelectableChannel channel) throws Exception {
              // work线程只负责处理IO处理,不处理accept事件
              SocketChannel ch = (SocketChannel) channel;
              ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
              while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                // 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
                if (requestBuffer.position() > 0) break;
              }
              if (requestBuffer.position() == 0) return; // 如果没数据了, 则不继续后面的处理
              requestBuffer.flip();
              byte[] content = new byte[requestBuffer.limit()];
              requestBuffer.get(content);
              System.out.println(new String(content));
              System.out.println(
                  Thread.currentThread().getName() + "收到数据,来自:" + ch.getRemoteAddress());

              // TODO 业务操作 数据库、接口...
              workPool.submit(() -> {});

              // 响应结果 200
              String response =
                  "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World";
              ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
              while (buffer.hasRemaining()) {
                ch.write(buffer);
              }
            }
          };
    }
  }

  // 始化channel,并且绑定一个eventLoop线程
  private void initAndRegister() throws Exception {
    // 1、 创建ServerSocketChannel
    serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.configureBlocking(false);
    // 2、 将serverSocketChannel注册到selector
    int index = new Random().nextInt(mainReactorThreads.length);
    mainReactorThreads[index].doStart();
    SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
    selectionKey.interestOps(SelectionKey.OP_ACCEPT);
  }

  // 绑定端口
  private void bind() throws IOException {
    //  1、 正式绑定端口,对外服务
    serverSocketChannel.bind(new InetSocketAddress(8080));
    System.out.println("启动完成,端口8080");
  }

  public static void main(String[] args) throws Exception {
    NIOReactor nioReactor = new NIOReactor();
    // 1、 创建main和sub两组线程
    nioReactor.newGroup();
    // 2、 创建serverSocketChannel,注册到mainReactor线程上的selector上
    nioReactor.initAndRegister();
    // 3、 为serverSocketChannel绑定端口
    nioReactor.bind();
  }
}

扫码_搜索联合传播样式-微信标准绿版.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容