Scalable IO in Java

经典的网络编程

一般网络编程都具有以下几个步骤:

  • 读取请求 Read request
  • 解码请求 Decode request
  • 处理服务 Process services
  • 加密回应 Encode reply
  • 发送回应 Send reply

但是每一步的处理的内容和成本都不一样。xml、json、file等等

image.png

每种类型的处理程序都需要在各自都线程中来进行,用代码表示就是如下

class Server implements Runnable {
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] cmd) { /* ... */ }
    }
}

如果当前运行线程没有被中断就一直循环创建一个线程或者线程池用来处理ServerSocket里面的Socket请求。

注意:Thread.interrupted()和Thread.isInterrupted()

这样会造成我们需要为每一个socket请求创建一个线程来处理对应的数据。一旦用户过多或者处理程序时间较长就会造成各种各样的问题。无法并发,前面的活没干完后面的需要等着,负载等等

优化方向

  • 增加负载

  • 增加硬件 (CPU, memory, disk, bandwidth)

  • 同时满足可用性和性能目标

  • 减短延迟

  • 满足高峰需求

  • 提高服务质量

  • 通常来说Divide-and-conquer(分而治之)是实现任何可扩展性目标的最佳方法

Divide-and-conquer(分而治之)

  • 将整体任务切割成小任务。每个小任务只执行单一任务,并且不会阻塞其他小任务的运行

  • 用IO事件来触发每个小任务的启动

  • java.nio 中支持的基本机制

  • 非阻塞读取和写入

  • 用感测到的IO事件来调度相关的任务

  • 事件驱动设计中可能出现的无尽变化

Event-driven Designs 事件驱动设计

  • 比较有效的方法

  • 占用更少的资源,每个客户端不一定需要单独创建一个线程

  • 减少开销。减少Context的切换可以相应的减少锁定

  • 调度可能会更慢,所以必须手动将动作绑定到事件

  • 更难的编程

  • 将动作分解为简单非阻塞的

  • 类似于GUI事件驱动的动作

  • 无法消除所有阻塞。比如:GC,页面错误等

  • 必须跟踪服务的逻辑状态

AWT事件机制

IO事件驱动使用相似的想法,但设计不同

image.png

java.awt是一个软件包,包含用于创建用户界面和绘制图形图像的所有分类

Reactor Pattern(反应堆模式)

  • Reactor通过调度来响应IO事件。如:AWT thread
  • Handler执行非阻塞动作。如:AWT ActionListeners
  • Manager将Handler绑定到事件上。如:AWT addActionListener

预先使用Manager将Handler绑定到指定的事件上,如onClick

用户点击按钮的时候,Reactor获取到事件,并调度事先绑定好的处理程序

经典的Reactor设计

单线程版本

image.png

java.nio 支持

  • Channels

  • 支持非阻塞的读取文件和socket连接

  • Buffers

  • Channels通过Buffers可以直接读取或者写入对象

  • Selectors

  • 通知一组Channels触发了哪些IO事件

  • SelectionKeys

  • 维护IO事件的状态和绑定

Reactor 实现

Setup

class Reactor implements Runnable {
        //Selector选择器
        final Selector selector;
        //Socket服务通道
        final ServerSocketChannel serverSocket;

        Reactor(int port) throws IOException {
            //创建一个Selector
            selector = Selector.open();
            //创建一个Socket Channel
            serverSocket = ServerSocketChannel.open();
            //将Socket Channel绑定到指定端口
            serverSocket.socket().bind(
                    new InetSocketAddress(port));
            //设置Socket Channel为非阻塞
            serverSocket.configureBlocking(false);
            //将Selector和Socket Channel注册到SelectionKey
            SelectionKey sk =
                    serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            //将SelectionKey附加到接受者
            sk.attach(new Acceptor());
        }

        /*
        也可以使用SPI提供接口:
        SelectorProvider p = SelectorProvider.provider();
        selector = p.openSelector();
        serverSocket = p.openServerSocketChannel();
         */
    }

Dispatch Loop

