Netty学习笔记

NIO

public class NIOServer {

    /*标识数字*/
    private  int flag = 0;
    /*缓冲区大小*/
    private  int BLOCK = 4096;
    /*接受数据缓冲区*/
    private  ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
    /*发送数据缓冲区*/
    private  ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
    private  Selector selector;

    public NIOServer(int port) throws IOException {
        // 打开服务器套接字通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 服务器配置为非阻塞
        serverSocketChannel.configureBlocking(false);
        // 检索与此通道关联的服务器套接字
        ServerSocket serverSocket = serverSocketChannel.socket();
        // 进行服务的绑定
        serverSocket.bind(new InetSocketAddress(port));
        // 通过open()方法找到Selector
        selector = Selector.open();
        // 注册到selector,等待连接
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("Server Start----8888:");
    }


    // 监听
    private void listen() throws IOException {
        while (true) {
            // 阻塞 直到有事件到达
            selector.select();
            // 返回此选择器的已选择键集。
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                iterator.remove();
                handleKey(selectionKey);
            }
        }
    }

    // 处理请求
    private void handleKey(SelectionKey selectionKey) throws IOException {
        // 接受请求
        ServerSocketChannel server = null;
        SocketChannel client = null;
        String receiveText;
        String sendText;
        int count=0;
        // 测试此键的通道是否已准备好接受新的套接字连接。
        if (selectionKey.isAcceptable()) {
            // 返回为之创建此键的通道。
            server = (ServerSocketChannel) selectionKey.channel();
            // 接受到此通道套接字的连接。
            // 此方法返回的套接字通道(如果有)将处于阻塞模式。
            client = server.accept();
            // 配置为非阻塞
            client.configureBlocking(false);
            // 注册到selector,等待连接
            client.register(selector, SelectionKey.OP_READ);
        } else if (selectionKey.isReadable()) {
            // 返回为之创建此键的通道。
            client = (SocketChannel) selectionKey.channel();
            //将缓冲区清空以备下次读取
            receivebuffer.clear();
            //读取服务器发送来的数据到缓冲区中
            count = client.read(receivebuffer);
            if (count > 0) {
                receiveText = new String( receivebuffer.array(),0,count);
                System.out.println("服务器端接受客户端数据--:"+receiveText);
                client.register(selector, SelectionKey.OP_WRITE);
            }
        } else if (selectionKey.isWritable()) {
            //将缓冲区清空以备下次写入
            sendbuffer.clear();
            // 返回为之创建此键的通道。
            client = (SocketChannel) selectionKey.channel();
            sendText="message from server--" + flag++;
            //向缓冲区中输入数据
            sendbuffer.put(sendText.getBytes());
            //将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
            sendbuffer.flip();
            //输出到通道
            client.write(sendbuffer);
            System.out.println("服务器端向客户端发送数据--:"+sendText);
            client.register(selector, SelectionKey.OP_READ);
        }
    }

    /**
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        int port = 8888;
        NIOServer server = new NIOServer(port);
        server.listen();
    }
}
public class NIOClient {

    /*标识数字*/
    private static int flag = 0;
    /*缓冲区大小*/
    private static int BLOCK = 4096;
    /*接受数据缓冲区*/
    private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
    /*发送数据缓冲区*/
    private static ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
    /*服务器端地址*/
    private final static InetSocketAddress SERVER_ADDRESS = new InetSocketAddress(
            "localhost", 8888);

    public static void main(String[] args) throws IOException {
        // 打开socket通道
        SocketChannel socketChannel = SocketChannel.open();
        // 设置为非阻塞方式
        socketChannel.configureBlocking(false);
        // 打开选择器
        Selector selector = Selector.open();
        // 注册连接服务端socket动作
        socketChannel.register(selector, SelectionKey.OP_CONNECT);
        // 连接
        socketChannel.connect(SERVER_ADDRESS);
        // 分配缓冲区大小内存

        Set<SelectionKey> selectionKeys;
        Iterator<SelectionKey> iterator;
        SelectionKey selectionKey;
        SocketChannel client;
        String receiveText;
        String sendText;
        int count=0;

        while (true) {
            //选择一组键,其相应的通道已为 I/O 操作准备就绪。
            //此方法执行处于阻塞模式的选择操作。
            selector.select();
            //返回此选择器的已选择键集。
            selectionKeys = selector.selectedKeys();
            //System.out.println(selectionKeys.size());
            iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                selectionKey = iterator.next();
                if (selectionKey.isConnectable()) {
                    System.out.println("client connect");
                    client = (SocketChannel) selectionKey.channel();
                    // 判断此通道上是否正在进行连接操作。
                    // 完成套接字通道的连接过程。
                    if (client.isConnectionPending()) {
                        client.finishConnect();
                        System.out.println("完成连接!");
                        sendbuffer.clear();
                        sendbuffer.put("Hello,Server".getBytes());
                        sendbuffer.flip();
                        client.write(sendbuffer);
                    }
                    client.register(selector, SelectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    client = (SocketChannel) selectionKey.channel();
                    //将缓冲区清空以备下次读取
                    receivebuffer.clear();
                    //读取服务器发送来的数据到缓冲区中
                    count=client.read(receivebuffer);
                    if(count>0){
                        receiveText = new String( receivebuffer.array(),0,count);
                        System.out.println("客户端接受服务器端数据--:"+receiveText);
                        client.register(selector, SelectionKey.OP_WRITE);
                    }

                } else if (selectionKey.isWritable()) {
                    sendbuffer.clear();
                    client = (SocketChannel) selectionKey.channel();
                    sendText = "message from client--" + (flag++);
                    sendbuffer.put(sendText.getBytes());
                    //将缓冲区各标志复位,因为向里面put了数据标志被改变要想从中读取数据发向服务器,就要复位
                    sendbuffer.flip();
                    client.write(sendbuffer);
                    System.out.println("客户端向服务器端发送数据--:"+sendText);
                    client.register(selector, SelectionKey.OP_READ);
                }
            }
            selectionKeys.clear();
        }
    }
}

