基于Netty+动态代理+反射 实现简单的RPC调用

前言

本文只是简单实现了一次RPC调用示例,以理解其调用原理。一些主流RPC框架的其他功能并没有实现。(如服务自动注册与发现,流控,动态配置等)

PRC调用核心

像调用本地代码一样调用远程服务。
调用方只需调用服务方所提供的接口,通过Java动态代理,代理方法内,与服务方进行网络交互,得到服务方返回结果。基于上述,调用方只需依赖服务方所提供的接口。在使用时的感觉就像是,调用了本地代码一样。其实是用代理模式屏蔽了底层的网络交互。

简单的RPC调用所涉及的底层技术

Java动态代理
Java反射
Java序列化
NIO网络模型(Netty)

Netty是什么

  • Netty 是 JBoss 公司用 Java 写的一个 Jar 包(库),目的是快速开发高性能、高可靠性的网络服务器和客户端程序
  • Netty 提供异步、无阻塞、事件驱动的网络应用程序框架和工具
  • Netty 是目前公认的网络编程最好的框架,官网地址:http://netty.io/
  • GitHub 托管地址:https://github.com/netty/netty
  • Netty 底层封装的也是 Java 的NIO,所以也叫NIO框架,常用于开发分布式系统

推荐该博主的关于netty的文章:
https://blog.csdn.net/wangmx1993328/article/details/83035760

动手实现

调用时序图

我们先来看一次PRC调用的时序图:


  • Client:服务调用方
  • Proxy : 调用方动态代理组件
  • Netty_C:调用方Netty客户端
  • 注册中心:服务自动注册与发现,可以是ZooKeeper
  • Netty_S : 提供方Netty服务端
  • Server: 服务提供方

注:上图红色虚线框中的功能,本示例没有涵盖。本示例通过API方式配置,直连服务节点。

类图

  • MyRpcServiceContainer : 调用方入口,主要用于获取代理服务,存储服务节点的信息。
  • MyRpcServiceGroup : 服务节点的集合,并提供负载均衡策略(未实现)。
  • MyRpcServiceNode : 单个服务节点的信息。
  • MyRpcClientProxy : 调用方动态代理实现类。
  • MyRpcClient : Netty客户端,对连接信息进行配置,如序列化反序列化Handler和异步处理返回结果的Handler。
  • MyRpcClientHandler : Netty客户端异步处理的Handler。主要用于发送请求信息等。
  • MyRpcRequest : 请求对象封装,包含请求接口,请求方法,请求参数等。
  • MyRpcResponse : 请求结果封装,包含方法返回结果。
  • MyRpcServer : 提供方入口,主要用于暴露服务。
  • MyRpcServerConfig : 提供方服务的集合,以及一些配置信息。
  • MyRpcServiceImplProxy : 提供方服务代理,代理服务的方法具体实现,并提供流控等功能。
  • MyRpcFlowControl : 流控计数器,针对接口、方法维度,提供流控计数功能。
  • MyRpcServerHandler : Netty服务端异步处理的Handler。主要用于发送执行结果等。

代码讲解

TODO

测试

服务提供方

public class MyRpcServerTest {

    @Test
    public void testName() {

        MyRpcServer myRpcServer = new MyRpcServer();
        MyRpcServerConfig config = new MyRpcServerConfig();
        config.setPort(8888);
        //注册服务,此处为api方式
        MyRpcServiceImplProxy implProxy = new MyRpcServiceImplProxy(DemoRpcService.class);
        config.addService(DemoPrcInterface.class, implProxy);
        myRpcServer.config(config).start();
    }
}



十一月 13, 2019 5:24:09 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0xea1be79c] REGISTERED
十一月 13, 2019 5:24:09 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0xea1be79c] BIND: 0.0.0.0/0.0.0.0:8888
十一月 13, 2019 5:24:09 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] ACTIVE
main,服务器开始监听端口,等待客户端连接.........
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0xac188c6d, L:/127.0.0.1:8888 - R:/127.0.0.1:61210]
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x46558bc0, L:/127.0.0.1:8888 - R:/127.0.0.1:61208]
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ: [id: 0x7b6f1d71, L:/127.0.0.1:8888 - R:/127.0.0.1:61209]
十一月 13, 2019 5:24:25 下午 io.netty.handler.logging.LoggingHandler channelReadComplete
信息: [id: 0xea1be79c, L:/0:0:0:0:0:0:0:0:8888] READ COMPLETE