// class Reactor continued
        public void run() { //通常在新线程中执行
            try {
                //如果当前线程没有中断就循环执行
                while (!Thread.interrupted()) {
                    //查询选择器中获取已经准备好的并且注册过的操作
                    selector.select();
                    //获取所有已经准备好的并且注册过的操作
                    Set selected = selector.selectedKeys();
                    //循环遍历
                    for (Object o : selected) {
                        //调度任务并处理事件操作
                        dispatch((SelectionKey) o);
                    }
                    //移除选择器
                    selected.clear();
                }
            } catch (IOException ex) { /* ... */ }
        }

        //处理事件操作
        void dispatch(SelectionKey k) {
            //获取SelectionKey中绑定的处理程序,如果不为空就执行
            Runnable r = (Runnable) (k.attachment());
            if (r != null)
                r.run();
        }

Acceptor

// class Reactor continued
        // 创建接收器
        class Acceptor implements Runnable {
            public void run() {
                try {
                    //获取连接成功到客户端连接
                    SocketChannel c = serverSocket.accept();
                    if (c != null) {
                        //如果不为空就处理客户端连接以及selector
                        new Handler(selector, c);
                    }
                } catch (IOException ex) { /* ... */ }
            }
        }

Handler setup

//处理程序
        final class Handler implements Runnable {
            //指定最大输入bytes
            private static final int MAXIN = 1024;
            //指定最大输出bytes
            private static final int MAXOUT = 1024;
            //客户端连接
            final SocketChannel socket;
            final SelectionKey sk;
            ByteBuffer input = ByteBuffer.allocate(MAXIN);
            ByteBuffer output = ByteBuffer.allocate(MAXOUT);
            static final int READING = 0, SENDING = 1;
            int state = READING;

            Handler(Selector sel, SocketChannel c) throws IOException {
                socket = c;
                //配置非阻塞模式
                c.configureBlocking(false);
                //将客户端连接和读注册到SelectionKey
                sk = socket.register(sel, SelectionKey.OP_READ);
                //将SelectionKey附加到当前线程的run
                sk.attach(this);
                //将SelectionKey的操作设置为读取
                sk.interestOps(SelectionKey.OP_READ);
                //唤醒Selector
                sel.wakeup();
            }
        }

Request handling

// class Handler continued

            //输入处理完成
            boolean inputIsComplete() { /* ... */
                return true;
            }

            //输出处理完成
            boolean outputIsComplete() { /* ... */
                return true;
            }

            //处理过程中
            void process() { /* ... */ }

            public void run() {
                try {
                    //根据不同的状态进行不同的处理程序
                    if (state == READING) read();
                    else if (state == SENDING) send();
                } catch (IOException ex) { /* ... */ }
            }

            //读取数据
            void read() throws IOException {
                //从客户端获取数据
                socket.read(input);
                //如果读取完成
                if (inputIsComplete()) {
                    //处理数据
                    process();
                    //标记为发送状态
                    state = SENDING;
                    // 将SelectionKey的操作设置为写入
                    sk.interestOps(SelectionKey.OP_WRITE);
                }
            }

            //发送数据
            void send() throws IOException {
                //将数据写入客户端连接
                socket.write(output);
                //发送完成后将SelectionKey中的绑定取消
                if (outputIsComplete()) sk.cancel();
            }
        }

Per-State Handlers

GoF State-Object pattern 状态模式,针对状态重新绑定对应的处理程序

//处理程序
        class Handler {
            // 初始化为读取状态
            public void run() { 
                //客户端读取数据
                socket.read(input);
                //读取完成
                if (inputIsComplete()) {
                    //处理数据
                    process();
                    //附加新的处理程序Sender
                    sk.attach(new Sender());
                    //标记状态为写入
                    sk.interest(SelectionKey.OP_WRITE);
                    //唤醒SelectionKey中绑定的Selector
                    sk.selector().wakeup();
                }
            }
            
            //处理程序Sender
            class Sender implements Runnable {
                public void run(){ // ...
                    //写入数据
                    socket.write(output);
                    //写入完成之后将SelectionKey中的绑定取消
                    if (outputIsComplete()) sk.cancel();
                }
            }
        }

