使用Netty实现一个简单的RPC

设计思路

RPC:远程过程调用,像是调用本地代码一样来调用远程的服务。所以一个简单的RPC至少包括两个角色:

  1. 服务提供方
  2. 服务调用方

服务调用方像调用本地代码一样调用远程服务,自然得有远程服务的接口信息。而服务提供方需要来实现接口提供服务返回到调用方。这样问题就变成了:

  1. 服务调用方如何通过服务接口将请求给到服务提供方?
  2. 服务提供方如何将数据回传到服务提供方?

先看第一个问题,因为服务在远程,我们可以告诉远程服务我们调用的是哪个接口,用的哪些参数就可以了。自然我们会想到使用动态代理,使用代理增强来接口方法,达到传输接口名参数的目的。服务端接收到这些数据后通过反射来执行。

Netty的简单描述

Netty是基于nio的网络编程框架。java nio由来已久,不过用的不多,原因在于使用原生的nio进行网络编程上手难度有些大。Netty对原生nio做了封装,原生的nio的核心组件selector来管理通道,而Netty则为Reactor。一个Reactor负责接收客户端的请求,同时也负责将不同的客户端请求放入到Channel中,这个便是Netty Reactor的单线程模型。最常用的还是主从模型:在多线程的基础上,让处理客户端的线程也变为多线程,一组线程池接收请求,一组线程池处理IO。做到了前面多个服务员,后面多个厨子的模式。
Reactor模型最为核心的是两个线程池,Netty使用 NioEventLoopGroup 来初始化线程池。一个 NioEventLoopGroup 下包含多个 NioEventLoop,NioEventLoop封装了selector,用于注册niochannel,每个 NioChannel 都绑定有一个自己的 ChannelPipeline。
EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop来处理任务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
通常一个服务端口即一个ServerSocketChannel对应一个Selector和一个EventLoop线程。BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进行 IO 处理。BossEventLoopGroup 通常是一个单线程的 EventLoop,EventLoop 维护着一个注册了ServerSocketChannel 的 Selector 实例,BossEventLoop 不断轮询 Selector 将连接事件分离出来,通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup,WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注册到其维护的 Selector 并对其后续的 IO 事件进行处理。
另外一个比较重要的类便是ChannelHandler,ChannelHandler 接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具体的业务逻辑。

代码实现

服务端代码:

public class NettyRPCServer {
    private static final int PORT = 9090;

    public void run(){
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap boot = new ServerBootstrap();
            boot.group(bossGroup, workerGroup).
                    channel(NioServerSocketChannel.class).
                    option(ChannelOption.SO_BACKLOG, 128).
                    childOption(ChannelOption.SO_KEEPALIVE, true).
                    localAddress(PORT).
                    childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //使用netty内置的object编解码器
                            //编码器
                            pipeline.addLast("encoder", new ObjectEncoder());
                            //解码器
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));
                            pipeline.addLast(new InvokeHandler());
                        }
                    });
            ChannelFuture channelFuture = boot.bind(PORT).sync();
            System.out.println("server is ready");
            channelFuture.channel().closeFuture().sync();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new NettyRPCServer().run();
    }

}

InvokeHandler来实现反射调用:

/**
 * 服务端业务处理类
 */
public class InvokeHandler extends ChannelInboundHandlerAdapter {

    private static final String SERVER_PATH = "com.ucal.dc.nio.netty.rpc.server";

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ClassInfo cl = (ClassInfo) msg;
        Class impl = Class.forName(getImplClassName(cl));
        Method method = impl.getDeclaredMethod(cl.getMethod(), cl.getType());
        Object result = method.invoke(impl.newInstance(),cl.getObjects());
        //写入到通道中
        ctx.writeAndFlush(result);
    }
    //通过接口找到实现类
    private String getImplClassName(ClassInfo classInfo) throws Exception {
        int i = classInfo.getClassName().lastIndexOf(".");
        String interfaceName = classInfo.getClassName().substring(i);
        Class service = Class.forName(SERVER_PATH + interfaceName);
        //使用Reflections框架
        Reflections reflections = new Reflections(SERVER_PATH);
        Set<Class> types = reflections.getSubTypesOf(service);
        if(types.size() == 0){
            throw new RuntimeException("未能找到服务实现类");
        }else if(types.size() > 1){
            throw new RuntimeException("找到多个服务实现类,未能明确使用哪一个");
        }else{
            Class c = types.iterator().next();
            return c.getName();//得到实现类的名字
        }
    }

}

