- 我的技术博客:https://nezha.github.io,https://nezhaxiaozi.coding.me
- 我的简书地址:https://www.jianshu.com/u/a5153fbb0434
前言
在如今微服务盛行的时代,服务拆分越来越普遍,这时候就涉及远程调用的概念了,不同的服务提供者和消费者可能在不同的网络节点中,而现在最主要的微服务框架都是基于两大类:1.基于HTTP协议的Spring Cloud框架。2.基于TCP协议的Dubbo框架。
今天咱们主要来看看基于Dubbo来实现一个RPC框架有哪些工作。
下图是最简易版本的Dubbo架构,服务的提供方和服务的调用方都是独立的微服务,他们之间的关系是通过网络通信进行调用的。然后所有的服务提供者都需要注册到Register注册中心去;消费者在消费的时候都会去注册中心发现所有的服务提供者,然后通过负载均衡算法来确定去哪一个服务提供者。
设计思路
在理解Dubbo之后,咱们设想一下咱们的RPC框架需要设计到哪些东西。
-
序列化工具
:由于需要网络通信,所以对象需要可以序列化和反序列化。常用的一般有Gson、Jackson等,我们咱们就要原生方法吧。 -
动态代理
:由于需要在消费者端执行接口,然后此时可以直接获取到服务提供端的具体实现类的结果,这就需要使用JDK的动态代理方法了。 -
网络通信
:在动态代理的实现过程中肯定是需要进行网络通信获取相应的服务结果,这里可以使用Java BIO中的socket或者像Dubbo那样使用Netty。 -
Java反射
:消费端在网络请求的时候肯定会将类、方法、参数类型、参数值
这些传过来的,在服务提供端可以通过反射机制直接执行实现类,然后返回结果。 -
服务注册中心
:当然了在大面积微服务的框架下,服务的提供者在哪些网络节点中肯定不是写死在代码中的,这时候就需要一个配置中心去读取所有的服务提供者。一般可以使用Zookeeper、Redis或者Nacos来作为服务注册与发现中心。在本文章中,为了方便起见咱们就直接写死地址了,没有使用注册中心。 -
负载均衡
:如上所说,服务的提供者可能会有好多,所以每次的请求需要去哪一个服务提供者中去调用也是不确定,所有的请求都去同一个提供者那势必会造成网络阻塞的,所以负载均衡算法也是很有必要的。但是同上,为了简化咱们的RPC框架,咱们不会去实现负载均衡的功能。
代码实现
主要涉及的子模块有:demo-model
, rpc-api
, rpc-provider
, rpc-consumer
demo-model
:用户User对象类;
rpc-api
:对外暴露的接口;
rpc-provider
:接口的实现类和RPC服务的提供方;
rpc-consumer
:RPC服务的调用方
1. 用户对象-User
package com.nezha.learn.demo;
import java.io.Serializable;
/**
* @Description: User用户对象 <br>
* @Date: 2018/11/23 12:23 PM <br>
* @Author: zhangyi <br>
* @Version: 1.0 <br>
*/
public class User implements Serializable {
private static final long serialVersionUID = 1L;
private String name;
private Integer age;
public User(){
}
public User(String username, Integer age) {
this.name = username;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
@Override
public String toString() {
return "User String is:"+name+",age:"+age;
}
}
2. 暴露的接口-IUserService
package com.nezha.learn.demo;
/**
* @Description: 暴露的接口 <br>
* @Date: 2019/1/4 2:29 PM <br>
* @Author: objcat <br>
* @Version: 1.0 <br>
*/
public interface IUserService {
User findById(Long id);
}
3. 接口的实现(服务的提供者)
- 接口的实现
package com.nezha.learn.demo;
/**
* @Description: 实现接口的类 <br>
* @Date: 2019/1/4 3:07 PM <br>
* @Author: objcat <br>
* @Version: 1.0 <br>
*/
public class UserServiceImpl implements IUserService {
@Override
public User findById(Long id) {
User user = new User();
user.setName("nezha");
user.setAge(123);
return user;
}
}
- 服务提供者
package com.nezha.learn.demo;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @Description: RPC服务提供者 <br>
* @Date: 2019/1/4 3:06 PM <br>
* @Author: objcat <br>
* @Version: 1.0 <br>
*/
public class Provider {
public static void main(String[] args) {
try {
//1.Socket绑定本地端口
ServerSocket serverSocket = new ServerSocket(8888);
//2.监听端口
while (true){
Socket socket = serverSocket.accept();
//1.接收所有的参数
ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
String apiClassName = inputStream.readUTF();
String methodName = inputStream.readUTF();
Class[] paramTypes = (Class[]) inputStream.readObject();
Object[] args4Method = (Object[]) inputStream.readObject();
Class clazz = null;
//2.服务注册,找到具体的实现类
if (apiClassName.equals(IUserService.class.getName())){
clazz = UserServiceImpl.class;
}
//3.执行UserServiceImpl的方法
Method method = clazz.getMethod(methodName,paramTypes);
Object invoke = method.invoke(clazz.newInstance(),args4Method);
//4.返回结果给客户端
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeObject(invoke);
outputStream.flush();
//5.关闭连接
outputStream.close();
inputStream.close();
socket.close();
}
}catch (Exception e){
e.printStackTrace();
}
}
}
4. 服务的调用者
package com.nezha.learn.demo;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
/**
* @Description: 消费者 <br>
* @Date: 2019/1/4 2:33 PM <br>
* @Author: objcat <br>
* @Version: 1.0 <br>
*/
public class Consumer {
public static void main(String[] args) {
//1.获取代理类
IUserService userService = (IUserService) rpc(IUserService.class);
//2.触发InvocationHandler,进行远程代理
User user = userService.findById(123L);
System.out.println(user);
}
public static Object rpc(Class clazz){
return Proxy.newProxyInstance(
clazz.getClassLoader(),
new Class[]{clazz},
(proxy, method, args) ->
new InvocationHandler() {
//代理执行方法,上面设置了代理的类
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//1.建立远程连接
Socket socket = new Socket("127.0.0.1",8888);
//2.要调用的类、方法、参数
String apiName = clazz.getName();
String methodName = method.getName();
//为了鉴别方法的重载,这里需要传入参数类型
Class[] paramTypes = method.getParameterTypes();
//3.传输类信息,请求远程执行结果
ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
outputStream.writeUTF(apiName);
outputStream.writeUTF(methodName);
outputStream.writeObject(paramTypes);
outputStream.writeObject(args);
outputStream.flush();
//4.接收返回的结果
ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
Object object = inputStream.readObject();
inputStream.close();
outputStream.close();
socket.close();
return object;
}
});
}
}
这里要注意啊!
必须使用代理类
,通过代理类可以对接口的方法进行拦截
,然后执行相应的远程调用!!!
如果想要我的源码的话在我的github中:java-learning-demo/demo-rpc/
参考文献
- 实现一个迷你版的RPC
- 如果对动态代理不是很熟悉的可以查看一下我的这篇文章:Java中的三种代理模式