Multithreaded Designs 多线程设计

  • 战略性的为扩展性增加线程

  • 主要适用于多处理器

  • 工作线程

  • Reactor可以快速的触发处理程序

  • 因为处理程序过多或者处理时间过程会减慢Reactor的速度

  • 将非IO处理放到其他的线程

  • 多个Reactor处理线程

  • Reactor线程任务过多会导致IO饱和

  • 分配一些任务给其他Reactor线程

  • 负载均衡以匹配CPU和IO速率

Worker Threads 工作线程设计

  • 将非IO处理放到其他的线程来加快Reactor线程

  • 比计算绑定处理重新处理为事件驱动的形式更简单

  • 应该仍然是纯非阻塞计算

  • 足够的处理胜过开销

  • 很难与IO重叠处理

  • 最好能先将所有输入读入缓冲区

  • 使用线程池可以进行调优和控制

  • 通常需要的线程数比客户端少得多

image.png

Handler with Thread Pool 多线程处理

class Handler implements Runnable {
            // 创建一个线程池 
            static PooledExecutor pool = new PooledExecutor(...);
            //设置处理状态
            static final int PROCESSING = 3;
            //读数据操作,设计到多线程读取需要加线程锁
            synchronized void read() { 
                //读取数据
                socket.read(input);
                //读取完成
                if (inputIsComplete()) {
                    //标记为处理状态
                    state = PROCESSING;
                    //将处理过程放到线程池中执行
                    pool.execute(new Processer());
                }
            }

            //处理数据线程
            class Processer implements Runnable {
                public void run() { processAndHandOff(); }
            }
            
            //处理数据并关闭
            synchronized void processAndHandOff() {
                //处理数据
                process();
                //标记处理完成并标记发送状态
                state = SENDING; // 或者绑定其他操作
                //将SelectionKey的操作设置为写入
                sk.interest(SelectionKey.OP_WRITE);
            }
        }

协调任务Coordinating Tasks

  • Handoffs 传递

  • 循环任务的启用、触发或调用下一个任务

  • 通常是最快的但同时也是脆弱的

  • 给每个处理程序触发回调

  • 设置状态、附加处理程序等等

  • 状态模式

  • Queues 队列

  • 比如跨阶段传递buffers

  • Futures

  • 当每个任务产生结果时触发

  • 协调层位于连接或等待/通知之上

Using PooledExecutor 使用线程池执行

  • 一个可优化的工作线程池

  • 主方法执行(Runnable r)

  • 控制

  • 任务队列的类型(任何通道)

  • 最大线程数

  • 最小线程数

  • "Warm" 与按需加载线程

  • 保持活动间隔,直到空闲线程死亡

  • 如有必要,稍后将其替换为新的

  • 饱和策略

  • 阻塞、下降、生产运行等

Multiple Reactor Threads 多个Reactor线程

  • 使用Reactor线程池

  • 用于匹配CPU和IO速率

  • 静态或动态构造

  • 每个Reactor都有自己的选择器,线程,调度循环

  • 主接收器分配到专用的Reactor

image.png

Using other java.nio features 使用其他的java.nio特性

  • 一个Reactor对应多个Selectors

  • 将不同的处理程序绑定到不同的IO事件

  • 调度需要仔细处理线程安全

  • 文件传输

  • 自动化的文件到网络或网络到文件的复制

  • 内存映射文件

  • 通过缓冲区访问文件

  • 直接访问缓冲区

  • 有可能实现零拷贝传输吗

  • 但是有设置和完成的开销

  • 最适合长时间连接的应用

Connection-Based Extensions 基础连接的扩展

  • 不能使用单个服务请求

  • 客户端连接

  • 客户端发送一系列消息/请求

  • 客户端断开连接

  • 举例

  • 数据库和事务监控器

  • 多人游戏,聊天等

  • 可以扩展基本的网络服务模式

  • 处理许多相对长连接的客户

  • 跟踪客户端和会话状态(包括丢弃)

  • 分布式部署服务

原文:Doug Lea Scalable IO in Java

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351