自己动手实现RPC

动手实现RPC

最近利用业于时间实现一个了一个简简单单的 RPC demo,在这里过程中,遇到了几个问题,也收获一些东西,分享一下这个过程.

前期准备

RPC 是我们在企业开发中比较常见的,同时也是比较熟悉的,但是对于开发一个 RPC 来说,我们需要掌握一些基本的理论知识.

1、RPC 是网络通信(进程之间的通信),所以相比单进程的通信来的慢

2、网络传输的是 字节 ,而不是 字符,或者其他的传输媒介.

3、了解or掌握了 动态代理的运用

本质是传输信息,将 我(消费端) 所知的信息通过通信告诉你 (服务端) ,然后你给我返回最终的信息.

动手实现

有了上述的前期准备之后,先剖析一下我们是如何使用 RPC ,服务提供端需要将服务暴露出去,服务消费端需要接入需要的服务。这个过程如下所示,我的讲述也围绕这个过程展开.

服务端暴露服务 --> 消费端引用服务 --> 动态代理 --> 序列化 --> 网络请求 ---> 服务端处理 --> 序列化返回 --> 消费端返回结果.

服务暴露

服务暴露就是将服务发布到(zookeeper)上,提供给其他的消费者访问,当消费者拿到这个信息之后,就可以连接服务端,并发起请求。
暴露就是需要将该服务的基本信息发布出去,这里列举的主要信息有 服务机器host 服务监听端口 服务接口 服务序列化方式 服务权重 服务版本
代码如下

1、服务暴露的信息

    public class Provider implements Serializable, Cloneable {
        private String serviceName;
        private String host;
        private Integer port;
        private String version;
        private Integer weight;
        private String serialization;
    
        get and set
    }

2、暴露服务的过程,这里以 zookeeper 为例,主要是将服务以节点的形式添加到 zk

    @Override
    public void registerService(List<Provider> providerList) {
        assert zkClient != null;
        providerList.parallelStream().forEach(provider -> {
            String host = provider.getHost();
            Integer port = provider.getPort();
            String serviceName = provider.getServiceName();
            String version = provider.getVersion();
            Integer weight = provider.getWeight();
            String serverPath = root_path + "/" + serviceName + root_provider;
            if (!zkClient.exists(serverPath)) {
                zkClient.createPersistent(serverPath, true);
            }
            String finalInfo = host + split + port + split + serviceName + split + version + split + weight + split + provider.getSerialization();
            String path = serverPath + "/" + finalInfo;
            if (!zkClient.exists(path)) {
                log.info("注册服务:{}到ZooKeeper", serverPath);
                zkClient.createEphemeral(path);
            } else {
                log.warn("服务:{}已被注册", serverPath);
            }
        });
    }

然后可以通过 zkCli 命令查看zk上服务的状况.

[zk: localhost:2181(CONNECTED) 0] ls /fuck/top.huzhurong.fuck.UserService/provider
[]

消费端引用服务和动态代理

消费端引用服务端方式大多数还是通过 Spring 的自定义标签引入.使用方式如下.

    <fuck:reference id="test" interface="top.huzhurong.fuck.UserService" version="0.0.1"/>

这样就引用了 top.huzhurong.fuck.UserService 这个版本为 0.0.1 的服务. 在使用Spring的时候,我们需要通过使用动态代理去走网络,调用服务上的接口,那么就不可能想普通的 bean
一样去配置,一般说来都是通过 FactoryBean 来进行配置,因为在 getObject 中可以进行 bean的定制化(大部分的框架也是FactoryBean引入的)。

FactoryBean的使用方式如下

public class ProxyBean implements FactoryBean, InitializingBean {
    private Class name;
    private Object object;
    @Override
    public Object getObject() {
        return object;
    }
    @Override
    public Class<?> getObjectType() {
        return name;
    }
    @Override
    public void afterPropertiesSet() {
        this.build();
    }
    private void build() {
          //动态代理产生一个代理bean
        this.object = Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{this.name}, (proxy, method, args) -> {
             //在invoke中定制我们的服务,可以走tcp/http等等
            if (method.getName().equalsIgnoreCase("name")) {
                return "调用name方法";
            }
            if (method.getName().equalsIgnoreCase("toString")) {
                return "调用toString方法";
            }
            return "111";
        });
    }
}

这里只是一个简单的说明服务引用的过程,具体的过程如下

  • 注册消费节点到zookeeper
  • 获取服务者列表
  • 订阅该服务接口
  • 搭建动态代理

篇幅有限,我就介绍下搭建动态代理,其他三个还挺简单的。

  Object object = Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{Class.forName(this.interfaceName)}
                , new FuckRpcInvocationHandler(this));
                
 class FuckRpcInvocationHandler implements InvocationHandler {
        //字段
        //构造方法
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            //获取服务列表
            List<Provider> all = ProviderSet.getAll(this.className);
            //软负载均衡
            Provider provider = loadBalance.getProvider(all);
            String info = provider.buildIfno();
            SocketChannel channel = ChannelMap.get(info);
            if (channel == null) {
                //建立TCP连接
                Client client = new NettyClient(provider, this.serialization);
                client.connect(provider.getHost(), provider.getPort());
                channel = ChannelMap.get(info);
            }

            SocketChannel finalChannel = channel;
            Future<Response> submit = TempResultSet.executorService.submit(() -> {
                //请求写入tcp通道
                finalChannel.writeAndFlush(request);
                //这里可以改造成 CountDownLatch,可以比 ;; 循环要好
                for (; ; ) {
                    Response response = TempResultSet.get(request.getRequestId());
                    if (response != null) {
                        return response;
                    }
                }
            });
                Response response = submit.get(this.timeout, TimeUnit.SECONDS);
                
        }
    }