客户端代理类(使用ClassInfo类来封装数据):

/**
 * 客户端代理类
 */
public class NettyRPCProxy {
    private static final int PORT = 9090;
    //根据接口创建动态代理对象
    public static Object create(Class target){
        return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
            @Override
            public Object invoke(Object o, Method method, Object[] objects) throws Throwable {
                //封装ClassInfo
                ClassInfo classInfo = new ClassInfo();
                classInfo.setClassName(target.getName());
                classInfo.setMethod(method.getName());
                classInfo.setObjects(objects);
                classInfo.setType(method.getParameterTypes());

                //开始使用netty发送数据
                NioEventLoopGroup group = new NioEventLoopGroup();
                Bootstrap bootstrap = new Bootstrap();
                ResultHandler resultHandler = new ResultHandler();
                try {
                    bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            //编码器
                            pipeline.addLast("encoder", new ObjectEncoder());
                            //解码器
                            pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
                                    ClassResolvers.cacheDisabled(null)));
                            //客户端业务处理类
                            pipeline.addLast(resultHandler);
                        }
                    });
                    ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", PORT).sync();
                    channelFuture.channel().writeAndFlush(classInfo).sync();
                    channelFuture.channel().closeFuture().sync();
                }finally {
                    group.shutdownGracefully();
                }
                return resultHandler.response;

            }
        });
    }
}

ClassInfo:

/**
 * 用于封装类消息,用于数据传输
 */
public class ClassInfo implements Serializable {

    private static final long serialVersionUID = 9159351154097982580L;

    private String className; //类名
    private String method; //方法名
    private Class<?>[] type; //参数类型
    private Object[] objects;


    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public Class<?>[] getType() {
        return type;
    }

    public void setType(Class<?>[] type) {
        this.type = type;
    }

    public Object[] getObjects() {
        return objects;
    }

    public void setObjects(Object[] objects) {
        this.objects = objects;
    }
}

ResultHandler便是客户端接收到消息的Handler:

/**
 * 客户端接收消息处理器
 */
public class ResultHandler extends ChannelInboundHandlerAdapter {
    public Object response;
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
        ctx.close();
    }

}

在客户端有这么一个远程服务接口:

/**
 * 服务接口
 */
public interface HelloNetty {
    String hello();
}

而服务端有这么一个接口的实现类:

public class HelloNettyImpl implements HelloNetty {
    @Override
    public String hello() {
        return "hello netty";
    }
}

开始测试:先启动服务端,在客户端使用NettyRPCProxy创建HelloNetty的代理对象,在调用代理对应的接口方法的时候,实际上就是将类信息封装成ClassInfo,通过netty传输到服务端NettyRPCServer。

public class TestNettyRPC {
    public static void main(String[] args) {
        HelloNetty hn = (HelloNetty)NettyRPCProxy.create(HelloNetty.class);
        System.out.println(hn.hello());
    }
}

下面增加一个带参的调用:

public interface HelloRPC {
    String hello(String name);
}

实现类:

public class HelloRPCImpl implements HelloRPC {
    @Override
    public String hello(String name) {
        return "hello "+name;
    }
}

测试一下:

public class TestNettyRPC {
    public static void main(String[] args) {
        HelloNetty hn = (HelloNetty)NettyRPCProxy.create(HelloNetty.class);
        System.out.println(hn.hello());

        HelloRPC hr = (HelloRPC)NettyRPCProxy.create(HelloRPC.class);
        System.out.println(hr.hello("susan"));

    }
}

输出:

image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容