Buffer

NIO ByteBuffer

//重要属性
private int position;
private int limit;
private int capacity;

写模式时 position指向当前待写的索引处,limitcapacity都指向数组索引最大值+1处

写模式

读模式 需要调用flip()方法

  public final Buffer flip() {
        limit = position;
        position = 0;
        mark = -1;
        return this;
    }

position置为0,limit置为写模式下postion,标记最大可读位置。capacity不变表示此buffer内存数组最大容量。

读模式

Netty ByteBuf

Neety ByteBuf不同于NIO ByteBuffer,ByteBuffer读写模式使用同一套索引,转换需要调用flip。而ByteBuf同时维护了两个索引:读索引和写索引,readIndexwriteIndex。有三种种类的buffer,堆缓冲/直接缓冲/混合缓冲

heap buffers

即在JVM堆中分配的字节数组,最普遍的缓冲形式,分配释放也较为廉价,一般使用方式:


head buffers

direct buffers

直接缓存由JVM通过本地调用在对外进行分配,分配释放比heap buffers更为昂贵,但是效率高省去了一次拷贝,因为堆内存在socket中进行传输之前,都需要进行一次heap buffer 到 direct buffer的拷贝,使用方式:


direct buffers

composite buffers

对各种ByteBuf的组合使用,相当于ByteBuf的容器,使用方式:


composite buffers

引用计数 refCount

ByteBuf采用引用计数,ByteBuf被分配后reCount=1,调用retain()方法refCount+1,调用release()方法refCount-1.当refCount==0时代表ByeBuf生命周期已过,会被释放或者归池。调用refCount==0的ByteBuf将会抛出IllegalReferenceCountException。需要注意的是有些过程是netty内部帮助release的,比如SimpleChannelInboundHandler

 @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }

默认构造器实现类autoRelease为true,channelRead0模板方法由用户进行实现,处理信息,处理完成后可以看到netty帮助释放了。
ByteBuf拷贝,copy()进行深拷贝即内部字节数组也会即你想嗯拷贝,duplicate()进行浅拷贝,只对readIndex和writeIndex进行拷贝,但共用同一个字节数组。同样readSlice()浅拷贝,readBytes()深拷贝。

Netty

Netty服务端典型代码:

final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup boosGroup = new NioEventLoopGroup(1);
        EventLoopGroup workGroup = new NioEventLoopGroup(3);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boosGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(serverHandler);
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind().sync();
            System.out.println(EchoServer.class.getName() +
                    " started and listening for connections on " + f.channel().localAddress());
            f.channel().closeFuture().sync();
        } finally {
            boosGroup.shutdownGracefully().sync();
            workGroup.shutdownGracefully().sync();
        }

ServerSocketChannel初始化

ServerBootstrap中进行Channel等初始化工作实在bind()方法中进行的,一系列调用链后在initAndRegister()方法中进行ServerSocketChannel的初始化工作。

 final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

这里重要的是channel实例化和channel初始化两部操作,channelFactory.newChannel()使用Channel的反射工厂生成一个Channel实例。

private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
    /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    /**
     * Create a new instance using the given {@link SelectorProvider}.
     */
    public NioServerSocketChannel(SelectorProvider provider) {
        this(newSocket(provider));
    }
    /**
     * Create a new instance using the given {@link ServerSocketChannel}.
     */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

