手写RPC框架
1、手写一个RPC框架,看看100个线程同时调用效果如何
2、手写RPC框架(2)-引入zookeeper做服务治理
3、手写RPC框架(3)-引入Hessian序列化工具
4、手写RPC框架(4)-重写服务治理,开启1000个线程看看netty的执行调用情况
5、手写RPC框架(5)-Netty入门了解和实践
在上一篇 手写RPC框架(5)-Netty入门了解和实践 已经简单介绍了Netty怎么用的,这次通过实际代码编写中中遇到的各种问题为切入点介绍使用情况。具体内容如下:
- 客户端还未等到netty连接上就发起请求(肯定失败)
- 序列化和反序列化中,netty的粘包、拆包处理器错误使用遇到的问题
- 客户端同步阻塞等待结果返回
- 客户端对channel的管理
- 负载均衡的修改
目录结构
开启1000个线程执行的效果图
1、客户端对长连接的管理
每当客户端连接上一个有效的服务端之后,就保持这个channel长连接不断,后续两台机器的数据传递就可以通过这个channel就行,而不需要每次都进行套接字连接一般还需进行3次握手操作,这样可以极大的提高效率。
在这个RPC代码中,在服务端有多台机器的情况下,客户端就需要连接多台机器,对这个channel的管理就非常必要,再合适的添加管理,在channel无效的时候则进行移除操作。
ClientHandler类部分代码
/**
* ip ==> channel
*/
private Map<String, Channel> ipMap = new ConcurrentHashMap<>();
/**
* interface ==> list(ipA, ipB, ...)
*/
private Map<String, List<String>> interfaceMap = new ConcurrentHashMap<>();
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("断开 channel:{}", ctx.channel());
// 需要移除操作
removeChannel(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().close();
removeChannel(ctx.channel());
}
public void addChannel(Channel channel, String interfaceName, String ip) {
ipMap.put(ip, channel);
List<String> ipList = interfaceMap.get(interfaceName);
if (ipList == null) {
ipList = new ArrayList<>();
interfaceMap.put(interfaceName, ipList);
}
ipList.add(ip);
}
public void removeChannel(Channel channel) {
Iterator<Map.Entry<String, Channel>> iterator = ipMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Channel> entry = iterator.next();
if (entry.getValue() == channel) {
iterator.remove();
}
}
}
在上面的代码中,有两个map,一个是ip->channel,另一个是接口名称->ip列表,因为一个channel是关联一个服务机器和一个客户端机器的,此外在负载均衡中也是从众多有效channel选择一个使用,故使用了这两个map存储channel。
Netty断开channel时需要及时移除掉,防止客户端使用无效的channel而出现错误,而上面的addChannel在哪里被调用着呢?那肯定是在Netty进行链接操作完成之后进行的。
ClientConnection类connection方法部分代码
CountDownLatch countDownLatch = new CountDownLatch(1);
ChannelFuture channelFuture = bootstrap.connect(address);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
// channel连接完成
clientHandler.addChannel(channelFuture.channel(), interfaceName, ip);
// countdownlatch 计时器锁减1操作
countDownLatch.countDown();
}
}
});
这代码看上去会有些莫名其妙,这主要是截取了关键的代码块,在Netty通过bootstrap.connect(address)
异步连接后,会立即返回一个Future对象,通过添加监听器的做法获取到连接成功后的channel,后面再添加到channel管理容器中。可能有人需要问了,Netty的handler除了channelInactive事件之外,还有channelActive激活成功,同样可以异步的获取到有效的channel,为什么不选择呢?
答案也很简单,channelActive调用时,无法知道当前连接的业务接口名称等信息,故只能是在connection方法中采取这种方案获取channel和业务接口的关联关系了
更新:写代码期间并未想到使用Netty的Option透传业务数据,类似于ctx.channel().config().getOptions()就可以获取在boostrap链接时设置的数据,CountDownLatch也就不需要了,也不需要进行异步的调用
2、对粘包、拆包的处理
起初使用了hessian序列化工具,但是在调试中一直出现各种各样数据问题(就是拆包和粘包问题,现在已彻底解决),为了方便调试,就放弃了序列化,直接使用了POJO->toString()
的操作。
bootstrap.group(work).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(clientHandler);
}
});
使用StringDecoder和LineBasedFrameDecoder搭配使用,因为LineBasedFrameDecoder是通过\r\n进行切割的,那么转为String格式的时候也需要再加上分隔符。如下代码块
channel.writeAndFlush(JSON.toJSONString(request) + System.getProperty("line.separator"))
后面再使用hessian序列化,如下设置的处理器,一直出现切割字符出错
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new RpcEncoder(RpcRequest.class, serializeProtocol))
.addLast(new RpcDecoder(RpcResponse.class, serializeProtocol))
.addLast(clientHandler);
使用这段代码的时候,因为考虑到使用LineBasedFrameDecoder进行字符处理,在序列完之后添加了System.getProperty("line.separator"))
对应的字节数据,经过调试才发现为什么会出现切割的问题,看下图
往bytebuf添加完数据字节后,又添加了System.getProperty("line.separator"))数据字节,可以看到bytebuf中的字节数据明显超过65的(测试中实际是255),继续看下图在服务端进行数据获取
从bytebuf中获取有效数据长度却只有65,再回顾上一幅图圈中的内容,可以发现经过序列化的字节数就中在索引值为65的地方是0,而System.getProperty("line.separator"))的字节恰好也是0,所以经过序列化之后的数据实际上是:
XXXXXXXXXXX0XXXXXXXXXX0
字节中包含了多个可切割的字节0,原本应该切最后一个,实际上切成了前面一个,使得一个完整的数据被切割成不完整的段,自然导致后面的反序列化失败。
/**
* A decoder that splits the received {@link ByteBuf}s on line endings.
* <p>
* Both {@code "\n"} and {@code "\r\n"} are handled.
* <p>
* The byte stream is expected to be in UTF-8 character encoding or ASCII. The current implementation
* uses direct {@code byte} to {@code char} cast and then compares that {@code char} to a few low range
* ASCII characters like {@code '\n'} or {@code '\r'}. UTF-8 is not using low range [0..0x7F]
* byte values for multibyte codepoint representations therefore fully supported by this implementation.
* <p>
* For a more general delimiter-based decoder, see {@link DelimiterBasedFrameDecoder}.
*/
public class LineBasedFrameDecoder extends ByteToMessageDecoder {
再看LineBasedFrameDecoder类的注释说明,该类适用在UTF-8字符或者ASCII码,而我们序列化生成的字节数据肯定不在其规定的范围内,就导致了数据错误的情况,这也是个误用的典型例子,知其然而不知其所以然。
后面改成.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2))
,使用长度+内容的样式的处理器才顺利解决字符粘包问题。
3、CountDownLatch的使用场景
在调试中屡次发现客户端启动后还未成功链接到服务端就进行服务调用的问题,而且大多出现在客户端启动之后短时间内的调用导致,如下图,此时并没有有效的channel使用,自然就出现错误了,那么如何解决呢?
在初次连接使用阻塞模式,必须等到连接完成才认为启动的准备工作完成才可以进行服务调用的操作。
public void connection(String interfaceName, String ip, boolean sync) {
Channel channel = clientHandler.getChannel(ip);
if (channel != null) {
logger.warn("已经连接好了, IP:{}, interface:{}, channel:{}", ip, interfaceName, channel);
return;
}
InetSocketAddress address = CommonUtils.parseIp(ip);
SerializeProtocol serializeProtocol = rpcClient.getSerializeProtocol();
EventLoopGroup work = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(work).channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
xxx
});
CountDownLatch countDownLatch = new CountDownLatch(1);
// 使用了一个计时器锁,阻塞操作
ChannelFuture channelFuture = bootstrap.connect(address);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
clientHandler.addChannel(channelFuture.channel(), interfaceName, ip);
countDownLatch.countDown();
// 释放操作
}
}
});
if (sync) {
// 如果传入的sync是true
try {
countDownLatch.await();
} catch (InterruptedException e) {
}
}
}
使用了一个计时器锁CountDownLatch,在最后面如果传入的sync为true则会进行阻塞等到,直到在完成链接后进行countDownLatch.countDown()
操作,从而释放锁释,继续执行,如果sync是false,就是非阻塞的模式,通过这sync参数兼容两种模式,在后面客户端阻塞等到获取数据的时候也采用了类似的方案,如下代码
RpcResponseFuture类
public class RpcResponseFuture {
private RpcResponse response;
private CountDownLatch countDownLatch;
public RpcResponseFuture() {
this.countDownLatch = new CountDownLatch(1);
}
public RpcResponse getResponse() throws InterruptedException {
// 阻塞在这里,直到得到了数据
this.countDownLatch.await();
return this.response;
}
public void setResponse(RpcResponse response) {
this.response = response;
this.countDownLatch.countDown();
}
}
意图也很明显,直到setResponse方法被调用完成后,getResponse的阻塞方法才被放行
4、负载均衡完善
负载均衡此前是针对IP进行一个选择操作,此方法效率低下而且存在不可用的情况,在上面完善了客户端长连接管理方法的工作后,负载均衡也由对IP的选择修改成对channel的选择。如下代码块,基本无改变,就是把String改成了Channel而已。
public abstract class AbstractLoadBalance implements LoadBalance {
@Override
public Channel balance(List<Channel> channelList) {
if (channelList == null || channelList.isEmpty()) {
return null;
}
if (channelList.size() == 1) {
return channelList.get(0);
}
return doLoad(channelList);
}
abstract Channel doLoad(List<Channel> addressList);
}
那么为什么使用IP是,效率低下而且不可用呢?
- 1、每次获取IP节点后都需要进行三次握手的链接操作,效率低下,无法发挥长连接的功效
- 2、服务端的IP数据存在在zk节点的,容易出现IP数据变更了,但是客户端还未及时感知,依旧尝试连接,继而出现错误,而channel则可以很高效的感知到是否可用
最后贴一个负载均衡的场景效果图,服务端1启动后,客户端启动,成功的完成一定的任务后,服务端2启动,这时候客户端的负载均衡就会生效,会随机的从服务端1和服务端2选择一个进行调用操作
public static void main(String[] args) {
RpcClient rpcClient = new RpcClient();
rpcClient.subscribe(Calculate.class);
rpcClient.start();
Calculate<Integer> calculateProxy = rpcClient.getInstance(Calculate.class);
for(int i=0;i<10;i++) {
new Thread(() -> { test(calculateProxy);}).start();
}
try {
Thread.sleep(1000 * 20);
} catch (InterruptedException e) {
e.printStackTrace();
}
for(int i=0;i<20;i++) {
new Thread(() -> { test(calculateProxy); }).start();
}
}
private static void test(Calculate<Integer> calculateProxy) {
int s1 = new Random().nextInt(100);
int s2 = new Random().nextInt(100);
long start = System.currentTimeMillis();
int s3 = calculateProxy.add(s1, s2);
logger.info("[" + Thread.currentThread().getName() + "]a: " + s1 + ", b:" + s2 + ", c=" + s3 + ", 耗时:" + (System.currentTimeMillis() - start));
}
如圈住的图显示,在执行完第一批10个任务后,新启动了一个服务端的机器,客户端通过zk及时感知到该数据变化后进行连接操作,再紧接着第二批20个任务执行中,就成功的使用了负载均衡的逻辑,从10001和10002两个服务端随机调用,如下图服务端1和服务端2的请求记录。
13.17分执行完成后,直到第二波请求在13.18分来到,顺利执行
13.17.52启动的新服务的,也在13.18顺利接收到客户端请求,通过负载均衡就可以实现灵活的上下线处理。
至此本篇学习笔记就算是完成了,还剩下了心跳检测、断开重连等功能,由于现在设置的请求体格式并不是很完善,故下次改写请求头内容,以便于更好的支持完善心跳检测和断开重连等操作,再后面netty完成就剩下结合spring了。在实际代码编写中由于某些点掌握理解不透,也是经过大量的测试,踩坑不少才勉强做到这样的,代码还有诸多问题,继续学习和完善~
如代码存在的问题欢迎提出~