Java进阶-Netty-进阶

一、Reactor线程

  源码基于4.1.6.Final版本。

1.1 Reactor线程启动

  NioEventLoop的run方法是reactor线程的主体,在第一次添加任务的时候被启动

  • 入口:NioEventLoop父类SingleThreadEventExecutor的execute方法
@Override
public void execute(Runnable task) {
    ...
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        ...
    }
    ...
}
  • netty的reactor线程在添加一个任务的时候被创建,该线程实体为FastThreadLocalThread。
  • 最后线程执行主体为NioEventLoop的run方法。

1.2 Reactor线程执行

@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            processSelectedKeys();
            runAllTasks(...);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        ...
    }
image.png
  • 轮询注册到reactor线程对应的selector上的所有的channel的IO事件
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
      selector.wakeup();
}

定时任务截止事时间快到了,中断本次轮询
轮询过程中发现有任务加入,中断本次轮询
阻塞式select操作

  netty会在每次进行selector.select(timeoutMillis)之前记录一下开始时间currentTimeNanos,在select之后记录一下结束时间,判断select操作是否至少持续了timeoutMillis秒,如果持续的时间大于等于timeoutMillis,说明就是一次有效的轮询,重置selectCnt标志,否则,表明该阻塞方法并没有阻塞这么长时间,可能触发了jdk的空轮询bug,当空轮询的次数超过一个阀值的时候,默认是512,就开始重建selector

  • 处理产生网络IO事件的channel
processSelectedKeys();

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

取出IO事件以及对应的netty channel类
处理该channel
判断是否该再来次轮询

对于boss NioEventLoop来说,轮询到的是基本上就是连接事件,后续的事情就通过他的pipeline将连接扔给一个worker NioEventLoop处理
对于worker NioEventLoop来说,轮询到的基本上都是io读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理SelectedSelectionKeySet

  netty使用数组替换掉jdk原生的HashSet来保证IO事件的高效处理每个SelectionKey上绑定了netty类AbstractChannel对象作为attachment,在处理每个SelectionKey的时候,就可以找到AbstractChannel,然后通过pipeline的方式将处理串行到ChannelHandler,回调到用户方法。

  • 处理任务队列

用户自定义普通任务
非当前reactor线程调用channel的各种方法
用户自定义定时任务:1)若干时间后执行一次 2)每隔一段时间执行一次 3)每次执行结束,隔一定时间再执行一次

  taskQueue在NioEventLoop中默认是mpsc队列,mpsc队列,即多生产者单消费者队列,netty使用mpsc,方便的将外部线程的task聚集,在reactor线程内部用单线程来串行执行

  reactor线程task调度:

  • 从scheduledTaskQueue转移到期的定时任务到taskQueue(mpsc queue)
  • 计算本次任务循环的截止时间
  • 执行任务
  • 收尾

当前reactor线程调用当前eventLoop执行任务,直接执行,否则,添加到任务队列稍后执行
netty内部的任务分为普通任务和定时任务,分别落地到MpscQueue和PriorityQueue
netty每次执行任务循环之前,会将已经到期的定时任务从PriorityQueue转移到MpscQueue
netty每隔64个任务检查一下是否该退出任务循环

二、服务端启动

b.bind(8888).sync();

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
} 

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    //...
    final ChannelFuture regFuture = initAndRegister();
    //...
    final Channel channel = regFuture.channel();
    //...
    doBind0(regFuture, channel, localAddress, promise);
    //...
    return promise;
}
  • new一个channel
image.png

用户调用方法Bootstrap.bind(port)第一步就是通过反射的方式new一个NioServerSocketChannel对象,并且在new的过程中创建了一系列的核心组件

  • init这个channel

设置option和attr
设置新接入channel的option和attr
加入新连接处理器

  • 将这个channel register到某个对象

1)设置启动类参数,最重要的就是设置channel创建server对应的channel,创建各大组件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等
2)初始化server对应的channel,设置一些attr,option,以及设置子channel的attr,option,给server的channel添加新channel接入器,并触发addHandler,register等事件
3)调用jdk底层做端口绑定,并触发active事件,active触发的时候,真正做服务端口绑定

三、新连接接入

所有的channel底层都会有一个与unsafe绑定,每种类型的channel实际的操作都由unsafe来实现

image.png

  流水线的开始就是HeadContxt,流水线的结束就是TailConext,HeadContxt中调用Unsafe做具体的操作,TailConext中用于向用户抛出pipeline中未处理异常以及对未处理消息的警告

  • 检测到有新连接进入
  • 将新的连接注册到worker线程组
  • 注册新连接的读事件
//NioEventLoop.java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
}
  • boos reactor线程轮询到有新的连接进入
  • 通过封装jdk底层的channel创建NioSocketChannel以及一系列的netty核心组件
  • 将该条连接通过chooser,选择一条worker reactor线程绑定上去
  • 注册读事件,开始新连接的读写

四、pipeline

4.1 pipeline 初始化

  pipeline是channel其中的一员,在AbstractChannel中被创建

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

  pipeline中保存了channel的引用,默认情况下,一条pipeline会有两个节点,head和tail

image.png

4.2 pipeline添加节点

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new Spliter())
         p.addLast(new Decoder());
         p.addLast(new BusinessHandler())
         p.addLast(new Encoder());
     }
});
image.png
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1.检查是否有重复handler
        checkMultiplicity(handler);
        // 2.创建节点
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3.添加节点
        addLast0(newCtx);
    }
   
    // 4.回调用户方法
    callHandlerAdded0(handler);
    
    return this;
}

netty中用两个字段来表示这个channelHandlerContext属于inBound还是outBound,或者两者都是

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}
image.png
  • 检查是否有重复handler
  • 创建节点
  • 添加节点
  • 回调用户方法
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1
    newCtx.next = tail; // 2
    prev.next = newCtx; // 3
    tail.prev = newCtx; // 4
}
image.png

4.3 pipeline删除节点

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    
    return this;
}
  • 找到待删除的节点
  • 调整双向链表指针删除
  • 回调用户函数
image.png

4.4 pipeline其他

  • 一个Channel对应一个Unsafe,Unsafe处理底层操作,NioServerSocketChannel对应NioMessageUnsafe, NioSocketChannel对应NioByteUnsafe
  • inBound事件从head节点传播到tail节点,outBound事件从tail节点传播到head节点
  • 异常传播只会往后传播,而且不分inbound还是outbound节点,不像outBound事件一样会往前传播

五、writeAndFlush

  • pipeline中的标准链表结构
image.png
  • Encoder节点分配一个ByteBuf,调用encode方法,将java对象根据自定义协议写入到ByteBuf,然后再把ByteBuf传入到下一个节点。
  • pipeline中的编码器原理是创建一个ByteBuf,将java对象转换为ByteBuf,然后再把ByteBuf继续向前传递
  • 调用write方法并没有将数据写到Socket缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出
  • writeAndFlush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到Socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功
  • netty中的缓冲区中的ByteBuf为DirectByteBuf

六、拆包器

  • 不断从TCP缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包
  • 如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到一个完整的数据包
  • 如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,够成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接

netty中的拆包内部会有一个累加器,每次读取到数据都会不断累加,然后尝试对累加到的数据进行拆包,拆成一个完整的业务数据包,这个基类叫做ByteToMessageDecoder

netty将具体如何拆包抽象出一个decode方法,不同的拆包器实现不同的decode方法,就能实现不同协议的拆包

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容