入门:一个基于Netty的RPC实现

目标:客户端远程调用服务端的一项服务,具体来说,客户端给服务端指定具体的类,方法,和参数信息,服务端用这些信息完成服务,并将调用结果或异常返回给客户端。

这个例子中,客户端想要调用的服务是 HelloService

public interface HelloService {
    String sayHello(String name);
}

具体实现为 HelloServiceImpl

/**
 * 给参数中的名字返回打招呼
 */
public class HelloServiceImpl implements HelloService{
    @Override
    public String sayHello(String name) {
        return "您好," + name;
    }
}

首先,客户端与服务端的这些通讯信息需要载体,我们把他们封装成两个类,客户端给服务端发送的远程调用Request,和服务端返回的Response。从设计考虑,让他们都继承与Message这个类

Message

@Data
public abstract class Message implements Serializable {

    private int sequenceId;
    private int messageType;

    public abstract int getMessageType();

    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;

    private static final Map<Integer, Class<?>> messageClasses = new HashMap<>();

    public static Class<?> getMessageClass(int messageType) {
        return messageClasses.get(messageType);
    }

    static {
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }
}

客户端的request类,包含了客户端想要调用的服务的一切信息(全类名、方法名、参数...)

RpcRequestMessage

@Data
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {

    // 接口的全限定名
    private String interfaceName;

    // 调用的方法名
    private String methodName;

    // 方法的返回值类
    private Class<?> returnType;

    // 方法的参数类型数组
    private Class[] parameterTypes;

    // 方法的参数值数组
    private Object[] parameterValue;

    // 一个可以设置sequenceID的构造器
    public RpcRequestMessage(int sequenceID, String interfaceName, String methodName,
                             Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {

        super.setSequenceId(sequenceID);
        this.interfaceName = interfaceName;
        this.methodName = methodName;
        this.returnType = returnType;
        this.parameterTypes = parameterTypes;
        this.parameterValue = parameterValue;
    }

    @Override
    public int getMessageType() {
        return Message.RPC_MESSAGE_TYPE_REQUEST;
    }
}

服务端Request类,携带服务的返回值或异常值 RpcResponseMessage

@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {

    // 返回值
    private Object returnValue;

    // 异常值
    private Exception exceptionValue;

    @Override
    public int getMessageType() {
        return Message.RPC_MESSAGE_TYPE_RESPONSE;
    }
}

好了,我们现在有了信息的载体,如何在客户端和服务端之间进行网络通信呢,这里使用了Netty框架

首先编写服务侧:RpcServer

@Slf4j
public class RpcServer {
    public static void main(String[] args) {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        RpcRequestMessageHandler RPC_HANDLER = new RpcRequestMessageHandler();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.channel(NioServerSocketChannel.class)
                    .group(boss, worker)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ProtocolFrameDecoder())   // 处理粘包半包
                                    .addLast(LOGGING_HANDLER)   // 日志
                                    .addLast(MESSAGE_CODEC) // 自定义协议 消息编解码
                                    .addLast(RPC_HANDLER);
                        }
                    });
            Channel channel = serverBootstrap.bind(8080).sync().channel();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            log.debug("server error",e);
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }
}

由于本篇旨在实现rpc,netty的基本内容就不再赘述,代码中的一些自定义的handler(处理粘包半包问题的、编解码协议的)就不再一一实现了。服务侧我们只关注一个关键的handler -- RpcRequestMessageHandler 这个handler专门负责处理从客户端来的 RpcRequestMessage

处理思路为

  • 先在封装的request信息中拿到所需服务的全类名,然后根据全类名拿到服务的实现类
  • 拿到所需服务的方法、参数类型、参数值、返回类型
  • 使用反射调用方法,拿到返回值或异常,封装成response类,返回客户端

由于没有整合Spring,需要手写一个从接口类拿到实现类的工具,具体为:

首先,在配置文件(application.properties)中,将服务的接口类和实现类绑定

# rpc bean
com.yldog.rpc.HelloService=com.yldog.rpc.HelloServiceImpl

然后一个工厂

public class ServicesFactory {
    static Properties properties;
    static Map<Class<?>, Object> map = new ConcurrentHashMap<>();