使用默认provider构建了一个java nio的ServerSocketChannel实例,后再调用重载构造方法,设置感兴趣事件OP_ACCEPT用于监听客户端连接事件,构造器一直调用上层构造器,其中在父类AbstractCHannel中实例化Unsafe和Pipeline,Unsafe封装了Java底层的Socket操作,实际上是沟通 Netty 上层和 Java 底层的重要的桥梁。
实例化过程:

  • 调用 NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打开一个新的 Java NIO ServerSocketChannel

  • AbstractChannel(Channel parent) 中初始化 AbstractChannel 的属性:

    • parent 属性置为 null

    • unsafe 通过newUnsafe() 实例化一个 unsafe 对象, 它的类型是 AbstractNioMessageChannel#AbstractNioUnsafe 内部类

    • pipeline 是 new DefaultChannelPipeline(this) 新创建的实例.

  • AbstractNioChannel 中的属性:

    • SelectableChannel ch 被设置为 Java ServerSocketChannel, 即 NioServerSocketChannel#newSocket 返回的 Java NIO ServerSocketChannel.

    • readInterestOp 被设置为 SelectionKey.OP_ACCEPT

    • SelectableChannel ch 被配置为非阻塞的 ch.configureBlocking(false)

  • NioServerSocketChannel 中的属性:

    • ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket())

接下来是Channel的初始化工作:

@Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

init()方法中首先设置配置Options和Attributes,重点在于最后向Pipeline中添加一个ServerBootstrapAcceptor,ServerBootStrapAcceptor继承自ChannelInboundHandlerAdapter,核心是其中的channelRead()方法

@Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            setChannelOptions(child, childOptions, logger);

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }

当有客户端连接时,连接后的SocketChannel会有Pipeline进行链式调用,于是就会调用该channelRead方法,首先将ServerBootStrap中配置的childHanlder添加到SocketChannel的Pipeline中,然后设置SocketChannel的Options和Attributes,最后将SocketChannel注册到workGroup,绑定到一个EventLoop上,完成整个ServerChannel初始化工作。

EventLoopGroup

image

EventLoopGroup相当于线程池,EventLoop相当于线程池中的线程用于处理相关的IO事件。Server拥有两个线程池,bossGroup用于接受客户端连接事件,workGroup用于处理连接后SocketChannel产生的IO事件。一旦SocketChannel就绪就会在workGroup中进行注册,平均注册到一个EventLoop,此后该Channel就会与该EventLoop绑定,一个Channel对应一个EventLoop,但一个EventLoop可以对应多个Channel。

ChannelHandler

ChannelHandler为Netty消息的处理器,分为InboundChannelHandler和OutbountChannelHandler,对于Channel的read消息由InbountChannelHandler处理,write消息由OutbountChannelHandler处理。常见的几种处理器有ChannelInitializer是InbountChannelHandler,一般用于向Pipline中添加其他Handler,只会调用一次,完成后就会从Pipline中移除该Handler。Decoder和Encoder解码和编码器,Decoder为InbountHandler,Encoder为OutbountHandler。

ChannelPipline

一个Channel对应了一个CahnnelPipeline,维护保存了ChannelHandler的链式结构,具体如图:


ChannelPipeline

ChannelContext

ChannelContext是对ChannelHandler的封装,在ChannelHandler添加进Pipeline时生成,其实Pipeline中的链式结构保存的是ChannelContext的双向链表,ChannelHandler是不知道所在Pipeline中前后ChannelHandler的,但ChannelContext保存有前后ChannelContext的索引。ChannelContext调用下一个处理器相关方法是以fire为开头的,比如context.fireChannelRead()即调用下一个处理器的channelRead操作。
还有需要注意的是pipeline和channel调用write()都是从第一个outbount开始链式处理,但context调用write()是从当前context的下一个outbount开始链式处理。

//channel
   @Override
    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }
//pipeline
    @Override
    public final ChannelFuture write(Object msg) {
        return tail.write(msg);
    }
//context
        private void write(Object msg, boolean flush, ChannelPromise promise) {
            AbstractChannelHandlerContext next = findContextOutbound();
            final Object m = pipeline.touch(msg, next);
        ........
    }
     private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,076评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,658评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,732评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,493评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,591评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,598评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,601评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,348评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,797评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,114评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,278评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,953评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,585评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,202评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,180评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,139评论 2 352

推荐阅读更多精彩内容

  • Netty是一个高性能事件驱动的异步的非堵塞的IO(NIO)框架,用于建立TCP等底层的连接,基于Netty可以建...
    我是解忧鸭铺鸭阅读 1,316评论 0 2
  • 一、Netty介绍 Netty 是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服...
    斯文遮阳阅读 343评论 1 1
  • 本文是我学习《Netty实战》的一些笔记。 Netty 的核心组件: Channel 回调 Future 事件和C...
    笨手笨脚越阅读 250评论 0 1
  • + 4715 3828 210000
    夕魄阅读 272评论 0 1
  • 老天心跳 雷电知多少 轰隆隆一场 还给夜空寂寥 ——《梦》
    NLF_62阅读 167评论 0 1