Netty学习笔记(一)组件

Channel,EventLoop和ChannelFuture类构成了Netty网络抽象的代表:

  • Channel:对应Socket
  • EventLoop:对应控制流,多线程处理,并发
  • ChannelFuture:对应异步通知

Channel接口

Channel是对Socket的封装,大大降低了直接使用Socket的复杂性。

EventLoop接口

EventLoop用于处理连接的生命周期中所发生的事件。在服务端编程中,EventLoop起到了监听端口连接和数据读取的工作。

ChannelFuture接口

Netty中的所有IO操作都是异步的,一个操作不会立即返回,我们需要在执行操作之后的某个时间点确定其结果的方法,即ChannelFuture接口,其addListener方法注册了一个ChannelFutureListener,一遍在某个操作完成时得到通知。

ChannelHandler和ChannelPipeline

ChannelHandler充当了所有处理入站和出站数据的应用程序逻辑的容器。ChannelHandler常常实现了传入数据包拆分以及业务逻辑控制的功能。

ChannelPipeline接口

ChannelPipeline为ChannelHandler链提供容器,并定义了用于在该链上传播入站和出站事件流的API。即ChannelPipeline其实就是一个保存很多ChannelHandler的链表。

BootStrap接口

Netty的引导类为应用程序的网络层配置提供了容器。有两种类型的引导:一种用户客户端,称为Bootstrap,另一种称为用于服务器,称为ServerBootstrap。

示例

上面提到的这些接口在实际使用过程中并不是挨个使用的,而是使用BootStrap接口来将需要的接口组织在一起,下面是个基于netty的服务端的例子:

public class Server {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
                    .handler(new ServerHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                        }
                    });
            ChannelFuture f = b.bind(8888).sync();
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("channelActive");
    }
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        System.out.println("channelRegistered");
    }
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        System.out.println("handlerAdded");
    }
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        new Thread(new Runnable() {
            @Override
            public void run() {
                // 耗时的操作
                String result = loadFromDB();
                ctx.channel().writeAndFlush(result);
                ctx.executor().schedule(new Runnable() {
                    @Override
                    public void run() {
                        // ...
                    }
                }, 1, TimeUnit.SECONDS);
            }
        }).start();
    }
    private String loadFromDB() {
        return "hello world!";
    }
}

下面就针对上面使用的ServerBootstrap类来分析Netty是如何来封装java nio相关的操作的。

Netty 如何获取NIO中需要的Channel

首先从ChannelFuture f = b.bind(8888).sync();中的bind方法开始分析。
这个bind方法经过层层调用,最终会调用initAndRegister方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
  final ChannelFuture regFuture = initAndRegister();
  ...
}
final ChannelFuture initAndRegister() {
  Channel channel = null;
  try {
    channel = channelFactory.newChannel();
    ...

从上述源码中可以看到channel来自channelFactory的newChannel方法。ChannelFactory是一个接口,我们只能从它的实现类ReflectiveChannelFactory来看下具体实现了:

    private final Class<? extends T> clazz;
  
    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

可以看出这个工厂方法的newChannel其实就是根据clazz属性来反射出具体的对象。那当前代码中的clazz属性值是多少呢?回到一开始示例执行的channel(NioServerSocketChannel.class)这句代码,这个channel方法实现如下:

public B channel(Class<? extends C> channelClass) {
   return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

所以ServerBootstrapy引导类中使用的通道就是channel方法中设置的NioServerSocketChannel通道。

NioServerSocketChannel实现

接下来就要来看下Netty的NioServerSocketChannel内部是怎么调用java NIO中的ServerSocketChannel的。
NioServerSocketChannel内部实现步骤如下:

  • newSocket() 通过jdk来创建底层jdk channel
  • AbstractNioChannel() 方法中调用configureBlocking(false) 设置通道为非阻塞模式
  • AbstractNioChannel() 方法创建id,unsafe,pipeline
    接下面就上面几个点来看下NioServerSocketChannel类的源码。
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
public NioServerSocketChannel() {
   this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
     return provider.openServerSocketChannel();
    } catch (IOException e) {
     throw new ChannelException("Failed to open a server socket.", e);
    }

从NioServerSocketChannel的构造函数可以看出java NIO的SelectorProvider.provider().openServerSocketChannel()生成了ServerSocketChannel通道。
接下来看第二个点,即在哪设置通道的非阻塞模式,NioServerSocketChannel构造函数如下:

public NioServerSocketChannel(ServerSocketChannel channel) {
  super(null, channel, SelectionKey.OP_ACCEPT);
  ...
}

super方法调用了父类AbstractNioChannel的构造函数如下:

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
   super(parent);
   ...
   ch.configureBlocking(false);
   ...

所以NioServerSocketChannel的构造函数中将通道设置了非阻塞模式。除了设置为非阻塞模式外,还调用了父类的构造函数super(parent),具体如下:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

从上面这段代码中可以看出在构造函数中创建了一个新的pipeline,并且这个链表一开始是空的。

在介绍了ServerBootstrap中如何调用java NIO中的ServerSocketChannel的实现后,我们接下来看ServerBootstrap是如何初始化服务器Channel的。还是从示例中ChannelFuture f = b.bind(8888).sync();中bind方法中的initAndRegister方法中的代码开始分析:

final ChannelFuture initAndRegister() {
  Channel channel = null;
  try {
    channel = channelFactory.newChannel();
    init(channel);
  }
  ...
}
abstract void init(Channel channel) throws Exception;

之前我们分析了从channelFactory中获取ServerChannel通道的逻辑,这里再分析下ServerBootstrap中这个init方法:

void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            channel.config().setOptions(options);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }

        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

init方法首先将传入的childOption,childAttr属性保存到ServerBootstrap对象中;另外又把用户通过handler方法设置的handler对象添加到pipeline尾部,然后再在尾部添加一个ServerBootstrapAcceptor对象,里面包装了用户通过childHandler方法传入的对象。

注册Selector

上面主要学习了netty内部是如何封装了ServerSocketChannel等NIO相关的操作的,接下来再继续看下netty内部是如何封装NIO中注册选择器selector的。
回到我们之前提到的initAndRegister方法内部:

final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            ......
        }

        ChannelFuture regFuture = config().group().register(channel);
        ......
    }

我们已经讲过获取ServerSocketChannel和初始化通道这两个部分,接下来再看下面config().group().register(channel);内部是如何实现注册Selector的。
由于EventLoopGroup是接口,在本例中实现类为NioEventLoop。

EventLoopGroup接口继承关系

NioEventLoop类实现的register方法:

public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
        // 参数校验......
        try {
            ch.register(selector, interestOps, task);
        } catch (Exception e) {
            throw new EventLoopException("failed to register a channel", e);
        }
    }

可以看出最终执行的SeverSocketChannel.register(selector,interestOps,task);方法,即还是执行了java NIO中通道注册到选择器selector的方法。

端口绑定

上面我们已经分析了AbstractBootstrap类里面doBind方法中initAndRegister方法的内部实现:创建ServerSocketChannel通道,初始化,注册通道到选择器selector上这三步。下面再继续分析doBind方法中后面关于端口绑定的实现。
从下面代码中的doBind0方法开始解析:

private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
            .......
}

private static void doBind0(
      final ChannelFuture regFuture, final Channel channel,
      final SocketAddress localAddress, final ChannelPromise promise) {
            channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

从上面源码中可以看出,在doBind0方法内部执行了ServerSocketChannel的bind方法,即进行端口的绑定操作。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容