    static {
        try {
            InputStream in = MyConfig.class.getResourceAsStream("/application.properties");
            properties = new Properties();
            properties.load(in);
            Set<String> names = properties.stringPropertyNames();
            for (String name : names) {
                if (name.endsWith("Service")) {
                    // 拿到接口类Class对象
                    Class<?> interfaceClass = Class.forName(name);
                    // 拿到接口的实现类Class对象
                    Class<?> instanceClass = Class.forName(properties.getProperty(name));
                    map.put(interfaceClass, instanceClass.getDeclaredConstructor().newInstance());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 根据接口类获得实现类
    public static <T> T getService(Class<T> interfaceClass) {
        return (T) map.get(interfaceClass);
    }
}

最后就能编写服务端的handler了

@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage msg) throws Exception {
        // 构造一个响应实例
        RpcResponseMessage resp = new RpcResponseMessage();
        // 设置响应消息的序号 -- 应与请求消息序号一致
        resp.setSequenceId(msg.getSequenceId());
        try {
            // 通过request message拿到接口类,在通过接口类拿到实现类
            HelloService service = (HelloService) ServicesFactory.getService(Class.forName(msg.getInterfaceName()));
            // 拿到方法
            Method method = service.getClass().getDeclaredMethod(msg.getMethodName(), msg.getParameterTypes());
            // 在刚才拿到的实现类上调用方法
            Object result = method.invoke(service, msg.getParameterValue());
            resp.setReturnValue(result);
        } catch (Exception e) {
            String message = e.getCause().getMessage();
            resp.setExceptionValue(new Exception("远程调用异常:" + message));
        } finally {
            ctx.writeAndFlush(resp);
        }

    }
}

接着编写客户端

首先要实现客户端给服务端发远程调用请求。

第一步是先要拿到与服务端联络的 SocketChannel,再使用channel来进行远程调用

@Slf4j
public class RpcClientManager {

    private static volatile Channel channel = null;
    private static final Object LOCK = new Object();
  
    // 使用DCL保证channel单例
    public static Channel getChannel() {
        if (channel != null) {
            return channel;
        }
        synchronized (LOCK) {
            if (channel != null) {
                return channel;
            }
            initChannel();
            return channel;
        }
    }

    // 初始化channel,只能初始化一次
    private static void initChannel() {
        NioEventLoopGroup worker = new NioEventLoopGroup();
        LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        MessageCodecSharable MESSAGE_HANDLER = new MessageCodecSharable();
        RpcResponseMessageHandler RPC_HANDLER = new RpcResponseMessageHandler();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.channel(NioSocketChannel.class)
                .group(worker)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProtocolFrameDecoder())
                                .addLast(LOGGING_HANDLER)
                                .addLast(MESSAGE_HANDLER)
                                .addLast(RPC_HANDLER);
                    }
                });

        try {
            channel = bootstrap.connect("localhost", 8080).sync().channel();

            // 初始化channel后不能在这里阻塞住,所以采用异步
            // channel.closeFuture().sync();
            channel.closeFuture().addListener(future -> { worker.shutdownGracefully(); });
        } catch (InterruptedException e) {
            log.debug("Client Error", e);
        }
    }
}

注意:

  • getChannel() 就可以拿到与服务端通信的Channel,由于使用了DLC单例模式,保证了channel的唯一性
  • 在处理连接关闭时,不能使用 channel.closeFuture().sync();否则代码阻塞在了 initChannel()中,要使用异步的处理方式

这样,客户端就能向服务端发送request消息了,具体为:

public static void main(String[] args) {
        getChannel().writeAndFlush(new RpcRequestMessage(
                1,
                "com.yldog.rpc.HelloService",
                "sayHello",
                String.class,
                new Class[]{String.class},
                new Object[]{"张三"}
        ));
}

但是这种让用户一个个填写参数的方式,非常的不友好,理想的方式应该是用户像在本地调用方法一般,如

service.sayHello("张三"),这里的思路是使用代理模式将构建请求消息这一步封装起来,具体为编写一个构造代理类的方法:

    // 创建代理类
    public static <T> T getProxyService(Class<T> serviceClazz) {
        ClassLoader classLoader = serviceClazz.getClassLoader();
        Class<?>[] interfaces = {serviceClazz};
        Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
            // 1. 将方法调用 转为 消息对象
            int sequenceID = SequenceIdGenerator.nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    sequenceID,
                    serviceClazz.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );
            // 2. 发送消息
            getChannel().writeAndFlush(msg);
          
            // 暂时不考虑如何接收返回值

        });
        return (T) o;
    }

