基于netty的RPC实现

这里解决了三个问题

  1. 协议定义,解决 粘包/拆包 问题
  2. 单客户端并发发送/消息维护问题
  3. 服务端并发提供服务问题

三个问题的具体实现如下

1.协议定义:

完整数据块包含数据 开始标识头,数据长度,真实数据三部分,如下图.


在这里插入图片描述

客户端,具体发送代码实现如下:

 public class RpcEncoder extends MessageToByteEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object requestBoday, ByteBuf out) throws Exception {
        //序列化传输对象. 也可只是传输字符串,服务端解析,但是局限不较大,无法应对多样的调用函数,对应参数,已经类型
        byte[] data = SerializationUtil.serialize(requestBoday);
        //先写入 开始标识
        out.writeBytes(Constants.SERVIE_HEARD.getBytes());
        //再写入数据长度
        out.writeInt(data.length);
        //再写入真实数据
        out.writeBytes(data);
    }
}

服务端,具体接收解析代码实现如下:

public class RpcDecoder extends ByteToMessageDecoder {
     .............
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        //hadReadHeard避免多次判断头信息
        if (!hadReadHeard) {
            while (true) {
                //这里保证至少读到一个头信息,也可以读到一个头和数据长度在做处理
                if (in.readableBytes() < 4) {
                    return;
                }
                in.markReaderIndex();
                in.readBytes(dataHeardBuffer);
                System.out.println(Constants.SERVIE_HEARD.getBytes().length);
                String s = new String(dataHeardBuffer);
                //读到头标识信息,准备读取数据长度和数据
                if (s.equals(Constants.SERVIE_HEARD)) {
                    hadReadHeard = true;
                    break;
                } else {
                    in.resetReaderIndex();
                    //为读取到 头标识,则过滤一个字节,继续判断是否收到头标识
                    in.readByte();
                }
            }
        }

        in.markReaderIndex();
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {
            in.resetReaderIndex();
            return;
        }
        hadReadHeard = false;
        byte[] data = new byte[dataLength];
        in.readBytes(data);
        out.add(SerializationUtil.deserialize(data, requestResponseRpc));
    }
}

2.单客户端并发发送/消息维护问题:

发送消息的维护:
1)消息通过唯一id来区分
2)所有"发送的消息" 都记录到hashmap中维护记录.
3)发送消息后,会阻塞等待结果返回
4)所有接收的消息,都借助唯一ID匹配到"发送的消息",并唤醒(notify)阻塞的发送线程处理返回数据

public class ProxyHelperTool {
    ...........
    public <T> T create(final Class<?> interfaceClass) {
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    //@Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        if (method.getDeclaringClass().getAnnotation(ServiceName.class) == null) {
                            throw new RuntimeException("Annotation(ServiceName) is null.");
                        }
                        //构造请求消息,并获取请求服务,方法,参数,参数类型
                        RequestRpc requestRpc = new RequestRpc();
                        requestRpc.setMethodName(method.getName());
                        requestRpc.setServiceName(method.getDeclaringClass().getAnnotation(ServiceName.class).name());
                        requestRpc.setParameters(args);
                        requestRpc.setParameterTypes(method.getParameterTypes());
                        //设置唯一id,确保消息的唯一性
                        requestRpc.setRequestId(StringUtil.getUiid());
                        //将发送的消息 送入列表维护起来.
                        ClientHandler.waitingRPC.put(requestRpc.getRequestId(),requestRpc);
                        ProxyHelperTool.client.send(requestRpc);
                        //进入阻塞等待,直到服务返回消息 唤醒.To do:这里缺过时处理
                        synchronized(requestRpc){
                            requestRpc.wait();
                        }
                        return requestRpc.getResult();
                    }
                }
        );
    }
}

3.服务端并发服务:

public class ServerHandler extends ChannelInboundHandlerAdapter {
    .............
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //將服务方静线程里执行,避免阻塞
        ServerService.submit(new Runnable() {
            @Override
            public void run() {
                RequestRpc requestRpc = (RequestRpc)msg;
                ResponseRpc responseRpc = handle(requestRpc);
                responseRpc.setRequestId(requestRpc.getRequestId());
                ctx.writeAndFlush(responseRpc).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        System.out.println("Server operationComplete");
                    }
                });
            }
        });

        /*.addListener(ChannelFutureListener.CLOSE)*/
    }
    //真实处理服务的地方,依据对方传递的 调用服务和参数通过反射调用获取结果返回
    private ResponseRpc handle(RequestRpc requestRpc){
        ResponseRpc responseRpc = new ResponseRpc();
        Object object = ServerService.getService(requestRpc.getServiceName());
        if(object == null){
            responseRpc.setException(new RuntimeException("Not service:"+requestRpc.
                    getServiceName()));
            return responseRpc;
        }

        try {
            Class<?> serviceClass = object.getClass();
            Method method = serviceClass.getMethod(requestRpc.getMethodName(),
                    requestRpc.getParameterTypes());
            method.setAccessible(true);
            Object[] parameters = requestRpc.getParameters();
            responseRpc.setResult(method.invoke(object, parameters));
        } catch (Exception e){
            responseRpc.setResult(e);
        }
        return responseRpc;
    }
  ........
}

测试方式,以及结果

客戶端 测试模拟 调用远程服务

这里, 客户端建立单链接,并发发送消息的方式 向服务端发起服务调用

public class TestClient {
    public static ProxyHelperTool proxyHelperTool = new ProxyHelperTool();
    public static void main(String[] args) throws Exception {
        int threadNumber = 15;
        CountDownLatch countDownLatch = new CountDownLatch(threadNumber);
        //开始15个线程发送 服务调用消息
        for(int i=0;i<threadNumber;i++){
            new Thread(){
                @Override
                public void run() {
                    //客户端,通过传递当前线程的名称(Thread.currentThread().getName)给服务端;
                    //服务端,组合收到的字符 再次发回来。
                    //通过对比 "线程名",可见各个线程收到的是否是自己发送的。
                    MsgService msgService = proxyHelperTool.create(MsgService.class);
                    String reslut = msgService.send(Thread.currentThread().getName());
                    System.out.println("Client("+Thread.currentThread().getName()+") get mag:" + "\n" + "..." + reslut);
                    countDownLatch.countDown();
                }
            }.start();
        }
        countDownLatch.await();
        ClientHelper.getClientHelper().close();
    }

}

客戶端 测试模拟 收到的结果

可见对应的调用线程,都收到了自己发出去的消息. 对应的thread-name 匹配


在这里插入图片描述

参考

https://my.oschina.net/huangyong/blog/361751?fromerr=NpC3phqY
https://github.com/apache/hadoop

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354

推荐阅读更多精彩内容