手写RPC框架
1、手写一个RPC框架,看看100个线程同时调用效果如何
2、手写RPC框架(2)-引入zookeeper做服务治理
3、手写RPC框架(3)-引入Hessian序列化工具
4、手写RPC框架(4)-重写服务治理,开启1000个线程看看netty的执行调用情况
Netty是基于NIO的的服务框架,屏蔽了使用Java原生NIO网络模型的各种问题,对外提供灵活的Reactor模型配置,也提供了插拔式的Handler处理器,便于支持各种网络协议和特定业务等操作,也是异步事件驱动,使得性能能够更高。此前RPC中关于Netty的代码逻辑存在些问题,对Netty的一些概念也没有理解到位,所以这次就一起再学习Netty,先写一个demo有大致的了解和印象,随后通过问题介绍各个组件的功能和特点,其原因是什么。
- 粘包、拆包是什么情况,为什么会发生这种情况?
- pipeline 和 handler是什么关系?
- pipeline.addLast的顺序是如何执行的?
- handler中的各个fireXXX执行顺序是怎样的?
- 为什么server是2个EventLoopGroup,而client却只有1个EventLoopGroup?
Demo
运行效果如下图
服务端
public class Server {
public void run(int port) throws InterruptedException {
EventLoopGroup workGroup = new NioEventLoopGroup();
EventLoopGroup bossGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
}
});
ChannelFuture cf = serverBootstrap.bind(port).sync();
cf.channel().closeFuture().sync();
} finally {
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new Server().run(10002);
}
}
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered ...");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered ...");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive ...");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive ...");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("Client:[" + body + "]");
String cur = ("Hello, My name is jwfy".equalsIgnoreCase(body) ? "OK" : "ERROR") + System.getProperty("line.separator");
ctx.writeAndFlush(Unpooled.copiedBuffer(cur.getBytes()));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelReadComplete ...");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端
public class Client {
public void connection(String host, int port) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeClientHandler());
}
});
ChannelFuture cf = bootstrap.connect(host, port).sync();
cf.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
new Client().connection("127.0.0.1", 10002);
}
}
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered ...");
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered ...");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive ...");
for(int i = 0; i < 1; i++ ) {
ctx.writeAndFlush(Unpooled.copiedBuffer(("Hello, My name is jwfy" + System.getProperty("line.separator")).getBytes()));
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive ...");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("Server:[" + body + "]");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
EventLoopGroup
EventLoopGroup 是一种Reactor多线程模型的抽象,具体实现一般都是NioEventLoopGroup。而Reactor模型又有单线程、多线程、以及主从多线程模型,他们有什么区别呢?
单线程模型
1个NIO线程原则上可以负责所有IO相关的请求操作,通过acceptor接收客户端发生的TCP请求,当链接建立成功之后,通过Dispatch将对于的请求数据包装成bytebuf指派给相关的handler处理。但是这在某些场景下也不太合适。
- 一个NIO线程同时管理成百上千的客户端链接,会严重影响性能
- 当NIO线程的负载很高时,导致处理速度变慢,同时还可能因为某一请求影响整个NIO线程的工作,进而影响其他端口的处理请求。故有了多线程模型。
多线程模型
同样是一个NIO线程接收客户端的请求调用,当链接完成后请求会分配给一个NIO线程池,具体的消息序列化反序列化、数据处理等任务可有NIO线程池中的线程完成。
同样的基本情况下是没有问题的,但是多个客户端连接依旧可能出现性能问题,故有了主从多线程模型
主从多线程模型
主从情况就是从一个NIO线程变成了一个NIO线程池,可同时由多个NIO线程处理客户端的请求连接操作,减少因为性能不足导致的问题,这也是netty推荐的使用方法。
EventLoopGroup 则也是一个NIO线程池,即可用于客户端的TCP请连接求,也可用于数据的IO处理,所以在上述代码中观察发现服务端和客户端的EventLoopGroup个数不一样也是这个道理,服务端一个线程池用来接收客户端连接,另一个则用来进行读写IO操作。
粘包、拆包
众所周知,网络上的传输的都是字节流,从TCP/IP协议角度出发无法知道具体的业务数据组装情况,所以实际场景中一个请求可能被分批次传输,也有可能因为请求数据太少故打包多个请求统一传输
如上图,正常的情况是分别有D1和D2两个数据包发送到服务端,但是因为网络拥塞比较严重,滑动窗口自适应的缩小,使得1个缓冲区的大小无法装满整个请求体,就会出现拆包的情况;又例如请求体内容较少,无法填充完整缓冲区,那么就会等待多个请求把缓冲区填满再发送出去,就会出现粘包的情况,如下距离:
- D2和D1 同时发送到服务端,那么服务端则需要正确的进行拆分处理,否则反序列化会失败
- D1和D2 的一部分D2_1 同时发送到服务端,服务端除了需要把D1拆出来,还需要等待D2_2的到来才能开始处理D2数据
必须首先处理好拆包和粘包问题,才能保证收到正常的完整的消息,而netty则帮我们解决了大部分问题了,例如根据长度拆分(FixedLengthFrameDecoder),根据换行符拆分(LineBasedFrameDecoder),又或者分割符拆分(DelimiterBasedFrameDecoder),只是在本Demo中使用的是换行符切分的LineBasedFrameDecoder
ChannelPipeline 和 ChannelHandler
ChannelPipeline 是一个拥有头(Head)和尾(Tail)的双向链式容器,可自由添加不同的handler处理器以满足不同的业务需求。同时因为有从外界读取数据和发送数据两种场景,所以有inbound和outbound两种情况。
ChannelHandler 则是具体的处理器,可通过addLast方式添加到pipeline管道链路上,如粘包说的LineBasedFrameDecoder也是一种具体的handler处理器。demo中提到的添加自定义handler代码块如下所示
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
通过这种添加方式形成了下面的链路
HEAD TAIL
LineBasedFrameDecoder -> StringDecoder -> TimeServerHandler
- netty读取的规则是从head开始的,先进行拆分、粘包的处理,再反序列化,后面交由具体的业务处理器,是inbound类处理器
- netty写出的规则是从tail开始的,先进行数据的序列号,再发送出去,是outbound类处理器
这整个链路是比较清晰完整的,如果把StringDecoder和LineBasedFrameDecoder的处理器顺序换一下,则会发现出现错误,如下图
圈住的地方换行符就是我们代码中添加的 System.getProperty("line.separator")
换行导致,因为这个就1次调用,所以会发现只进行了字符串的转换,并没有进行拆包处理,再次把请求的数据量加大些,再测试看看
会发现服务端接收到的数据全部错误了,没有一个正确,切记不要把handler处理器顺序搞错,如下图是netty源码中关于顺序的说明情况。
Handler 生命周期
handler在处理的时候是有着一定的顺序,例如服务端先接收请求的注册,等到TCP/IP三次握手完成后,相当于channel激活完成,开始接受客户端正常的请求调用,然后返回响应结果等,客户端关闭后,服务端也需要进行取消激活,关闭注册的操作,以放弃该channel的管理操作。通过对其各个步骤的生命周期的管理,可以实现自定义的各种管理和控制。fireXXX又被包装成类似于channelRegistered的名字,如下图的调用过程
如本demo的运行结果也可以很明显的看出其执行链路。
结束
到此netty的学习就结束了,并没有介绍的太深入,也只是把常用的组件知识梳理了一遍,以便于我们在使用netty的时候注意到这些问题,以发挥netty的最大功效,文中很多内容都参考自《Netty权威指南》,大家如果有兴趣的话可以自行阅读学习和加强理解,下一期将会进行RPC代码中的netty部门的改造。
如代码存在的问题欢迎提出~