RPC学习笔记

介绍

从单机走向分布式,产生了很多分布式的通信方式

  • 最古老有效也是最不过时的TCP/UDP二进制传输,事实上所有的通信方式归根到底是TCP/UDP
  • CORBA(Common Object Request Broker Architecture) 古老而复杂的,支持面向对象的通信协议
  • Web Service(SOA SOAP RDDI WSDL..) 基于HTTP+XML的标准化Web Api
  • Restful (Represenational State Transfer) 回归简单化本源的Web Api事实标准 http+json
  • RMI Remote method invocation Java内部的分布式通信协议
  • JMS Java Message Service JavaEE中的消息框架标准,为很多MQ所支持
  • RPC Remote Procedure Call 远程方法调用,这只是一个统称,重点在于方法的调用(不支持对象的概念),具体实现甚至可以用RMI Restful等等去实现,但一般不用,因为RMI不能跨语言,restful效率太低。多用于服务器集群间的通信,因此常使用更加高效,短小精悍的传输模式以提高效率
    完整RPC框架:序列化,服务通信,服务注册,服务发现,服务治理,服务监控,服务负载均衡

实现

1

最原始的客户端 服务端

public iterface IUserService{
  User finUserById(int id);
}
public class IUserServiceImpl implements IUserService {
    @Override
    public User findUserById(int id) {
        return new User(id,"Alice");
    }//直接new模拟数据库查询
}
public class Client {
   public static void main(String[] args) throws Exception {
  Socket socket = new Socket('127.0.0.1', 8088)
  ByteArrayOutputStream baos = new ByteArrayOutputStream()
  DataoutputStream dos = new DataoutputStream(baos);
  dos.write(123); // 缺少灵活性
  // 发送查询id
 socket.getOutputStream().write(baos.toByteArray()
 socket.getOutputStream().flush()
  // 接收服务器返回结果
  DataInputStream dis = new 
  DataInputStream(socket.getInputStream())
  int id = dis.readInt();
  String name = dis.readUTF();
  User user = new User(id, name);
  dos.close();
  socket.close();
)
}
}
public class Server {
   public static void main(String[] args) throws Exception {
  DataInputStream dis = new DataInputStream(socket.getInputStream())
  DataOutputStream dos = new DataOutputStream(socket.getOutputStream())
  int id = dis.readInt()
  IUserService service = new IUserServiceImpl();
  User user = service.findUserById(id);
  dos.write(user.getId())
  dos.writeUTF(user.getName())
  dos.flush()
}}

2

客户端不需要知道网络细节,只需要知道接口用法

public class Client {
  ...
    Stub stub = new Stub();
     ...(stub.findUserById(123));
}
public class Server{
  public static boolean running = true;
  ...main... {
    ServerSocket server = new ServerSOcket(8080)
    while(runnning) {
      Socket client = server.accept()
      process(client)
      client.close()
    }
    server.close()
}
  public static void process(Socket socket) throws Exception {
     DataInputStream dis = new DataInputStream(socket.getInputStream())
  DataOutputStream dos = new DataOutputStream(socket.getOutputStream())
  int id = dis.readInt()
  IUserService service = new IUserServiceImpl();
  User user = service.findUserById(id);
  dos.write(user.getId())
  dos.writeUTF(user.getName())
  dos.flush()
}
}
public class Stub {
  public User findUserById(int id) .. {
    Socket socket = new Socket('127.0.0.1', 8088)
  ByteArrayOutputStream baos = new ByteArrayOutputStream()
  DataoutputStream dos = new DataoutputStream(baos);
  dos.write(123); // 缺少灵活性
  // 发送查询id
 socket.getOutputStream().write(baos.toByteArray()
 socket.getOutputStream().flush()
  // 接收服务器返回结果
  DataInputStream dis = new 
  DataInputStream(socket.getInputStream())
  int idtmp = dis.readInt();
  if (idtmp != id) ...('error')
  String name = dis.readUTF();
  User user = new User(id, name);
   return User;
  }
}

3

改动客户端,在rpc2的基础上,把stub改成代理方式,stub不是new出来的,而是使用他的静态方法getStub,这个版本相对于上个版本的优点还不能直观体现,因为硬编码部分还没有改动完成,rpc4将通用化这个动态代理

// client
IUserService stub = Stub.getStub()
...(stub.findUserById(123))
// stub
InvocationHandler h = new InvocationHandler() {
  @override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Socket socket = new Socket('127.0.0.1', 8088)
  ByteArrayOutputStream baos = new ByteArrayOutputStream()
  DataoutputStream dos = new DataoutputStream(baos);
  dos.write(123); // 缺少灵活性
  // 发送查询id
 socket.getOutputStream().write(baos.toByteArray()
 socket.getOutputStream().flush()
  // 接收服务器返回结果
  DataInputStream dis = new 
  DataInputStream(socket.getInputStream())
  int idtmp = dis.readInt();
  if (idtmp != id) ...('error')
  String name = dis.readUTF();
  User user = new User(id, name);
   return User;
}
};
Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[]{IUserService.class}, h);
...(o.getClass.getName());
...(o.getClass().getInterface()[0]);
return (IUserService) o;

4

这个版本彻底改变了客户端与服务端的通信方式:
客户端不仅仅传递参数给服务器,还要传输调用的方法名,参数类型,因此可以适用不同的方法的调用而不用改变代码,stub中使用反射拿到需要传递的上述信息,sub也不用改变代码,顺便这个版本改用了Object Input/output Stream,注意这个时候要确保传输的对象可序列化,服务端,客户端,stub均有改动
总之,这个版本在同一个服务类型(IUserService)之下,调用任何方法都不需要改动stub和服务端代码,只需要改动IUserService接口

