netty实现rpc

学习目标

  1. 什么是RPC
  2. RPC实现原理
  3. 借助netty 实现网络通信
  4. java 反射
  5. 代码实现

什么是RPC

RPC(Remote Procedure Call),即远程过程调用,它是一种通过网络从远程计算机程序 上请求服务,而不需要了解底层网络实现的技术。常见的 RPC 框架有: 源自阿里的 Dubbo, Spring 旗下的 Spring Cloud,Google 出品的 grpc 等等。

RPC原理

image-20240823140906250.png
  1. 服务消费方(client)以本地调用方式调用服务

  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体

  3. client stub 将消息进行编码并发送到服务端

  4. server stub 收到消息后进行解码

  5. server stub 根据解码结果调用本地的服务

  6. 本地服务执行并将结果返回给 server stub

  7. server stub 将返回导入结果进行编码并发送至消费

  8. client stub 接收到消息并进行解码

  9. 服务消费方(client)得到结果

    RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地 方法一样即可完成远程服务调用。接下来我们基于 Netty 自己动手搞定一个 RPC。

借助netty 实现网络通信

image-20240823141218353.png

client(服务的调用方): 两个接口 + 一个包含 main 方法的测试类 

Client Stub: 一个客户端代理类 + 一个客户端业务处理类 

Server(服务的提供方): 两个接口 + 两个实现类

Server Stub: 一个网络处理服务器 + 一个服务器业务处理类

注意:服务调用方的接口必须跟服务提供方的接口保持一致(包路径可以不一致) 最终要实现的目标是:在 TestNettyRPC 中远程调用 HelloRPCImpl 或 HelloNettyImpl 中的方法

java 反射实现

  1. Class.forName("com.example.Person")
  2. obj.getClass()
  3. MyClass.class
  4. Reflections

代码实现

  • 消费者应用
@RestController
@RequestMapping("/api")
public class ConsumerController {

    @Resource
    private NettyRPCProxy nettyRPCProxy;

    @GetMapping("/comsumer")
    private String comsumer(String str) {
        Provider provider = (Provider)nettyRPCProxy.create(Provider.class);
         String res = provider.provider(str);
        System.out.println(res);// 打印 12312_provider123
        return res;
     
    }
}

public interface Provider {
    String provider(String str);
}

  • 生产者应用

    public class ProviderImpl implements Provider {
        @Override
        public String provider(String str) {
            return str + "_provider123";
        }
    }
    public interface Provider {
        String provider(String str);
    }
    
    
  • rpc核心代码

    clientsub

    public class NettyRPCProxy {
        //根据接口创建代理对象
        public Object create(Class target) {
            return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args)
                        throws Throwable {
                    //封装ClassInfo
                    ClassInfo classInfo = new ClassInfo();
                    classInfo.setClassName(target.getName());
                    classInfo.setMethodName(method.getName());
                    classInfo.setObjects(args);
                    classInfo.setTypes(method.getParameterTypes());
    
                    //开始用Netty发送数据
                    EventLoopGroup group = new NioEventLoopGroup();
                    ResultHandler resultHandler = new ResultHandler();
                    try {
                        Bootstrap b = new Bootstrap();
                        b.group(group)
                                .channel(NioSocketChannel.class)
                                .handler(new ChannelInitializer<SocketChannel>() {
                                    @Override
                                    public void initChannel(SocketChannel ch) throws Exception {
                                        ChannelPipeline pipeline = ch.pipeline();
                                        //编码器
                                        pipeline.addLast("encoder", new ObjectEncoder());
                                        //解码器  构造方法第一个参数设置二进制数据的最大字节数  第二个参数设置具体使用哪个类解析器
                                        pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                                        //客户端业务处理类
                                        pipeline.addLast("handler", resultHandler);
                                    }
                                });
                        ChannelFuture future = b.connect("127.0.0.1", 9999).sync();
                        future.channel().writeAndFlush(classInfo).sync();
                        future.channel().closeFuture().sync();
                    } finally {
                        group.shutdownGracefully();
                    }
                    return resultHandler.getResponse();
                }
            });
    
        }
    }
    
public class ResultHandler extends ChannelInboundHandlerAdapter {

