【Netty】从Java.IO到Java.NIO再到Netty

我们应该已经知道,Netty是一个基于NIO的异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。Netty在Java NIO的基础上提供了更高层的抽象和封装,因此要想对Netty有所深入了解,势必要对Java.NIO有所了解,而NIO是对传统IO由阻塞向异步非阻塞IO的巨大跨越,因此了解传统Java.IO对了解Java.NIO也大有裨益。

传统IO弊端
首先我们看下传统IO的网络编程的一个简单例子,由此将进入对传统Java.IO的介绍:

    public static void main(String[] agrs) throws Exception{
        ServerSocket serverSocket = new ServerSocket(8899);
        while (true) {    
            Socket socket = serverSocket.accept();      
            new Thread(() -> {
                try {
                    BufferedReader input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    String clientInputStr = input.readLine(); 
                    
                    System.out.println("客户端"+socket.getRemoteSocketAddress()+"发过来的内容:" + clientInputStr); 
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();    
        } 
    }

java.io最核心的一个概念是流(Stream),java.io可以看成是面向流的编程,它将数据的输入输出抽象为流,流是一组有顺序的,单向的,有起点和终点的数据集合,就像水流。在Java中,一个流要么是输入流,要么是输出流。按照流中的最小数据单元又分为字节流和字符流。

字节流:以 8 位(即1byte=8bit)作为一个数据单元,数据流中最小的数据单元是字节。它的顶级父类是InputStream和OutputStream。
字符流:以 16 位(即1char=2byte=16bit)作为一个数据单元,数据流中最小的数据单元是字符, Java 中的字符是 Unicode 编码,一个字符占用两个字节。它的顶级父类是Reader和Writer。

所有的Java IO流都是阻塞的,这意味着,当一条线程执行accept(),read()或者write()方法时,这条线程会一直阻塞直到有连接请求,或读取到了一些数据或者要写出去的数据已经全部写出,在这期间这条线程不能做任何其他的事情,而如果想支持多个连接,那就需为每个连接新开个线程去支持读写操作,如上代码所示。这种模式在用户负载增加时,性能将下降非常的快(大家应该都知道无限开线程的后果)。

非阻塞Reactor模式引入
随着网络应用的发展和网络服务的用户逐渐增多,需要有一种新的方案去解决传统网络的这种问题,在上世纪90年代,便提出了一种Reactor模式,Reactor模式是一种高并发事件驱动的网络服务模式,它的实现可以用Java实现、C++实现或其他语言实现,而java.nio就是依照reactor模式设计的,此外,其他的一些框架也采用(或实现)了Reactor模式,如Redis,Nginx,Netty(Netty是Java NIO更高层的抽象)等。Reactor的框架及流程图如下所示(参照Douglas C. Schmidt 的《Reactor》):

reactor模式及逻辑流程.png

它的结构包括了5个部分,因为这些结构和Java nio的实现有些出入(有些对不上号,有些需用户自己实现),此不介绍了,感兴趣的读者可以参考Douglas C. Schmidt 的《Reactor》一文介绍。而大家对Reactor模式的认识更喜好Doug Lea 《Scalable IO in Java》中的一文介绍。如下图所示:

单线程的Reactor模式

Java NIO网络编程
已知了java.nio就是依照reactor模式设计的,我们再看一个Java NIO网络编程的一个简单例子,由此将进入对传统Java NIO的介绍:

public static void main(String[] agrs) throws Exception{
        int portsNum = 5;
        int[] ports = new int[portsNum];
        for (int i = 0; i < portsNum; i++){
            ports[i] = 9000 + i;
        }
        
        Selector selector = Selector.open();
        
        for (int i = 0; i < ports.length; i++){
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress(ports[i]));
            
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务端监听端口:" + ports[i]);
        }
        
        while(true) {
            selector.select();
            
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            for(Iterator<SelectionKey> it = selectedKeys.iterator(); it.hasNext();){
                SelectionKey selectionKey = it.next();
                
                if (selectionKey.isAcceptable()){
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectionKey.channel();
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    
                    socketChannel.register(selector, SelectionKey.OP_READ);
                    
                    it.remove();
                    System.out.println("接受客户端连接:" + socketChannel);
                }else if (selectionKey.isReadable()){
                    SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
                    
                    int byteRead = 0;
                    while(true){
                        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
                        byteBuffer.clear();
                        
                        int read = socketChannel.read(byteBuffer);
                        if (read <= 0){
                            break;
                        }
                        
                        byteBuffer.flip();
                        socketChannel.write(byteBuffer);
                        byteRead += read;
                    }
                    
                    it.remove();
                    System.out.println("读取:" + byteRead + ", 来自:" + socketChannel);
                }
            }
        }
    }

从上述代码可以看出,一个单线程的java.nio的网络编程流程基本为:
1.创建一个selector;
2.创建一个或多个Channel通道,注册到selector,并注册关心事件;
3.调用select()方法,阻塞等待关心事件发生,关心事件发生后,通过循环SelectionKey集合,再通过SelectionKey获取相关联的通道处理相应事件。
(tips:即使上面的serverSocketChannel.configureBlocking(false);及关心事件用完即删it.remove();在Netty中也会有相应源码)

由此再看[reactor模式及逻辑流程.png]一图,java.nio的流程和该图中的结构及流程图是非常相似的,
Selector:相当于Synchronous Event Demultiplexer。
SelectionKey: 相当于event,和一个SocketChannel关联。
SocketChannel:相当于handle。
java nio中没有提供initial dispatcher的抽象,这部分功能需要用户自行实现。
java nio中没有提供event handler的抽象,这部分功能需要用户自行实现。

Java NIO有三大核心组件:
1.Channel
Java NIO中的所有I/O操作都基于Channel对象,就像流操作都要基于Stream对象一样。一个Channel(通道)代表和某一实体的连接,这个实体可以是文件、网络套接字等。也就是说,通道是Java NIO提供的一座桥梁,用于我们的程序和操作系统底层I/O服务进行交互。但是,一个通道,既可以读又可以写,而一个Stream是单向的。

2.Buffer
NIO中所使用的缓冲区不是一个简单的byte数组,而是封装过的Buffer类,通过它提供的API,我们可以灵活的操纵数据。在Java NIO当中,我们是面向(块)或是缓冲区(buffer)编程的。Buffer本身就是一块内存,底层实现除了数组之外,Buffer还提供了对于数据的结构化访问方式,并且可以追踪到系统的读写过程。
NIO提供了多种 Buffer 类型与Java基本类型相对应(但没有BoolBuffer),如ByteBuffer、CharBuffer、IntBuffer等,区别就是读写缓冲区时的单位长度不一样。Buffer中有3个很重要的变量,它们是理解Buffer工作机制的关键,分别是capacity (总容量)、position (指针当前位置)、limit (读/写边界位置)。

3.Selector
Selector(选择器)是一个特殊的组件,用于采集各个通道的状态(或者说事件)。我们先将通道注册到选择器,并设置好关心的事件,然后就可以通过调用select()方法,静静地等待事件发生。
通道有如下4个事件可供我们监听:
Accept:有可以接受的连接
Connect:连接成功
Read:有数据可读
Write:可以写入数据了

为什么要用Selector?
如果用阻塞I/O,需要多线程(浪费内存),如果用非阻塞I/O,需要不断重试(耗费CPU)。Selector的出现解决了这尴尬的问题,非阻塞模式下,通过Selector,我们的线程只为已就绪的通道工作,不用盲目的重试了。比如,当所有通道都没有数据到达时,也就没有Read事件发生,我们的线程会在select()方法处被挂起,从而让出了CPU资源。
【注:从上述三大组件至此大部分摘自 一文让你彻底理解 Java NIO 核心组件

Netty对Java NIO的抽象
了解了Java NIO及知道java.nio的网络编程流程后,我们再看Netty源码就显得轻松点了,下面我们将在netty源码中找出java.nio网络编程的影子。

1.创建一个selector;
在创建boss线程组时,我们会调用EventLoopGroup bossGroup = new NioEventLoopGroup();方法,一直点NioEventLoopGroup实现,最后会来到NioEventLoop实现,由selector = openSelector();一句,每个NioEventLoop都会创建一个selector。

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        selector = openSelector();
        selectStrategy = strategy;
    }

2.创建一个或多个Channel通道,注册到selector,并注册关心事件;
Netty中创建NioServerSocketChannel通道,并注册到boss线程的selector中的源码在bootstrap.bind(address)中,即服务端的ip和端口绑定方法中,先在initAndRegister()方法的

    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            ...
        }

        ChannelFuture regFuture = config().group().register(channel);
    }

