手写简单RPC框架

上篇我们看了RMI的服务注册与客户端请求调用的原理与源码实现,大概了解了RMI是如何实现的简单rpc服
务,那么本篇我们就来仿写一个简易的RMI类的rpc请求调度框架模型

首先我们在编写代码之前,思考一下,一个简单的rpc需要实现什么,即如果实现rpc通信,我们如何调用对应的方法,并且如何实现两个服务之间的互通与互相调用操作。

客户端与服务端如何互通

java开发过程中常见的交互方式有两种,一种为轮询方式的交互,不停循环获取数据,直到获取或者退出为止,还有一种方式为推拉/发布订阅模式的数据交互,根据这两种方式目前也有很多种主流服务框架的实现,而这里我们选择使用sokect通讯,利用轮询操作实现数据交互

如何实现调用服务的方法

常见的rpc都是基于服务进行请求交互,我们也从RMI的源码中看到了,RMI将服务的class作为url,远程对象作为引用值存入map中,那么我们也可以将服务类以及方法等参数封装,传递到远程连接对象中,这样只要sokect进行交互的时候获取远程连接对象,其中持有的服务信息都可以获取到了

构建服务端

前面确定了思路以后,我们可以开始编码了,首先是服务端代码的编写,而服务端我们要实现两点,第一实现长连接,并且可以轮询获取请求,解析请求并获取当前服务端对应的实例将结果返回,第二我们需要能够动态获取到请求来的信息,所以我们可以考虑使用动态代理技术处理接口请求与实例之间的关系

那么我们开始编写代码,首先我们看下服务端代码的结构:

自定义rpc服务端.png

可以看到有对外发布的服务IHelloService,服务的实现类HelloServiceImpl,sokect请求处理器ProcessorHanlder以及真正启动服务的RPCServer类,其中IHelloService作为对外公开的服务,需要在服务端和客户端使用同一份,一般在企业开发的时候往往会作为依赖进行加载,这里我们并没有将服务单独拆分出去进行实现,而是选择使用同一份服务代码,来实现动态代理、序列化以及反序列化操作。而RpcRequest类用来将我们的请求相关参数以及服务相关信息进行封装,作为客户端与服务端传输的基类实现。

通用服务--IHelloService

我们首先定义一个对外的rpc通用服接口,代码如下:

/**
 * 对外公开的服务
 */
public interface IHelloService {
   String getResource(String type);
}

此服务中仅有一个getResource方法,根据传入的type内容的不同返回不同的内容,我们将此接口实现:

/**
 * IHelloService服务实现
 */
public class HelloServiceImpl implements IHelloService {

    @Override
    public String getResource(String type) {
        if(null == type || "".equals(type.trim())){
            return "请不要传输空数据好吗?";
        }
        if("json".equals(type.trim())){
            return "{className:HelloServiceName,methodName:getResource,args:json}";
        }
        return "找不到匹配的数据";
    }

}
请求类构建--RpcRequest

客户端与服务端实现固定的交互传输,需要访问的远程服务接口类信息、请求的方法以及参数等封装进去,并且由于RpcRequest类作为远程服务请求的载体,所以同时也需要实现序列化操作,此时一个简单的rpc请求载体类--RpcRequest构建完成,如下:

/**
 * 服务请求类
 * @desc
 */
public class RpcRequest implements Serializable{
    private static final long serialVersionUID = -601583969544463744L;
    private String className;//远程服务接口类名
    private String methodName;//需要调用的方法名
    private Object[] args;//当前方法需要的参数
    
    public String getClassName() {
        return className;
    }
    
    public void setClassName(String className) {
        this.className = className;
    }
    
    public String getMethodName() {
        return methodName;
    }
    
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }
    
    public Object[] getArgs() {
        return args;
    }
    
    public void setArgs(Object[] args) {
        this.args = args;
    }
    
    public RpcRequest(){}
    
    public RpcRequest(String className, String methodName, Object[] args) {
        this.className = className;
        this.methodName = methodName;
        this.args = args;
    }

    @Override
    public String toString() {
        return "RpcRequest [className=" + className + ", methodName=" + methodName + ", args=" + Arrays.toString(args)
                + "]";
    }
}
请求处理器--ProcessorHanlder

当我们发起一个请求,将封装好的远程服务信息传递到服务端的时候,往往会按照固定的规则,将远程服务信息解析,并且处理构建获取远程服务实例,进行操作,这个时候,我们可以考虑将解析、构建、调用的过程封装成一个hanlder处理器,实现通用的服务解析与方法调用操作,如下:

/**
 * 服务请求处理器
 */
 public class ProcessorHanlder implements Runnable{
    
    private Socket socket;
    private Object service;//服务的impl

    public ProcessorHanlder(final Socket socket,final Object service) {
        this.socket = socket;
        this.service = service;
    }

    public ProcessorHanlder() {
    }

    public Socket getSocket() {
        return socket;
    }

    public void setSocket(Socket socket) {
        this.socket = socket;
    }

    public Object getService() {
        return service;
    }

    public void setService(Object service) {
        this.service = service;
    }

    @Override
    public void run() {
        try(ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream())){
            RpcRequest request = (RpcRequest) objectInputStream.readObject();
            //反射触发请求服务返回结果
            Object invoke = invoke(request);
            //根据输出流返回给客户端
            outputStream.writeObject(invoke);
            outputStream.flush();
        }catch (Exception e) {
            try {
                socket.close();
            } catch (Exception e2) {
                System.err.println("==》service关闭socket失败:"+e2.getMessage());
            }
        }
    }
    
    /**
     * 反射调用服务接口
     * @param request
     * @return
     * @throws SecurityException 
     * @throws NoSuchMethodException 
     * @throws InvocationTargetException 
     * @throws IllegalArgumentException 
     * @throws IllegalAccessException 
     */
    private Object invoke(RpcRequest request) throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException{
        Object[] args = request.getArgs();//获取请求参数
        String methodName = request.getMethodName();//获取请求方法名
        //创建参数类型数组
        Class<?>[] argClasses = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            Class<? extends Object> type = args[i].getClass();
            argClasses[i] = type;
        }
        Method method = service.getClass().getMethod(methodName, argClasses);
        //反射调用方法
        return method.invoke(service, args);
    }
}
启动服务

当一切准备就绪后,我们需要将服务端启动,发布出去,使其客户端可以访问,这里由于存在并发访问场景,我们考虑加入了线程池机制,使用线程池操作每个线程请求:

/**
 * rpc服务类
 */
public class RPCServer {
  private static final ExecutorService SERVICE = Executors.newCachedThreadPool();//创建无固定容量的缓存池
  
  public void listener(final Object service,int port){
    try(ServerSocket serverSocket = new ServerSocket(port);){
        for(;;){
            Socket accept = serverSocket.accept();
            //挂起线程池来处理
            SERVICE.execute(new ProcessorHanlder(accept, service));
        }
    } catch (Exception e) {
        System.err.println("==>启动rpc服务失败:"+e.getMessage());
    }
  }
}

现在我们来将服务端启动,将其发布在8080端口上:

public static void main(String[] args) {
    RPCServer rpcServer = new RPCServer();
    IHelloService helloService = new HelloServiceImpl();
    rpcServer.listener(helloService,8080);
}

客户端请求

客户端负责连接服务端,将请求封装传输到服务端,而每次请求的具体参数和服务都是动态的,所以我们使用动态代理机制,将客户端的rpc操作封装,构建动态请求。首先我们看下客户端代码目录:

自定义rpc客户端.png

可以看到有动态代理rpc操作的RPCClientProxy类,封装了远程通讯连接与数据传输操作的TCPTransport类,还有将代理的请求进行处理的RemoteInvocationHandler类,除此之外客户端也必须包含远程服务接口IHelloService以及请求封装类RpcRequest(同一个工程中只需要一份,独立工程建议作为依赖引入),接下来我们来看看客户端的请求操作

动态请求操作

在java中可以使用动态代理机制,实现某一操作的动态处理,而实现过程则是需要将真正操作的类实现InvocationHandler接口,在invoke方法中拿到代理对象进行操作即可,而代理对象类中需要提供构建Proxy.newProxyInstance方法创建的代理对象的方法,其参数固定为ClassLoader类加载器需要实现的Class<?>[]接口类型以及需要实现动态代理的InvocationHandler接口子类实例,接下来我们来根据动态代理机制构建一个动态请求操作:

/**
 * 代理请求操作--处理器
 */
public class RemoteInvocationHandler implements InvocationHandler {
    private String host;//服务端所在地址
    private int port;//服务所在端口
    
    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        //组装请求参数
        RpcRequest request = new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());//获取方法所在类的name
        request.setMethodName(method.getName());
        request.setArgs(args);
        //创建TCP传输类 ==>并且返回当前返回的数据
        TCPTransport tcpTransport = new TCPTransport(host, port);
        return tcpTransport.handlerMsg(request);
    }

}

接着,我们实现具体动态代理类实例的构建:

/**
 * 动态代理rpc操作类
 */
public class RPCClientProxy {
    
    @SuppressWarnings("unchecked")
    public <T> T clientProxy(final Class<T> interfaceClass,final String host,final int port){
        return (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),new Class[]{interfaceClass},new RemoteInvocationHandler(host, port));
    }
}
启动客户端

当我们完成客户端访问服务端的请求的动态代理操作后,这个时候我们可以尝试连接刚刚启动的IHelloService远程服务了:

public static void main(String[] args) {
     RPCClientProxy clientProxy = new RPCClientProxy();
     //开始接口调用
     IHelloService helloService = clientProxy.clientProxy(IHelloService.class,"localhost",8080);
     System.out.println("收到回复的消息:"+helloService.getResource("json"));
 }

这个时候可以看到控制台输出的服务接口返回的内容,至此一个简易的仿rpc请求操作完成


控制台输出.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335

推荐阅读更多精彩内容