// client
IUserService service = Stub.getStub();
...(service.findUserById(123));
// server
public static process(Socket socket) .. {
  DataInputStream osi= new DataInputStream(socket.getInputStream())
  DataOutputStream oos = new DataOutputStream(socket.getOutputStream())
  int id = dis.readInt()
  // 为了适应客户端变化而做的改动
  String method = ois.readObject();
  Class[] parameterTypes = (Class[])ois.readObject()
  Object[] paramters = (Object[])ois.readObject();
  // 服务类型暂时还是写死的,不够灵活
  IUserService service = new IUserServiceImpl();
  Method method = service.getClass().getMethod(methodName, parameterTypes);
  User user = (User)method.invoke(service, parameters);
User user = service.findUserById(id);
  dos.writeObject(user);
  dos.flush()
}
// stub
InvocationHandler h = new InvocationHandler() {
  @override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Socket socket = new Socket('127.0.0.1', 8088)
  ByteArrayOutputStream baos = new ByteArrayOutputStream()
  DataoutputStream oos = new DataoutputStream(baos);
  // 通用化的改动
  oos.writeUTF(method.getMethodName());
  oos.writeObject(method.getParameterTypes());
  oos.writeObject(args);
  oos.flush();
  // 接收服务器返回结果
  DataInputStream ois = new 
  DataInputStream(socket.getInputStream())
   User user = (User)ois.readObject();
   return user;
}
};
Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[]{IUserService.class}, h);
...(o.getClass.getName());
...(o.getClass().getInterface()[0]);
return (IUserService) o;

5

这个版本在之前通用化的基础上,连服务类型都能改变,变成通用的,主要将服务类型作为参数传入getStub,为了验证server和stub的通用性,这里client调用了两个不同服务的不同接口,都可以正常运行

IUserService service = (IUserService)Stub.getStub(IUserService.class)
IProductService service2 = (IProductService)Stub.getStub(IProductService.class)
...(service.findUserById(123))
...(service2.findProductByName('bob'))
// server
public static process(Socket socket) .. {
  DataInputStream osi= new DataInputStream(socket.getInputStream())
  DataOutputStream oos = new DataOutputStream(socket.getOutputStream())
  int id = dis.readInt()
  // 为了适应客户端变化而做的改动
  String clazzName = ois.readUTF();
  String method = ois.readObject();
  Class[] parameterTypes = (Class[])ois.readObject()
  Object[] paramters = (Object[])ois.readObject();
  // 服务类型暂时还是写死的,不够灵活
  // IUserService service = new IUserServiceImpl();
   // 本来是硬编码new出来的,现在变成从注册表查询到服务类,如果使用spring还可以直接根据配置植入bean然后根据bean查找
 Object service = registerTable.get(clazzName).newInstance() 
 Method method = service.getClass().getMethod(methodName, parameterTypes);
  User user = (User)method.invoke(service, parameters);
User user = service.findUserById(id);
  dos.writeObject(user);
  dos.flush()
}
// stub
// 添加了服务类型的传输
public class Stub {
  static Object getStub(class c) {
    ...
    // 增加服务类型传输
    oos.writeUTF(c.getName())
  }
   。。。
    Object o = Proxy.newProxyInstance(c.getClassLoader(), new Class[]{c}, h);//这里要写成通用的c,而不是固定的接口
}

RPC序列化框架

序列化:对象 -> 二进制
对象 -> json格式/xml格式 ->二进制

  • 1 java.io.Serializable
  • 2 Hessian
  • 3 google protobuf 复杂 效率高
  • 4 facebook Thrift 复杂 效率高
  • 5 kyro
  • 6 fst
  • 7 json 序列化框架 Jackson, google Gson, Ali FastJson
  • 8 xmlRpc(xstream)
    ...

Hession

public class HelloHession {
  public static void main()... {
    User u = new User(1, 'zhangsan')
    byte[] bytes = seriablize(u);
    ...(bytes.length)
    User u1 = (User)deserialize(bytes);
  }
  public static byte[] seralize(object o) {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    Hession2Output output = new Hession2Output(baos)
    output.writeObject(o)
    output.flush()
    byte[] bytes = baos.toByteArray()
    baos.close()
    output.close()
    return bytes;
  }
  public static byte[] deseralize(object o) {
     ByteArrayInStream bais = new ByteArrayInStream();
    Hession2Input input = new Hession2Input(baos)
    Object o = input.readObject();
    bais.close();
    input.close();
    return o;
  }
}

通讯协议

  • http
  • http2.0(gRPC) 双工
  • TCP
    • 同步/异步 , 阻塞/非阻塞
  • WebService
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • RPC是什么? RPC 的全称是 Remote Procedure Call 是一种进程间通信方式。 它允许程序调...
    1angxi阅读 5,295评论 5 20
  • 最近上手公司一个老项目, 遇到了一个RPC服务调用失败的问题, 由于之前只接触过RestCilent这一种远程服务...
    春风知桺阅读 1,078评论 0 2
  • RPC协议 RPC协议同HTTP协议一样,都是属于应用层的协议,可以通过学习HTTP的协议来理解RPC协议. HT...
    LegendGo阅读 444评论 0 0
  • 【定义】 Java RMI:Java远程方法调用,即Java RMI(Java Remote Method Inv...
    小梁同学jxy阅读 226评论 0 0
  • 我是黑夜里大雨纷飞的人啊 1 “又到一年六月,有人笑有人哭,有人欢乐有人忧愁,有人惊喜有人失落,有的觉得收获满满有...
    陌忘宇阅读 8,633评论 28 53