ChannelFuture regFuture = config().group().register(channel);一句,由register(channel)方法及类的继承关系,进入SingleThreadEventLoop的register(Channel channel)方法

public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

然后在在AbstractChannel的register(EventLoop eventLoop, final ChannelPromise promise)方法中

        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ...
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ...
                }
            }
        }

进入该类下私有的register0(promise);方法,

private void register0(ChannelPromise promise) {
            try {
                ...
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;
                ...
            } catch (Throwable t) {
            }
        }

最后进入doRegister();的实现类重写方法,即AbstractNioChannel下的doRegister()方法中,

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }

由javaChannel().register(eventLoop().selector, 0, this);一句,可见将NioServerSocketChannel通道,注册到了boss线程的selector中。

我们再看客户端的channel注册:
boss线程启动后,会监听客户端的连接,主要是在下面代码中监听的(见《Netty的启动过程二》一文介绍):

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }

在《Netty的启动过程二》中知道,当有客户端请求连接时,会进入内部类NioMessageUnsafe的read()方法中,继而在doReadMessages(readBuf);方法,然后在buf.add(new NioSocketChannel(this, ch));一句中,创建用于读取和写入数据的NioSocketChannel,并注册关心事件SelectionKey.OP_READ;此后,仍然在内部类NioMessageUnsafe的read()方法中,在pipeline.fireChannelRead(readBuf.get(i))方法,在经历NioServerSocketChannel的pipeline中首尾handler的read方法,最终来到了ServerBootstrapAcceptor的channelRead(ChannelHandlerContext ctx, Object msg)方法(上述过程详见《Netty的启动过程二):

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            child.pipeline().addLast(childHandler);
            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            }
            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        }

由childGroup.register(child)一句,便和NioServerSocketChannel通道注册一样,将NioSocketChannel注册到了worker线程的selector中。

3.调用select()方法,阻塞等待关心事件发生,关心事件发生后,通过循环SelectionKey集合,再通过SelectionKey获取相关联的通道处理相应事件。
由《Netty的启动过程二》一文其实已经写明了,它是在这里无限循环读取NioServerSocketChannel(NioSocketChannel)上发生的关心事件然后各自channel的handler处理的。

    @Override
    protected void run() {
        for (;;) {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        ...
                }
                processSelectedKeys();
        }
    }

从上述分析可知,Netty很巧妙的封装了Java NIO支持,提供了reactor的所有封装,在一定程度上简化了nio网络编程,用户在使用中只需实现网络数据包的event handler即可。当然,还需了解服务端和客户端的启动模式,并知晓如何监听连接的,如何读取数据的,及数据在ChannelPipeline中的流向,再以netty自带的编解码工具,出站入站适配工具,便可对Netty上手了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容