在上一篇文章中,讲解了反射 与动态代理基本概念,没有看过点击此传送门。
接下来看看,动态代理在RPC中是如何使用的。在讲解过程中会去除掉一些无关的代码,如果读者相应看全部的代码,可以去开源项目的仓库中下载相关源码,进一步的专研,以下也会给出链接。
实例
1.第一个实例取自黄勇的轻量级分布式 RPC 框架 ,由于实现中通信框架使用了Netty,所以在分析中会有部分Netty代码的信息,不过不用担心,即使不懂Netty,讲解的过程中会尽量避免,并会突出反射与动态代理在其中的作用。
在rpc-simple-client中HelloClient.Class有如下代码
HelloService helloService = rpcProxy.create(HelloService.class);
String result = helloService.hello("World");
System.out.println(result);
这个代码做的是什么事呢?通过一个代理生成helloService对象,执行hello方法。
在我们印象中执行方法,最终都会执行的是接口中实现的方法。那事实是这样吗?看下面的分析。
在rpcProxy代码如下:
public <T> T create(final Class<?> interfaceClass, final String serviceVersion) {
// 创建动态代理对象
return (T) Proxy.newProxyInstance(
interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 创建 RPC 请求对象并设置请求属性
RpcRequest request = new RpcRequest();
request.setRequestId(UUID.randomUUID().toString());
request.setInterfaceName(method.getDeclaringClass().getName());
request.setServiceVersion(serviceVersion);
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
// 获取 RPC 服务地址
if (serviceDiscovery != null) {
String serviceName = interfaceClass.getName();
if (StringUtil.isNotEmpty(serviceVersion)) {
serviceName += "-" + serviceVersion;
}
serviceAddress = serviceDiscovery.discover(serviceName);
LOGGER.debug("discover service: {} => {}", serviceName, serviceAddress);
}
if (StringUtil.isEmpty(serviceAddress)) {
throw new RuntimeException("server address is empty");
}
// 从 RPC 服务地址中解析主机名与端口号
String[] array = StringUtil.split(serviceAddress, ":");
String host = array[0];
int port = Integer.parseInt(array[1]);
// 创建 RPC 客户端对象并发送 RPC 请求
RpcClient client = new RpcClient(host, port);
long time = System.currentTimeMillis();
RpcResponse response = client.send(request);
LOGGER.debug("time: {}ms", System.currentTimeMillis() - time);
if (response == null) {
throw new RuntimeException("response is null");
}
// 返回 RPC 响应结果
if (response.hasException()) {
throw response.getException();
} else {
return response.getResult();
}
}
}
);
}
从上面的代码可以看出经过了代理,执行hello方法,其实是发起一个请求。既然是一个请求,就是要涉及Client端与Server端,上面其实是一个Clent端代码。
那我们看看Server做了什么,去掉一个和本文所介绍不相关的代码,在RpcServerHandler中可以看核心代码如下
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
Object result = handle(request);
response.setResult(result);
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); //返回,异步关闭连接
}
其中hanlde中重要实现如下
// 获取反射调用所需的参数,这些都是Client端传输给我们的。
Class<?> serviceClass = serviceBean.getClass();
String methodName = request.getMethodName();
Class<?>[] parameterTypes = request.getParameterTypes();
Object[] parameters = request.getParameters();
// 使用 CGLib 执行反射调用
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
return serviceFastMethod.invoke(serviceBean, parameters);
2.第二个实例取自xxl-job分布式任务调度平台
说明:此开源项目的,RPC通信是用Jetty来实现的。
在xxl-job-admin中XxlJobTrigger.Class的runExecutor有如下
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address); //根据地址拿到执行器
runResult = executorBiz.run(triggerParam);
做了很简单的是取出执行器,触发执行。但是进入getExecutorBiz方法你会发现如下
executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address,
accessToken).getObject();
executorBizRepository.put(address, executorBiz);
return executorBiz;
是不是很熟悉,没错,动态代理,看是NetComClientProxy的实现:
在结构上是不是和第一个实例中的rpcProxy代码,很相似呢。
new NetComClientProxy(ExecutorBiz.class, address, accessToken).getObject();做了什么呢
public Object getObject() throws Exception {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// request封装
RpcRequest request = new RpcRequest();
request.setServerAddress(serverAddress);
request.setCreateMillisTime(System.currentTimeMillis());
request.setAccessToken(accessToken);
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
// send发送
RpcResponse response = client.send(request);
// valid response
if (response == null) {
logger.error(">>>>>>>>>>> xxl-rpc netty response not found.");
throw new Exception(">>>>>>>>>>> xxl-rpc netty response not found.");
}
if (response.isError()) {
throw new RuntimeException(response.getError());
} else {
return response.getResult();
}
}
});
}
依旧是封装了一个RpcRequest ,发送请求。所以在 runResult = executorBiz.run(triggerParam)
其实是在发送一个请求。上面是Client端代码,照旧,接着看Server代码,你会发现还是似成相识。去掉与本文无关的代码,得到如下:
在xxl-job-core中JettyServerHandler.Class有
RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
点击进入:
public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
Class<?> serviceClass = serviceBean.getClass(); //类名
String methodName = request.getMethodName(); //方法名run
Class<?>[] parameterTypes = request.getParameterTypes(); //参数类型
Object[] parameters = request.getParameters(); //具体参数
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
// 使用 CGLib 执行反射调用
Object result = serviceFastMethod.invoke(serviceBean, parameters);
response.setResult(result);
} catch (Throwable t) {
t.printStackTrace();
response.setError(t.getMessage());
}
return response;
}
根据反射生成具体的类,来执行相关的方法,达到想要的目的。
上面两个实例的过程可以用下图概括:
由于本人水平有限,有什么问题可以评论,喜欢的可以关注。