动态代理做的主要是,获取服务列表,软件负载处理,建立tcp连接,通信这几步.

动态代理仅仅是实际调用的时候才会进入invoke方法,实例化不进入,每一次代理调用都会进入 invoke

序列化和网络请求

序列化网上都文章很多,我也只是支持了 protostuffjdk 序列化方式. 本身很难,都是封装之后还是ok的,重点介绍一下网络传输的处理

当解决完序列化之后,就是网络传输了,都知道网络传输的是字节,但是怎么去使用网络进行透明传输我们都很头痛,而netty在使用上降低了我们的入门难度,其简单的api可以快速的进行上手,如果你还没有通过,可以看看netty的example就可以动手写了。

在网络传输部分的处理中,一个是协议的处理,一个是如何将处理完的请求结果赋值给正确的请求对象.

1、协议的处理也非常简单,就是一个常规的length + data , 作为自定义协议,这种处理可以让我们快速的处理而不会纠结于协议的正确性

     @Override
       protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {
        if (byteBuf.readableBytes() <= HEAD_LENGTH) {
            return;
        }
        byteBuf.markReaderIndex();//标记位置
        int dataLength = byteBuf.readInt();
        if (byteBuf.readableBytes() < dataLength) {
            byteBuf.resetReaderIndex();//可读取的数据不够
            return;
        }
        byte[] dataArray = new byte[dataLength];
        byteBuf.readBytes(dataArray);
        Request request = serialization.deSerialize(dataArray, Request.class);
        if (log.isDebugEnabled()) {
            log.debug("接受到消费者请求:{},请求内容:{}", ctx.channel().toString(), request);
        }
        list.add(request);
    }

2、如何将处理结果返回给正确的请求对象,我们都知道在执行请求的时候都需要一个 RI --> requestId 去标示这个请求,在我们的rpc当中也是如此,确保请求和响应的是一家人

    public class Request implements Serializable {
            private String requestId;//请求标示
            private String serviceName;//服务名称
            private String methodName;//方法名,Method不能被序列化
            private Class<?>[] parameters;//参数类型,用于获取对于的执行方法
            private Object[] args;//实际参数
    }

3、服务处理,可以选择新建立线程池而不是直接使用 netty 的work io 线程池。

    @Override
     protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) {
               if (serializable instanceof Request) {
               Request request = (Request) serializable;
            responseTask.execute(new ResponseTask(request, channelHandlerContext, this.applicationContext));
        }
    }
    
    @Override
    public void run() {
        String serviceName = request.getServiceName();
        String methodName = request.getMethodName();
        Class<?>[] parameters = request.getParameters();
        Object[] args = request.getArgs();
        Response response = new Response();
        response.setRequestId(request.getRequestId());
        response.setSuccess(false);
        try {
            //获取服务
            Object service = ServiceCache.getService(serviceName);
            if (service == null) {
                Class<?> aClass = ClassUtils.forName(serviceName, ClassUtils.getDefaultClassLoader());
                service = applicationContext.getBean(aClass);
                ServiceCache.put(serviceName, service);
            }
            //获取方法
            Method method = service.getClass().getDeclaredMethod(methodName, parameters);
            Object invoke = method.invoke(service, args);
            System.out.println("invoke:" + invoke);
            response.setSuccess(true);
            response.setObject(invoke);
        } catch (ClassNotFoundException | IllegalAccessException e) {
            response.setException(e);
        } catch (NoSuchMethodException e) {
            e.printStackTrace();
            response.setException(e);
        } catch (InvocationTargetException e) {
            e.printStackTrace();
            response.setException(e.getTargetException());
        }
        //写入channel中
        channelHandlerContext.writeAndFlush(response);
    }

消费端返回结果

当服务端写入数据的时候,如果网络通畅,基本上一下就到了客户端,我们的处理也很简单,就是放到一个Map中,而消费线程一直在map中找这个数据

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) {
        if (serializable instanceof Response) {
            Response response = (Response) serializable;
            ResponseFuture responseFuture = TempResultSet.getResponseFuture(response.getRequestId());
            if (responseFuture == null) {
                return;
            }
            if (response.getAsync()) {
                CompletableFuture<Object> future = responseFuture.getFuture();
                if (future != null) {
                    if (response.getObject() != null) {
                        future.complete(response.getObject());
                    } else {
                        assert response.getException() != null;
                        future.completeExceptionally(response.getException());
                    }
                }
            } else {
                //解除CountDownLatch
                responseFuture.putResponse(response);
            }
        }
    }

到这里一个简单的rpc调用就可以起来了。

小结

一个简单的 rpc 就新鲜出炉了,但是其实还有很多可以改造的点,例如拦截(责任链+SPI),例如观察者模式的运用,不过也是从中学习到了一个RPC基本的使用方法,还需要深入到了解线程池使用,这里的使用也是很粗制滥造的。不过能在几天之内写完还是很开心,项目在这里 ----> fuck-rpc 的简单实现,你也可以试试哦

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,657评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,889评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,057评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,509评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,562评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,443评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,251评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,129评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,561评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,779评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,902评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,621评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,220评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,838评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,971评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,025评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,843评论 2 354

推荐阅读更多精彩内容