这样,用户的使用就变成了:

        public static void main(String[] args) {
//        getChannel().writeAndFlush(new RpcRequestMessage(
//                1,
//                "com.yldog.rpc.HelloService",
//                "sayHello",
//                String.class,
//                new Class[]{String.class},
//                new Object[]{"张三"}
//        ));
        // 以上这种远程调用的方法非常不友好,考虑使用代理封装
        HelloService proxyService = getProxyService(HelloService.class);
        proxyService.sayHello("张三");
        proxyService.sayHello("李四");
        proxyService.sayHello("王五");
    }

最后,我们要考虑的就是客户端如何接收服务端的返回消息,由于在netty中,调用远程服务的线程与收到返回结果的线程并不是一个线程,接收返回消息的线程一般在 nioEventLoop 中,所以想要在调用服务的线程中获取服务的结果,就涉及到了线程之间异步交换信息,具体实现思路为

  • 发起调用的线程在发送了Request请求信息后,准备一个空的Promise,然后等待nio线程将服务端返回的结果放入Promise中
  • nio线程得到返回的结果后,将结果放入Promise中,并通知调用的线程,实现异步

首先编写处理服务器响应信息的handler

@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {

    /**
     * 一个用来传递线程之间异步结果的容器集合
     * 某个线程进行远程调用后,开启一个Promise容器,Nio线程收到远程结果后,将结果放入那个线程的Promise中
     */
    public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
        log.debug("{}", msg);

        // 拿到属于这次消息接收的空的promise
        Promise<Object> promise = PROMISES.remove(msg.getSequenceId());

        // 往promise放结果
        if (promise != null) {
            if ((msg.getExceptionValue() == null)) { // 若异常值为空,则正常调用
                promise.setSuccess(msg.getReturnValue());
            } else { // 异常值不为空,则调用异常
                promise.setFailure(msg.getExceptionValue());
            }
        }

    }
}

然后完整的客户端代码为

@Slf4j
public class RpcClientManager {

    private static volatile Channel channel = null;
    private static final Object LOCK = new Object();

    public static void main(String[] args) {
//        getChannel().writeAndFlush(new RpcRequestMessage(
//                1,
//                "com.yldog.rpc.HelloService",
//                "sayHello",
//                String.class,
//                new Class[]{String.class},
//                new Object[]{"张三"}
//        ));
        // 以上这种远程调用的方法非常不友好,考虑使用代理封装
        HelloService proxyService = getProxyService(HelloService.class);
        proxyService.sayHello("张三");
        proxyService.sayHello("李四");
        proxyService.sayHello("王五");
    }

    // 创建代理类
    public static <T> T getProxyService(Class<T> serviceClazz) {
        ClassLoader classLoader = serviceClazz.getClassLoader();
        Class<?>[] interfaces = {serviceClazz};
        Object o = Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
            // 1. 将方法调用转为 消息对象
            int sequenceID = SequenceIdGenerator.nextId();
            RpcRequestMessage msg = new RpcRequestMessage(
                    sequenceID,
                    serviceClazz.getName(),
                    method.getName(),
                    method.getReturnType(),
                    method.getParameterTypes(),
                    args
            );
            // 2. 发送消息
            getChannel().writeAndFlush(msg);

            // 3. 准备一个这次接收消息专用的Promise对象                指定promise异步接收结果的线程
            DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
            // 放入promise集合中,Nio线程取用时,用sequenceId作为key
            RpcResponseMessageHandler.PROMISES.put(sequenceID, promise);

            // 4. 等待 Nio线程将返回的结果放入 promise 中
            promise.await();

            // 5. 拿到结果后,判断调用是否正常
            if (promise.isSuccess()) {
                // 调用成功
                return promise.getNow();
            } else {
                // 调用异常
                throw new RuntimeException(promise.cause());
            }
        });
        return (T) o;
    }

    // 使用DCL保证channel单例
    public static Channel getChannel() {...}

    // 初始化channel,只能初始化一次
    private static void initChannel() {...}
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351

推荐阅读更多精彩内容