这是《手写RPC框架,我学会了什么?》系列的第08篇
ChannelHandler,无论之于Server端还是Client端都是重头戏。
上一篇 中处理客户端请求的ServerHandler继承自ChannelInboundHandlerAdapter,处理服务端响应的ClientHandler继承自SimpleChannelInboundHandler。甚至连 06篇 中的编码器和解码器也属于ChannelHandler的实现类。
可以把Channel想象成一条接受数据的通道,当数据进来之后,就会找到自己的ChannelPipeline,然后根据注册(通过addLast完成)的顺序,依次调用ChannelHandler的各个实现类来完成业务逻辑。当然,InBound*的顺序会优先于OutBound*。在不同的Handler之间,数据通过ChannelHandlerContext传递。
记住下面这张示意图,但也记住之前的描述并不准确
实际上,每个Handler的衔接,并不是自动完成的,而是通过上一个Handler 显式 的调用下一个Handler来完成。只不过我们继承某些子类的时候(比如MessageToByteEncoder)Netty帮我们完成了这个动作。
在ServerHandler中。我们重写了3个方法:channelRead()、channelReadComplete()和exceptionCaught()。分别代表当读取到消息时,读取到消息后,及异常时需要执行的代码。让我们通过一个FooHandler来验证一下。
/**
* @author nathan
* @date 2019/3/22
*/
public class FooHandler extends ChannelInboundHandlerAdapter {
/**
* 编号
*/
private int i;
public FooHandler(int i) {
this.i = i;
}
private String extractMessage(ByteBuf buf) {
int len = buf.readableBytes();
// 初始化1个byte数组
byte[] byteArray= new byte[len];
// 将buf中的bytes拷贝到byte数组中
buf.readBytes(byteArray);
// 将byte数组转为字符串,并去掉换行符
String message = new String(byteArray).replaceAll("\n|\r", "");
// 将byte数组回写到buf中
buf.writeBytes(byteArray);
return message;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("handler" + i + ", read... " + extractMessage((ByteBuf) msg));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler" + i + ", complete...");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("handler" + i + ", exception...");
}
}
每个FooHandler包含一个专属的编号,在接收到消息时,会打印出消息的内容。Server的代码片段如下:
// 其他代码片段...
ch.pipeline()
.addLast(new FooHandler(1))
.addLast(new FooHandler(2))
.addLast(new FooHandler(3));
// 其他代码片段...
启动Server后,通过telnet连接到服务端,输入Hello, netty.
。
在Server端能看到如下输出:
handler1, read...Hello, netty.
handler1, complete...
并没有任何跟handler2和3相关的字样。
稍微修改一下代码,显示的调用fireChannelRead()方法。
// 其他代码片段...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("handler" + i + ", read... " + extractMessage((ByteBuf) msg));
ctx.fireChannelRead(msg);
}
// 其他代码片段...
重新测试后,输出结果如下:
handler1, read... Hello, netty.
handler2, read... Hello, netty.
handler3, read... Hello, netty.
handler1, complete...
上面的实验说明:
- 各个ChannelHandler实现类的执行顺序通过pipeline编排。
- 部分ChannelHandler实现类需要手动通过fire*()方法触发下一个实现类执行。
- channelRead和channelReadComplete(以及exceptionCaught)是3个完全独立的事件。
关于最后一点,下面这个例子会更加明显:
// 其他代码片段...
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("handler" + i + ", read... " + extractMessage((ByteBuf) msg));
throw new RuntimeException("");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("handler" + i + ", complete...");
ctx.fireChannelReadComplete();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("handler" + i + ", exception...");
}
// 其他代码片段...
输出结果:
handler1, read... Hello, netty.
handler1, exception...
handler1, complete...
handler2, complete...
handler3, complete...