Netty实现RPC的思路

RPC

RPC(Remote Procedure Call)远程过程调用,是一个计算机通信协议,可以实现远程调用远程接口就想调用本地接口一样的高效。

image.png

分布式组件中:外部RESTful内部RPC。

RPC调用流程

image.png
  1. 服务消费方(client)以本地调用方式调用服务
  2. client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  3. client stub 将消息进行编码并发送到服务端
  4. server stub 收到消息后进行解码
  5. server stub 根据解码结果调用本地的服务
  6. 本地服务执行并将结果返回给 server stub
  7. server stub 将返回导入结果进行编码并发送至消费方
  8. client stub 接收到消息并进行解码
  9. 服务消费方(client)得到结果

小结:RPC 的目标就是将 2-8 这些步骤都封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

通过客户端代理,实现通过netty通信,实现接口的远程调用。

code

RPC客户端

package com.pl.netty.rpc.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName NettyClient
 * @Author pl
 * @Date 2021/3/5
 * @Version V1.0.0
 */
public class NettyClient {
    //创建一个线程池
    private static ExecutorService executor= Executors.newFixedThreadPool(5);

    private static NettyClientHandler client;

    //编写方法,使用代理模式,获取一个代理对象
    public Object getBean(final Class<?> serviceClass, final String providerName) {
        //通过代理对目标对象的方法进行增强
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serviceClass},
                (proxy, method, args) -> {
                    System.out.println("代理被调用");
                    if (client == null)
                        initClient();
                    //方法参数
                    client.setPara(providerName + args[0]);
                    return executor.submit(client).get();
                });
    }


    //初始化客户端
    private static void initClient() {
        client = new NettyClientHandler();
        //创建EventLoopGroup
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(client);
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 7000).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

NettyClientHandler

package com.pl.netty.rpc.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName NettyClientHandler
 * @Author pl
 * @Date 2021/3/5
 * @Version V1.0.0
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    private ChannelHandlerContext context; //上下文
    private String result; //返回的结果
    private String para; //客户端调用方法时,传入的参数

    //与服务端创建连接后调用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("通道连接成功");
        context = ctx; //因为我们在其他方法会使用到 ctx
    }

    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg.toString();
        notify(); //唤醒等待的线程
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    //被代理对象的调用,真正发送数据给服务器,发送完后就阻塞,等待被唤醒(channelRead)
    @Override
    public synchronized Object call() throws Exception {
        System.out.println("线程被调用-----");
        context.writeAndFlush(para);
        //进行wait
        wait(); //等待 channelRead 获取到服务器的结果后,进行唤醒。
        return result; //服务方返回的结果
    }

    public void setPara(String para){
        this.para = para;
    }
}

ClientBootStrap

package com.pl.netty.rpc.client;

import com.pl.netty.rpc.server.HelloService;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName ClientBootStrap
 * @Author pl
 * @Date 2021/3/5
 * @Version V1.0.0
 */
public class ClientBootStrap {

    //这里定义协议头
    public static final String providerName = "HelloService#hello#";

    public static void main(String[] args) throws InterruptedException {
        //创建一个消费者
        NettyClient customer = new NettyClient();
        //创建代理对象
        HelloService service = (HelloService) customer.getBean(HelloService.class, providerName);

        //通过代理对象调用服务提供者的方法
        String res = service.hello("你好 Dubbo");
        System.out.println("调用的结果,res = " + res);
        Thread.sleep(2000);
    }
}

RPC服务端

NettyServer

package com.pl.netty.rpc.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName NettyServer
 * @Author pl
 * @Date 2021/3/5
 * @Version V1.0.0
 */
public class NettyServer {
    public static  void startServer(String hostName, int port) {
        startServer0(hostName, port);
    }

    private static void startServer0(String hostname, int port) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            pipeline.addLast(new NettyServerHandler()); //业务处理器
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(hostname,port).sync();
            System.out.println("服务提供方开始运行");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        startServer("127.0.0.1",7000);
    }
}

NettyServerHandler

package com.pl.netty.rpc.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName NettyServerHandler
 * @Author pl
 * @Date 2021/3/5
 * @Version V1.0.0
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送的消息,并调用服务
        System.out.println("msg=" + msg);
        //客户端在调用服务器的api 时,我们需要定义一个协议
        //比如要求,每次发消息时,都必须以某个字符串开头 "HelloService#hello#你好"
        if (msg.toString().startsWith("HelloService#hello#")) {
            String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            ctx.writeAndFlush(result);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

HelloService

package com.pl.netty.rpc.server;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName HelloService
 * @Author pl
 * @Date 2021/3/5
 * @Version V1.0.0
 */
public interface  HelloService {
    String hello(String message);
}

HelloServiceImpl

package com.pl.netty.rpc.server;

/**
 * <p>
 *
 * @Description: TODO
 * </p>
 * @ClassName HelloServiceImpl
 * @Author pl
 * @Date 2021/3/5
 * @Version V1.0.0
 */
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String message) {
        System.out.println("收到客户端消息=" + message);
        //根据 message 返回不同的结果
        if(message != null) {
            return "你好客户端,我已经收到你的消息【" + message + "】";
        } else {
            return "你好客户端,我已经收到你的消息。";
        }
    }
}

输出


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354