国庆期间闲来无事,看到掘金上一篇文章《徒手撸框架--实现 RPC 远程调用》,觉得写的很不错,也推荐大家阅读一下。于是自己也趁机实现了一个简单的RPC框架,与他不同的是,我使用了etcd作为注册中心来实现服务注册与服务发现的功能。具体的内容请看下文~~。
先附上Github : https://github.com/AlexZFX/easyrpc
整体概述
先看一下一个简单的RPC的调用流程
- Server端的服务注册
- Client端获取服务提供者的信息,保存在本地,定时更新
- Client收到请求,对提供相应服务的Server发起请求
- Server通过反射调用本地的服务类的方法,将结果或出现的异常返回。
那我们在写一个RPC框架时,应该考虑的问题就应该包括以下几点
- 服务注册与发现
- Client端服务的代理
- 请求的发送与处理
- Server端方法的调用与结果返回
这四点内部又有一些细节要处理,下面我会对这几点进行描述,并给出我自己的实现。
服务注册与发现
对于客户端和服务端来说,我们希望我们提供的服务是非侵入式的,也就是对客户端或者服务端本身的服务代码无影响。而这样最便捷的方式便是通过注解来实现。
于是我定义了两个注解 @RpcInterface 和 @RpcService
注解定义如下
@RpcInterface
/**
* Description : 注解于实现的接口类上,表示该类是用于使用远程 rpc服务的 class,其中的method都会通过动态代理调用到远程的服务端
*/
//注解的声明周期为始终不会丢弃
@Retention(RetentionPolicy.RUNTIME)
//注解的使用地点为 类,接口或enum声明
@Target(ElementType.TYPE)
@Documented
public @interface RpcInterface {
}
@RpcService
/**
* Description : 注解于实现了接口的服务类上,表示该类是用于提供rpc服务的 class,其中的method都会被注册到etcd中
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
public @interface RpcService {
}
注解的作用和解释写在了注释里,有了这两个注解之后,我们就需要对注解进行处理,@RpcService 对应将相应服务接口名注册在etcd上,@RpcInterface 对应查找注册中心中的服务名,在本地通过动态代理将本地的方法的结果修改为远程调用得到的结果。
服务的注册和发现通过一个封装好的 EtcdRegistry 来实现,这里的代码主要参考了阿里第四届中间件性能大赛初赛的服务注册发现代码,只做了一点点小的修改。
完整内容还是看Github。
Register方法和Find方法的内容如下~
//注册类名,一个类对应一个client
@Override
public void register(String serviceName, int port) throws Exception {
String strKey = MessageFormat.format("/{0}/{1}/{2}:{3}", ROOTPATH, serviceName, getHostIp(), String.valueOf(port));
ByteSequence key = ByteSequence.fromString(strKey);
// 目前只需要创建这个key,对应的value暂不使用,先留空
ByteSequence val = ByteSequence.fromString("");
//等待put结束之后继续运行
kv.put(key, val, PutOption.newBuilder().withLeaseId(leaseId).build()).get();
log.info("Register a new service at :" + strKey);
}
private String getHostIp() throws UnknownHostException {
return Inet4Address.getLocalHost().getHostAddress();
}
@Override
public List<EndPoint> find(String serviceName) throws Exception {
String strkey = MessageFormat.format("/{0}/{1}", ROOTPATH, serviceName);
log.info("start to find service, Name :" + strkey);
ByteSequence key = ByteSequence.fromString(strkey);
GetResponse response = kv.get(key, GetOption.newBuilder().withPrefix(key).build()).get();
List<EndPoint> list = new ArrayList<>();
response.getKvs().forEach(kv -> {
String s = kv.getKey().toStringUtf8();
int index = s.lastIndexOf("/");
String endPointStr = s.substring(index + 1, s.length());
String host = endPointStr.split(":")[0];
int post = Integer.parseInt(endPointStr.split(":")[1]);
list.add(new EndPoint(host, post));
});
return list;
}
对注解的处理方面,为了方便,我没有自己写一整套扫描包获取注解的方法,而是使用了Java的开源反射框架 Reflections ,简单的使用如下。
Reflections reflections = new Reflections(packagePath);
Set<Class<?>> classes = reflections.getTypesAnnotatedWith(RpcService.class);
这样我们就获取到的标识了相应注解的类。
在微服务中,我们往往通过实现同一个接口来保证客户端方法和服务端方法的同步,所以我们在注册过程中也应该注册的是接口名,这样保证了服务的可拓展性。
这些具体的操作都被放在了 ServerMain 这个类中,服务端的启动也是通过这个类的start方法来实现。
@Slf4j
public class ServerMain {
private static final int DEFAULT_SERVER_PORT = 8890;
private IRegistry registry;
private int port;
private final String packagePath;
public ServerMain(String packagePath) {
this(packagePath, new EtcdRegistry());
}
public ServerMain(String packagePath, IRegistry registry) {
this.registry = registry;
this.packagePath = packagePath;
this.port = System.getProperty("server.port") == null ? DEFAULT_SERVER_PORT : Integer.parseInt(System.getProperty("server.port"));
}
public void start() {
Reflections reflections = new Reflections(packagePath);
Set<Class<?>> classes = reflections.getTypesAnnotatedWith(RpcService.class);
classes.forEach(clazz -> {
try {
Class<?>[] interfaces = clazz.getInterfaces();
String clazzName = clazz.getName();
if (interfaces != null && interfaces.length > 0) {
//简单实现,所以只获取了第一个interface的name,实际上并不准确,可能有误。
clazzName = interfaces[0].getName();
}
//注册的是 接口名 和 服务实例
//clazzMap是用来保存一个实例对象,相当于服务端的单例
ServerHandler.clazzMap.put(clazzName, clazz.newInstance());
registry.register(clazzName, port);
} catch (Exception e) {
log.error("register service failed : " + e.getLocalizedMessage(), e);
}
});
// //新开线程的话会程序会退出(如果在springboot的构造函数中则另开线程启动,否则会阻塞项目的启动)
// new Thread(() -> {
Server server = new Server(port);
server.start();
// }).start();
}
}
客户端也通过一个 ClientServer 类实现了客户端服务的启动工作,将注解处理,服务查找和缓存等任务进行实现。
find方法返回的结果是一个EndPoint的list,对应提供相应服务的n个节点,每个相同的endpoint对应了我封装的一个netty的client,具体会在client的部分讲明~~
@Slf4j
public class ClientServer {
private IRegistry registry;
//设置一个endpoint使用一个client,netty高效理论上满足使用
private static ConcurrentHashMap<EndPoint, Client> clientMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, List<EndPoint>> serviceMap = new ConcurrentHashMap<>();
private final String packagePath;
private static final Random random = new Random();
public ClientServer(String packagePath) {
this.packagePath = packagePath;
this.registry = new EtcdRegistry();
}
public void start() {
Reflections reflections = new Reflections(packagePath);
Set<Class<?>> classes = reflections.getTypesAnnotatedWith(RpcInterface.class);
EventLoopGroup eventLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(4) : new NioEventLoopGroup(4);
//定时任务线程池,定时更新服务列表,设置为3分钟
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2);
classes.forEach(clazz -> executorService.scheduleAtFixedRate(() -> {
try {
//拿到当前仍在注册中心中的相应服务列表
// TODO 删除掉对应失效的endpoint
Class<?>[] interfaces = clazz.getInterfaces();
String className = clazz.getName();
if (interfaces != null && interfaces.length > 0) {
className = interfaces[0].getName();
}
List<EndPoint> list = registry.find(className);
serviceMap.put(className, list);
list.forEach(endPoint -> {
if (clientMap.get(endPoint) == null) {
//所有的Client共用一个EventLoopGroup
Client client = new Client(endPoint.getHost(), endPoint.getPort(), eventLoopGroup);
clientMap.put(endPoint, client);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}, 0, 3 * 60, TimeUnit.SECONDS));
}
public static Client getClient(String serviceName) {
List<EndPoint> endPoints = serviceMap.get(serviceName);
// 简单的负载均衡,只使用了随机选择
if (endPoints != null) {
EndPoint endPoint = endPoints.get(random.nextInt(endPoints.size()));
return clientMap.get(endPoint);
}
return null;
}
}
以上就完成了服务注册与发现的工作,同时也提供了非常简单易用的启动接口,等会看看example就知道啦。
Client端的服务代理
浏览器的请求被发送到Client端后,虽然表面上是通过本地的方法返回,但实质上其实是远程方法的调用,这主要是动过Java中的动态代理来实现的。
这个项目中的动态代理是通过CGLIB来实现的,关于CGLIB和JDK原生动态代理的区别,我就不细述,网上有很多相关的文章。
先声明一个ProxyFactory类,用于在注入服务对象时生成代理
public class ProxyFactory {
public static <T> T create(Class<T> clazz) {
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(clazz);
enhancer.setCallback(new ProxyIntercepter());
return (T) enhancer.create();
}
}
在调用代理类的方法时,实际上会调用到 ProxyIntercepter 中的intercepter方法。所以实际上的请求会在intercepter方法里发送。
ProxyIntercepter 主要内容如下
@Slf4j
public class ProxyIntercepter implements MethodInterceptor {
@Override
public Object intercept(Object o, Method method, Object[] parameters, MethodProxy methodProxy) throws Throwable {
RpcRequest rpcRequest = new RpcRequest();
Class clazz = method.getDeclaringClass();
Class<?>[] interfaces = clazz.getInterfaces();
//存在接口时使用的是接口名称
String clazzName = clazz.getName();
if (interfaces != null && interfaces.length > 0) {
clazzName = interfaces[0].getName();
}
rpcRequest.setClassName(clazzName);
rpcRequest.setServiceName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setParameters(parameters);
Client client = ClientServer.getClient(rpcRequest.getClassName());
RpcFuture rpcFuture;
if (client != null) {
ChannelFuture channelFuture = client.connectChannel();
rpcFuture = new RpcFuture(channelFuture.channel().eventLoop());
if (channelFuture.isSuccess()) {
sendRequest(rpcRequest, rpcFuture, channelFuture);
} else {
channelFuture.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
sendRequest(rpcRequest, rpcFuture, future);
} else {
log.error("send request error ", future.cause());
}
});
}
//这里没有用listener & getNow的方式获取主要是考虑客户端本身非异步的情形,同时是为了简便实现。
RpcResponse rpcResponse = rpcFuture.get(5, TimeUnit.SECONDS);
if (rpcResponse.getException() == null) {
return rpcResponse.getResult();
} else {
throw rpcResponse.getException();
}
} else {
log.error("no rpcService is available :" + rpcRequest.getClassName());
return null;
}
}
private void sendRequest(RpcRequest rpcRequest, RpcFuture rpcFuture, ChannelFuture channelFuture) {
channelFuture.channel().writeAndFlush(rpcRequest)
.addListener((ChannelFutureListener) writefuture -> {
if (writefuture.isSuccess()) {
FutureHolder.registerFuture(rpcRequest.getRequestId(), rpcFuture);
log.info("send request success");
} else {
rpcFuture.tryFailure(writefuture.cause());
log.error("send request failed", writefuture.cause());
}
});
}
}
可以看到我仍然是获取了方法所在类的接口,并根据接口名查找了相应的Client对象,发送请求。
请求的发送与处理
为了服务端能成功利用反射进行方法调用,客户端的请求应该包含一些参数,在 RpcRequest 类中。
@Data
public class RpcRequest {
private static AtomicLong atomicLong = new AtomicLong(0);
// 请求Id netty的请求是异步的,为了复用连接,一般会带个id,这样在收到返回信息的时候能一一对应。
private long requestId;
// 类名
private String className;
// 服务名
private String serviceName;
// 参数类型
private Class<?>[] parameterTypes;
// 参数
private Object[] parameters;
public RpcRequest() {
this.requestId = atomicLong.getAndIncrement();
}
public RpcRequest(long requestId) {
this.requestId = requestId;
}
}
服务端返回的数据包含的内容如下
@Data
public class RpcResponse {
private long requestId;
private Throwable exception;
private Object result;
public RpcResponse(long requestId) {
this.requestId = requestId;
}
public RpcResponse() {
}
}
Client端和Server端均使用了Netty作为网络框架。
其实利用HTTP协议+Json也可以完成我们的基本需求,但是在大部分Rpc框架中,为了提高性能,往往都会自定义一个满足需求的协议,并采用一些更为高效的序列化方案,如ProtoBuf,Tyro等。 本项目采用了ProtoStuff作为序列化方案, 和ProtoBuf相比主要是省去了初始生成proto文件等步骤,提供了较为易用的接口。(网上说ProtoStuff在序列化一些集合类的时候会有bug,我自己测试了一下HashMap和ArrayList,都没有出现问题,就没有专门去解决这一问题)
编写了一个ProtoStuffUtil的工具类,提供serializer和deserializer方法(网上有好多这种代码,参考了一些,因为很多文章,就没有标注来源了)。
public class ProtoStuffUtil {
public static <T> byte[] serializer(T o) {
Schema schema = RuntimeSchema.getSchema(o.getClass());
return ProtostuffIOUtil.toByteArray(o, schema, LinkedBuffer.allocate());
}
public static <T> T deserializer(byte[] bytes, Class<T> clazz) {
Schema<T> schema = RuntimeSchema.createFrom(clazz);
T message = schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, message, schema);
return message;
}
}
工具类提供的方法在Netty的Encoder和Decoder中使用(参考了这篇文章《使用netty结合Protostuff传输对象例子》),这两个类会在Netty启动时被我利用一个Initializer添加到Netty的pipeline中去,关于Netty的责任链模式可以查看其他相关文章。
RpcEncoder
public class RpcEncoder extends MessageToByteEncoder {
private Class<?> targetClazz;
public RpcEncoder(Class<?> targetClazz) {
this.targetClazz = targetClazz;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if (targetClazz.isInstance(msg)) {
byte[] data = ProtoStuffUtil.serializer(msg);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}
RpcDecoder
public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> targerClass;
public RpcDecoder(Class<?> targerClass) {
this.targerClass = targerClass;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLen = in.readInt();
if (dataLen < 0) {
ctx.close();
}
if (in.readableBytes() < dataLen) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLen];
in.readBytes(data);
Object obj = ProtoStuffUtil.deserializer(data, targerClass);
out.add(obj);
}
}
接下来来说用于发送请求的Client类,为了复用TCP连接,Netty的channel都添加了KEEPALIVE的参数,同时,我也在每次获取了一个EndPoint的属性后建立一个新的Client,储存在了ClientMap中。每个Client是对应一个EndPoint的,如果并发量大的话,其实可以多个Client对应于一个EndPoint,这时就可以将我们的Client设计为一个连接池,也可以是Netty自身提供的连接池,这方面扩展我就不细讲,我github上的iustu-agent里面Connection那一块本来有用到连接池,后来发现性能并没有什么提升就没再用了。
Client本质上就是一个Netty的Client,我设置成同一个客户端的所有Client都会共用一个EventLoopGroup,主要是为了资源的合理利用吧。因为负载不高的情况下同一个EventLoopGroup其实是够用的,并且还会有浪费。
Client 的构成如下
@Slf4j
public class Client {
private EventLoopGroup eventLoopGroup;
private Channel channel;
private ChannelFuture channelFuture;
private String host;
private int port;
public Client(String host, int port) {
this(host, port, Epoll.isAvailable() ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1));
}
public Client(String host, int port, EventLoopGroup eventLoopGroup) {
this.host = host;
this.port = port;
this.eventLoopGroup = eventLoopGroup;
}
public ChannelFuture connectChannel() {
if (channelFuture == null) {
channelFuture = new Bootstrap().group(eventLoopGroup)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT)
.channel(Epoll.isAvailable() ? EpollSocketChannel.class : NioSocketChannel.class)
.handler(new ClientInitialzer())
.connect(host, port)
.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
channel = future.channel();
log.info("start a client to " + host + ":" + port);
channel.closeFuture().addListener((ChannelFutureListener) closefuture -> {
log.info("stop the client to " + host + ":" + port);
});
} else {
log.error("start a Client failed", future.cause());
}
})
;
}
return channelFuture;
}
public Channel getChannel() {
if (channel != null) {
return channel;
} else {
channelFuture = connectChannel();
return channelFuture.channel();
}
}
}
可以看见只有第一次调用 connectChannel 方法的时候才会真正的向服务端发起连接,这会返回一个channelFuture,但是channelFuture的结果并不一定已经成功了。
所以在 ProxyIntecepter 里我也做了相应的处理,完整代码上面已附。处理部分的代码如下
ChannelFuture channelFuture = client.connectChannel();
rpcFuture = new RpcFuture(channelFuture.channel().eventLoop());
if (channelFuture.isSuccess()) {
sendRequest(rpcRequest, rpcFuture, channelFuture);
} else {
channelFuture.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
sendRequest(rpcRequest, rpcFuture, future);
} else {
log.error("send request error ", future.cause());
}
});
}
请求发送的时候本地会在 RpcHolder(内部使用了ThreadLocal的HashMap) 中存储一个 RpcFuture(实质上只是继承了Netty中的DefaultPromise),与唯一的 RequestId 相对应,rpcfuture.get()会先wait,并在请求得到返回之后被notify并返回结果,如果结果中包含Exception,则会抛出异常。
服务端调用与返回
相比于Client而言,Server端的实现就简单了许多,只是开启了一个ServerBootStarp并绑定在指定的端口上接受请求。
收到请求后,会通过请求中携带的服务名查找到相应的对象(在注册服务的同时被添加到clazzMap中去的impl实例),并利用反射调用其中方法,对结果进行返回。这里用的也是CGLIB中提供的FastClass来实现反射。
主要的代码都在 ServerHandler 中了
@Slf4j
@ChannelHandler.Sharable
public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
public static ConcurrentHashMap<String, Object> clazzMap = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception {
log.info("recieve a request id : " + msg.getRequestId());
RpcResponse rpcResponse = getResponse(msg);
ctx.writeAndFlush(rpcResponse).addListener((GenericFutureListener<ChannelFuture>) future -> {
if (!future.isSuccess()) {
log.error(future.cause().getLocalizedMessage());
}
});
}
private RpcResponse getResponse(RpcRequest rpcRequest) {
RpcResponse rpcResponse = new RpcResponse(rpcRequest.getRequestId());
try {
Class<?> clazz = Class.forName(rpcRequest.getClassName());
Object c = clazzMap.get(rpcRequest.getClassName());
if (c == null) {
clazzMap.put(rpcRequest.getClassName(), clazz.newInstance());
c = clazzMap.get(rpcRequest.getClassName());
}
String methodName = rpcRequest.getServiceName();
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
Object[] parameters = rpcRequest.getParameters();
FastClass fastClass = FastClass.create(clazz);
FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes);
//与Spring联合使用时应该调用ApplicationContext里面的已有的bean
Object result = fastMethod.invoke(c, parameters);
rpcResponse.setResult(result);
} catch (ClassNotFoundException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
e.printStackTrace();
rpcResponse.setException(e);
}
return rpcResponse;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof IOException) {
if (cause.getLocalizedMessage().equals("远程主机强迫关闭了一个现有的连接。")) {
log.info("一个客户端连接断开。");
return;
}
}
super.exceptionCaught(ctx, cause);
}
}
试着使用一下~
先将当前的项目安装到本地仓库中去以供其他项目使用~
mvn install
这样就OK了,最好还是用idea做这些比较方便
然后新建一个SpringBoot项目,加入web的组件,添加三个模块,组成如下图
同时在pom.xml中加入easyrpc的引用
<dependency>
<groupId>com.alexzfx</groupId>
<artifactId>easyrpc</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
在common包中,我们在com.alexzfx.easyrpc.com
目录下创建 HelloService 接口,声明 sayHello方法
package com.alexzfx.easyrpc.common;
public interface HelloService {
String sayHello();
}
然后在server包中创建一个实现类,标识 @RpcService 注解
package com.alexzfx.easyrpc.server;
import com.alexzfx.easyrpc.commom.annotation.RpcService;
import com.alexzfx.easyrpc.common.HelloService;
@RpcService
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello() {
return "Hello EasyRPC";
}
}
在client包中也创建一个实现类,标识 @RpcInterface 注解,但只是实现,不做其他方法内容的实现
package com.alexzfx.easyrpc.client;
import com.alexzfx.easyrpc.commom.annotation.RpcInterface;
import com.alexzfx.easyrpc.common.HelloService;
@RpcInterface
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello() {
return null;
}
}
在server包中创建一个main方法,实现我们的服务启动功能
package com.alexzfx.easyrpc.server;
public class ServerApplication {
public static void main(String[] args) {
ServerMain serverMain = new ServerMain("com.alexzfx.easyrpc.server");
serverMain.start();
}
}
在client端中实现一个controller,并返回 sayHello 方法的结果,启动前要先完成我们client端的启动工作,并将本地使用的服务类创建为我们的代理类(因为本项目没有完全对应Spring做配置,如果是专门为了和Spring集成的话,可以直接在applicationContext加载的时候将代理类添加进去,这样就可以使 @Autowire 注解注入的是我们的代理类了)。
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@RestController
public class ClientApplication {
public ClientApplication() {
ClientServer clientServer = new ClientServer("com.alexzfx.easyrpc.client");
clientServer.start();
helloService = ProxyFactory.create(HelloServiceImpl.class);
}
private HelloService helloService;
public static void main(String[] args) {
SpringApplication.run(ClientApplication.class);
}
@RequestMapping("/")
public String sayhello() {
return helloService.sayHello();
}
}
这样,我们整体的测试项目就构建完成了。
启动etcd,Windows上只要下载链接中相应的版本,解压缩后直接双击启动 etcd.exe 即可,其他系统也类似。
启动 server 端的 main 函数,启动时注册了服务,所在端口等日志都会被打印出来。
再启动client端 SpringBoot的 main 函数。
访问 http://localhost:8080 你就可以看到 Hello EasyRPC
的出现啦。
总结
通过以上的内容,实现了一个简单易用,包括了服务注册与发现功能的RPC框架。实际上一个可通用的RPC框架,要处理的事情远远不止上面做的这么简单,还需要包括监控,熔断等等许多功能。
这是我第一次写完整的技术类博客,有许多不足的地方,希望发现的老哥帮忙指正~
如果你觉得还不错的话,希望能给我的这个项目点个小小的 star ,这就是对我最大的鼓励啦。EasyRPC