零、目录
一、pipeline整体关系简述
二、Unsafe的作用
三、事件的分类及处理
四、pipeline中节点的添加和删除
一、pipeline整体关系简述
1、netty中的pipeline模型
当EventLoop的selector监听到某Channel产生了就绪的IO事件,并调用socket API对就绪的IO事件进行操作后,需要将操作产生的“IO数据”或“操作结果”告知用户进行相应的业务处理。
netty将因外部IO事件导致的Channel状态变更(Channel被注册到EventLoop中,Channel状态变为可用,Channel读取到IO数据...)或Channel内部逻辑操作(添加ChannelHandler...)抽象为不同的回调事件,并定义了pipeline对Channel的回调事件进行流式的响应处理。
用户可在pipeline中添加多个事件处理器(ChannelHandler),并通过实现ChannelHandler中定义的方法,对回调事件进行定制化的业务处理。ChannelHandler也可以调用自身方法对Channel本身进行操作。
netty会保证“回调事件在ChannelHandler之间的流转”及“Channel内部IO操作”由EventLoop线程串行执行,用户也可以在ChannelHandler中使用自行构建的业务线程进行业务处理。
2、pipeline相关类的关系图
1)DefaultChannelPipeline:
事件处理流,是一个双向链表结构,链表中节点元素为ChannelHandlerContext。
新的AbstractNioChannel创建时,会创建该Channel对应的DefaultChannelPipeline,用于处理该Channel对应的回调事件。
DefaultChannelPipeline创建时,会自动创建并向链表中添加两个ChannelhandlerContext节点——head和tail。
pipeline的fireXXX()方法:回调事件的发起方法。会产生相应回调事件并直接调用ChannelHandlerContext.invokeXXX(head)方法将回调事件传递给pipeline的head节点。
2)ChannelHandlerContext:
事件处理器上下文,pipeline中的实际处理节点。
每个处理节点ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline和Channel的信息,方便ChannelHandler进行调用。
AbstractChannelHandlerContext具体实现了ChannelHandlerContext接口的功能,并进行了相应扩展。
ChannelHandlerContext的fireXXX方法:回调事件的发起方法。会产生相应回调事件并将其交给pipeline中的下一个处理节点。此方法提供给用户实现的ChannelHandler使用,用于将回调事件向pipeline中的下一个节点传递。
AbstractChannelHandlerContext的static invokeXXX(AbstractChannelHandlerContext next)方法:封装next.invokeXXX()的逻辑并交给EventLoop的IO线程执行。
ChannelHandlerContext的invokeXXX()方法:回调事件执行方法。执行节点中事件处理器ChannelHandler的XXX方法,实际处理回调事件。
3)ChannelHandler:
ChannelHandler(事件处理器接口),由ChannelInboundHandler接口和ChannelOutboundHandler接口继承。
ChannelInboundHandler中定义了各个回调事件的回调方法,由用户进行具体实现。
ChannelOutboundHandler中定义了方法进行Channel内部IO操作(Channel发起bind/connect/close操作,Channel监听OP_READ,Channel写IO数据...),供用户在回调方法中使用。
ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter为接口的默认实现类(其实没干什么事),用户通过继承这两个类来实现自己的业务处理逻辑。
二、Unsafe的作用
1、概述
Unsafe是Channel的内部类,一个Channel对应一个Unsafe。
Unsafe用于处理Channel对应网络IO的底层操作。ChannelHandler处理回调事件时产生的相关网络IO操作最终也会委托给Unsafe执行。
2、类继承结构
Unsafe接口中定义了socket相关操作,包括SocketAddress获取、selector注册、网卡端口绑定、socket建连与断连、socket写数据。这些操作都和jdk底层socket相关。
NioUnsafe在Unsafe基础上增加了几个操作,包括访问jdk的SelectableChannel、socket读数据等。
NioByteUnsafe实现了与socket连接的字节数据读取相关的操作。
NioMessageUnsafe实现了与新连接建立相关的操作。
3、读/写操作实现浅述
三、事件的分类及处理
1、Inbound事件
用于描述因外部事件导致的Channel状态变更。
ChannelInboundHandler中定义了各个Inbound事件的回调方法。
2、Outbound事件
用于定义Channel能够提供的IO操作。
ChannelOutboundHandler中定义了Channel提供的进行IO操作的方法。
3、发起并处理inbound事件
ChannelInboundInvoker接口定义了inbound事件的发起方法fireXXX(),用于发起一个inbound事件。
ChannelPipeline接口继承了ChannelInboundInvoker,其实现类DefaultChannelPipeline因此具有发起inbound事件的功能。--> 实际通过调用AbstractChannelHandlerContext的静态invokeXXX()方法,产生并从pipeline的head节点开始接收事件。
ChannelHandlerContext接口继承了ChannelInboundInvoker,其实现类AbstractChannelhandlerContext因此具有发起inbound事件的功能。 --> 实际通过调用AbstractChannelHandlerContext的静态invokeXXX()方法,从当前节点的下一个节点开始接收事件。
AbstractChannelHandlerContext类中定义了静态invokeXXX(next)方法,用于在指定节点上开始接收inbound事件,并保证接收过程在EventLoop线程中执行。
AbstractChannelHandlerContext类中定义了invokeXXX()方法,用于实际接收inbound事件,并执行其ChannelHandler实现的事件回调方法。
对于inbound事件,因为需要进行业务逻辑处理,因此pipeline的head节点会执行fireXXX()方法将事件透传给后面的用户自己实现inbound处理节点,由用户自己实现的ChannelHandler接收事件并回调执行业务逻辑。
fireChannelActive、fireChannelRead、fireChannelReadComplete等inbound事件的处理过程可以自己跟代码看一下。(head节点对channelRead事件向后透传。对channelActive和channelReadComplete事件,向后传递并产生了Channel.read()的outbound事件)
4、发起并处理outbound事件
ChannelOutboundInvoker接口定义了outbound事件的产生方法(register,bind,connect,read,write,flush...),用于描述Channel能够提供的IO操作。
Channel接口继承了ChannelOutboundInvoker,其实现类AbstractChannel因此具有执行outbound事件的功能。--> 实际通过调用ChannelPipeline的对应方法实现。
ChannelPipeline接口继承了ChannelOutboundInvoker,其实现类DefaultChannelPipeline因此具有执行outbound事件的功能。--> 实际通过调用pipeline的tail节点的对应方法实现。
ChannelHandlerContext接口继承了ChannelOutboundInvoker,其实现类AbstractChannelhandlerContext因此具有执行outbound事件的功能。--> 实际从当前节点的前一个outbound处理节点开始接收事件,调用其invokeXXX()方法,并保证接收过程在EventLoop线程中执行。
AbstractChannelHandlerContext类中定义了invokeXXX()方法,用于实际接收outbound事件,并执行其ChannelHandler实现的事件执行方法。
对于outbound事件,因为和IO操作相关,最后会由pipeline中的head节点接收处理。head节点实现了ChannelHandler的事件执行方法,将实际的执行操作委托给Unsafe进行。
bind、connect、read、writeAndFlush等outbound事件的处理过程可以自己跟代码看一下。(最终由head节点委托给Unsafe类执行相关IO操作)
5、head、tail节点的作用
head节点:
head节点既是inBound处理节点,又是outBound处理节点。
head节点作为pipeline的头结点开始接收并传递inbound事件。并作为pipeline的最后一环最终接收处理outbound事件(委托Unsafe进行outbound事件的相关IO操作)。
tail节点:
tail节点是inBound处理节点。
tail节点作为pipeline的第一环传递outbound事件,其实就是将outbound事件透传到前一个outbound处理节点。并作为pipeline的最后一环最终接收inbound事件,大部分左右是终止inbound事件的传播。
tail节点的exceptionCaught方法:若最终在用户自定义的处理节点没有捕获处理异常,则在tail节点捕获异常打印警告日志。
tail节点的channelRead方法:若Channel读入的ByteBuf在流经pipeline过程中没有被消费掉,最终流入了tail节点,则将该ByteBuf丢弃回收并打印警告日志。
四、pipeline中节点的添加和删除
1、向pipeline中添加ChannelHandler的过程
从DefaultChannelPipeline.addLast(ChannelHandler... handlers)这行代码开始跟吧
细节不想写了,列一下大致流程:
1)checkMultiplicity(handler); --> 检查要添加的handler是否被重复添加
2)newCtx= newContext(group,filterName(name,handler),handler); --> 创建节点
3)addLast0(newCtx); --> 添加节点到pipeline的末尾(tail之前)
4)callHandlerAdded0(newCtx); --> 回调新建节点的handlerAdded(ctx)方法
2、将ChannelHandler从pipeline中移除的过程
从DefaultChannelPipeline.remove(ChannelHandler handler)这行代码开始跟吧
细节不想写了,列一下大致流程:
1)getContextOrDie(handler) --> 遍历pipeline链表找到待删除的接待对象
2)remove0(ctx) --> 将节点从pipeline链表中移除
3)callHandlerRemoved0(ctx) --> 回调移除节点的handlerRemoved(ctx)方法。