前言
本文只是简单实现了一次RPC调用示例,以理解其调用原理。一些主流RPC框架的其他功能并没有实现。(如服务自动注册与发现,流控,动态配置等)
PRC调用核心
像调用本地代码一样调用远程服务。
调用方只需调用服务方所提供的接口,通过Java动态代理,代理方法内,与服务方进行网络交互,得到服务方返回结果。基于上述,调用方只需依赖服务方所提供的接口。在使用时的感觉就像是,调用了本地代码一样。其实是用代理模式屏蔽了底层的网络交互。
简单的RPC调用所涉及的底层技术
Java动态代理
Java反射
Java序列化
NIO网络模型(Netty)
Netty是什么
- Netty 是 JBoss 公司用 Java 写的一个 Jar 包(库),目的是快速开发高性能、高可靠性的网络服务器和客户端程序
- Netty 提供异步、无阻塞、事件驱动的网络应用程序框架和工具
- Netty 是目前公认的网络编程最好的框架,官网地址:http://netty.io/
- GitHub 托管地址:https://github.com/netty/netty
- Netty 底层封装的也是 Java 的NIO,所以也叫NIO框架,常用于开发分布式系统
推荐该博主的关于netty的文章:
https://blog.csdn.net/wangmx1993328/article/details/83035760
动手实现
调用时序图
我们先来看一次PRC调用的时序图:
- Client:服务调用方
- Proxy : 调用方动态代理组件
- Netty_C:调用方Netty客户端
- 注册中心:服务自动注册与发现,可以是ZooKeeper
- Netty_S : 提供方Netty服务端
- Server: 服务提供方
注:上图红色虚线框中的功能,本示例没有涵盖。本示例通过API方式配置,直连服务节点。
类图
- MyRpcServiceContainer : 调用方入口,主要用于获取代理服务,存储服务节点的信息。
- MyRpcServiceGroup : 服务节点的集合,并提供负载均衡策略(未实现)。
- MyRpcServiceNode : 单个服务节点的信息。
- MyRpcClientProxy : 调用方动态代理实现类。
- MyRpcClient : Netty客户端,对连接信息进行配置,如序列化反序列化Handler和异步处理返回结果的Handler。
- MyRpcClientHandler : Netty客户端异步处理的Handler。主要用于发送请求信息等。
- MyRpcRequest : 请求对象封装,包含请求接口,请求方法,请求参数等。
- MyRpcResponse : 请求结果封装,包含方法返回结果。
- MyRpcServer : 提供方入口,主要用于暴露服务。
- MyRpcServerConfig : 提供方服务的集合,以及一些配置信息。
- MyRpcServiceImplProxy : 提供方服务代理,代理服务的方法具体实现,并提供流控等功能。
- MyRpcFlowControl : 流控计数器,针对接口、方法维度,提供流控计数功能。
- MyRpcServerHandler : Netty服务端异步处理的Handler。主要用于发送执行结果等。
代码讲解
TODO
测试
服务提供方
public class MyRpcServerTest {
@Test
public void testName() {
MyRpcServer myRpcServer = new MyRpcServer();
MyRpcServerConfig config = new MyRpcServerConfig();
config.setPort(8888);
//注册服务,此处为api方式
MyRpcServiceImplProxy implProxy = new MyRpcServiceImplProxy(DemoRpcService.class);
config.addService(DemoPrcInterface.class, implProxy);
myRpcServer.config(config).start();
}
}
十一月 13, 2019 5:24:09 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xea1be79c] REGISTERED
十一月 13, 2019 5:24:09 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xea1be79c] BIND: 0.0.0.0/0.0.0.0:8888
十一月 13, 2019 5:24:09 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] ACTIVE
main,服务器开始监听端口,等待客户端连接.........
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xac188c6d, L:/127.0.0.1:8888 - R:/127.0.0.1:61210]
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x46558bc0, L:/127.0.0.1:8888 - R:/127.0.0.1:61208]
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x7b6f1d71, L:/127.0.0.1:8888 - R:/127.0.0.1:61209]
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
服务调用方
public class MyRpcServiceContainerTest {
private CountDownLatch countDownLatch = new CountDownLatch(3);
@Test
public void testName() throws InterruptedException {
Map<Class, MyRpcServiceGroup> serviceInfoMap = new HashMap<Class, MyRpcServiceGroup>();
MyRpcServiceGroup serviceInfo = new MyRpcServiceGroup();
serviceInfo.addNode(new MyRpcServiceNode("127.0.0.1",8888));
serviceInfoMap.put(DemoPrcInterface.class, serviceInfo);
MyRpcServiceContainer container = new MyRpcServiceContainer(serviceInfoMap);
//上面为服务发现,此处为api方式
final DemoPrcInterface intf = container.getService(DemoPrcInterface.class);
//System.out.println(intf.helloWithName("zxm"));
final DemoReq req = new DemoReq();
req.setUuid("123456");
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
public void run() {
try {
System.out.println(intf.callRemoteService(req,"yyxl").getUuid());
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
}
}).start();
}
countDownLatch.await();
}
}
Thread-2,客户端发起异步连接..........
Thread-1,客户端发起异步连接..........
Thread-0,客户端发起异步连接..........
nioEventLoopGroup-3-1,Server return Message:com.yyxl.myrpc.service.dto.MyRpcResponse@6585879b
nioEventLoopGroup-4-1,Server return Message:com.yyxl.myrpc.service.dto.MyRpcResponse@74997d68
java.lang.reflect.UndeclaredThrowableException
at com.sun.proxy.$Proxy3.callRemoteService(Unknown Source)
at com.yyxl.myrpc.service.consumer.MyRpcServiceContainerTest$1.run(MyRpcServiceContainerTest.java:32)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: 流控
at com.yyxl.myrpc.service.provider.MyRpcServiceImplProxy.call(MyRpcServiceImplProxy.java:57)
at com.yyxl.myrpc.service.provider.MyRpcServerHandler.channelRead(MyRpcServerHandler.java:26)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
java.lang.reflect.UndeclaredThrowableException
at com.sun.proxy.$Proxy3.callRemoteService(Unknown Source)
at com.yyxl.myrpc.service.consumer.MyRpcServiceContainerTest$1.run(MyRpcServiceContainerTest.java:32)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: 流控
at com.yyxl.myrpc.service.provider.MyRpcServiceImplProxy.call(MyRpcServiceImplProxy.java:57)
at com.yyxl.myrpc.service.provider.MyRpcServerHandler.channelRead(MyRpcServerHandler.java:26)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
nioEventLoopGroup-2-1,Server return Message:com.yyxl.myrpc.service.dto.MyRpcResponse@686c50af
123456##yyxl
我们可以看到,默认流控阀值为1,前2个请求直接返回流控异常,第3个返回正常调用结果。
参考
彻底理解Netty,这一篇文章就够了
Netty 入门示例详解
Netty之传输POJO(使用Java自带的序列化方式)
动态代理
【Java 笔记】Java 反射相关整理