    private Object response;
    public Object getResponse() {
        return response;
    }

    @Override //读取服务器端返回的数据(远程调用的结果)
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
        ctx.close();
    }
}

serverstub

public class NettyRPCServer {

    private int port;
    public NettyRPCServer(int port) {
        this.port = port;
    }

    public void start() throws InterruptedException {
        //主线程池-负责处理客户端连接
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();

        //主线程池-负责处理网络IO读写
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap();

        server.group(bossGroup, workGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列的连接个数
                .childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态
                .localAddress(port)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        //编码
                        pipeline.addLast("encoder", new ObjectEncoder());
                        //解码
                        pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));

                        //服务器端业务处理类
                        pipeline.addLast(new InvokeHandler());
                    }
                });

        /*异步绑定到服务器,sync()会阻塞到完成*/
        ChannelFuture f = server.bind().sync();
        System.out.println("......server is ready......");
        /*阻塞当前线程,直到服务器的ServerChannel被关闭*/
        f.channel().closeFuture().sync();

    }
    public static void main(String[] args) throws Exception {
        new NettyRPCServer(9999).start();
    }
}
public class InvokeHandler extends ChannelInboundHandlerAdapter {

    //得到某接口下某个实现类的名字
    private String getImplClassName(ClassInfo classInfo) throws Exception{

        String interfacePath="org.example.netty.rpc.provider.service";//服务提供者的 包路径
        int lastDot = classInfo.getClassName().lastIndexOf(".");
        String interfaceName=classInfo.getClassName().substring(lastDot);

        Class superClass= null;//拼接 全限定类 是服务提供者的
        try {

            superClass = Class.forName(interfacePath + interfaceName);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            throw e;
        }
        List<Class> classes2 = getClassesForPackage(interfacePath, superClass);
        if (classes2 != null && classes2.size() > 0) {

            return classes2.get(0).getName();
        }

        Reflections reflections = new Reflections(interfacePath);
//        得到某接口下的所有实现类
        Set<Class> ImplClassSet=reflections.getSubTypesOf(superClass);
        if(ImplClassSet.size()==0){
            System.out.println("未找到实现类");
            return null;
        }else if(ImplClassSet.size()>1){
            System.out.println("找到多个实现类,未明确使用哪一个");
            return null;
        }else {
            //把集合转换为数组
            Class[] classes=ImplClassSet.toArray(new Class[0]);
            return classes[0].getName(); //得到实现类的名字
        }

    }


    @Override //读取客户端发来的数据并通过反射调用实现类的方法
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ClassInfo classInfo = (ClassInfo) msg;
        Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();
        Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
        //通过反射调用实现类的方法
        Object result = method.invoke(clazz, classInfo.getObjects());
        ctx.writeAndFlush(result);
    }

    private List<Class> getClassesForPackage(String packageName, Class<?> targetInterface) {
        List<Class> classes = new ArrayList<>();
        String path = packageName.replace('.', '/');
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        try {
            Enumeration<URL> resources = classLoader.getResources(path);
            while (resources.hasMoreElements()) {
                File file = new File(resources.nextElement().getFile());
                if (file.isDirectory()) {
                    classes.addAll(findClasses(file, packageName, targetInterface));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return classes;
    }

//    private static Collection<Class> findClasses2(File file, String packageName) {
//
//        return null;
//    }


    private List<Class> findClasses(File directory, String packageName, Class<?> targetInterface) {
        List<Class> classes = new ArrayList<>();
        File[] files = directory.listFiles();
        for (File file : files) {
            if (file.isDirectory()) {
                classes.addAll(findClasses(file, packageName + "." + file.getName(), targetInterface));
            } else if (file.getName().endsWith(".class")) {
                String className = packageName + '.' + file.getName().substring(0, file.getName().length() - 6);
                try {
                    Class<?> clzz = Class.forName(className);
                    //判断 targetInterface 是 当前clzz 父类或接口, 排除接口类
                    if (targetInterface.isAssignableFrom(clzz) && !clzz.isInterface()) {
                        classes.add(Class.forName(className));
                    }

                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
        }
        return classes;
    }
}
image-20240823143245031.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容