服务调用方

public class MyRpcServiceContainerTest {
    private CountDownLatch countDownLatch = new CountDownLatch(3);
    @Test
    public void testName() throws InterruptedException {
        Map<Class, MyRpcServiceGroup> serviceInfoMap = new HashMap<Class, MyRpcServiceGroup>();
        MyRpcServiceGroup serviceInfo = new MyRpcServiceGroup();
        serviceInfo.addNode(new MyRpcServiceNode("127.0.0.1",8888));
        serviceInfoMap.put(DemoPrcInterface.class, serviceInfo);
        MyRpcServiceContainer container = new MyRpcServiceContainer(serviceInfoMap);
        //上面为服务发现,此处为api方式
        final DemoPrcInterface intf = container.getService(DemoPrcInterface.class);
        //System.out.println(intf.helloWithName("zxm"));
        final DemoReq req = new DemoReq();
        req.setUuid("123456");
        
        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                public void run() {
                    try {
                        System.out.println(intf.callRemoteService(req,"yyxl").getUuid());
                    }catch (Exception e){
                        e.printStackTrace();
                    }finally {
                        countDownLatch.countDown();
                    }
                }
            }).start();
        }
        countDownLatch.await();
        
    }
}


Thread-2,客户端发起异步连接..........
Thread-1,客户端发起异步连接..........
Thread-0,客户端发起异步连接..........
nioEventLoopGroup-3-1,Server return Message:com.yyxl.myrpc.service.dto.MyRpcResponse@6585879b
nioEventLoopGroup-4-1,Server return Message:com.yyxl.myrpc.service.dto.MyRpcResponse@74997d68
java.lang.reflect.UndeclaredThrowableException
    at com.sun.proxy.$Proxy3.callRemoteService(Unknown Source)
    at com.yyxl.myrpc.service.consumer.MyRpcServiceContainerTest$1.run(MyRpcServiceContainerTest.java:32)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: 流控
    at com.yyxl.myrpc.service.provider.MyRpcServiceImplProxy.call(MyRpcServiceImplProxy.java:57)
    at com.yyxl.myrpc.service.provider.MyRpcServerHandler.channelRead(MyRpcServerHandler.java:26)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more
java.lang.reflect.UndeclaredThrowableException
    at com.sun.proxy.$Proxy3.callRemoteService(Unknown Source)
    at com.yyxl.myrpc.service.consumer.MyRpcServiceContainerTest$1.run(MyRpcServiceContainerTest.java:32)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: 流控
    at com.yyxl.myrpc.service.provider.MyRpcServiceImplProxy.call(MyRpcServiceImplProxy.java:57)
    at com.yyxl.myrpc.service.provider.MyRpcServerHandler.channelRead(MyRpcServerHandler.java:26)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:656)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:591)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:508)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:470)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more
nioEventLoopGroup-2-1,Server return Message:com.yyxl.myrpc.service.dto.MyRpcResponse@686c50af
123456##yyxl

我们可以看到,默认流控阀值为1,前2个请求直接返回流控异常,第3个返回正常调用结果。

参考

彻底理解Netty,这一篇文章就够了
Netty 入门示例详解
Netty之传输POJO(使用Java自带的序列化方式)
动态代理
【Java 笔记】Java 反射相关整理

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容

  • 微服务,已经是每个互联网开发者必须掌握的一项技术。而 RPC 框架,是构成微服务最重要的组成部分之一。趁最近有时间...
    java菜阅读 384评论 0 0
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 10,934评论 6 13
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,093评论 1 32
  • 缅怀革命先烈 弘扬先烈精神 西北区: 杜秀霞 报道 2019年4月5日上午,春风盎然,天气睛朗,阳光明媚,...
    de47f6148188阅读 575评论 0 1
  • 诺丁山夜未眠 我住在诺丁山——伦敦我最喜欢的地方,平时集市上人们会贩卖各种为人熟知的果蔬。 ——《诺丁山》 没有温...
    帝企鹅喜欢北京熊阅读 814评论 0 2