介绍
从单机走向分布式,产生了很多分布式的通信方式
- 最古老有效也是最不过时的TCP/UDP二进制传输,事实上所有的通信方式归根到底是TCP/UDP
- CORBA(Common Object Request Broker Architecture) 古老而复杂的,支持面向对象的通信协议
- Web Service(SOA SOAP RDDI WSDL..) 基于HTTP+XML的标准化Web Api
- Restful (Represenational State Transfer) 回归简单化本源的Web Api事实标准 http+json
- RMI Remote method invocation Java内部的分布式通信协议
- JMS Java Message Service JavaEE中的消息框架标准,为很多MQ所支持
- RPC Remote Procedure Call 远程方法调用,这只是一个统称,重点在于方法的调用(不支持对象的概念),具体实现甚至可以用RMI Restful等等去实现,但一般不用,因为RMI不能跨语言,restful效率太低。多用于服务器集群间的通信,因此常使用更加高效,短小精悍的传输模式以提高效率
完整RPC框架:序列化,服务通信,服务注册,服务发现,服务治理,服务监控,服务负载均衡
实现
1
最原始的客户端 服务端
public iterface IUserService{
User finUserById(int id);
}
public class IUserServiceImpl implements IUserService {
@Override
public User findUserById(int id) {
return new User(id,"Alice");
}//直接new模拟数据库查询
}
public class Client {
public static void main(String[] args) throws Exception {
Socket socket = new Socket('127.0.0.1', 8088)
ByteArrayOutputStream baos = new ByteArrayOutputStream()
DataoutputStream dos = new DataoutputStream(baos);
dos.write(123); // 缺少灵活性
// 发送查询id
socket.getOutputStream().write(baos.toByteArray()
socket.getOutputStream().flush()
// 接收服务器返回结果
DataInputStream dis = new
DataInputStream(socket.getInputStream())
int id = dis.readInt();
String name = dis.readUTF();
User user = new User(id, name);
dos.close();
socket.close();
)
}
}
public class Server {
public static void main(String[] args) throws Exception {
DataInputStream dis = new DataInputStream(socket.getInputStream())
DataOutputStream dos = new DataOutputStream(socket.getOutputStream())
int id = dis.readInt()
IUserService service = new IUserServiceImpl();
User user = service.findUserById(id);
dos.write(user.getId())
dos.writeUTF(user.getName())
dos.flush()
}}
2
客户端不需要知道网络细节,只需要知道接口用法
public class Client {
...
Stub stub = new Stub();
...(stub.findUserById(123));
}
public class Server{
public static boolean running = true;
...main... {
ServerSocket server = new ServerSOcket(8080)
while(runnning) {
Socket client = server.accept()
process(client)
client.close()
}
server.close()
}
public static void process(Socket socket) throws Exception {
DataInputStream dis = new DataInputStream(socket.getInputStream())
DataOutputStream dos = new DataOutputStream(socket.getOutputStream())
int id = dis.readInt()
IUserService service = new IUserServiceImpl();
User user = service.findUserById(id);
dos.write(user.getId())
dos.writeUTF(user.getName())
dos.flush()
}
}
public class Stub {
public User findUserById(int id) .. {
Socket socket = new Socket('127.0.0.1', 8088)
ByteArrayOutputStream baos = new ByteArrayOutputStream()
DataoutputStream dos = new DataoutputStream(baos);
dos.write(123); // 缺少灵活性
// 发送查询id
socket.getOutputStream().write(baos.toByteArray()
socket.getOutputStream().flush()
// 接收服务器返回结果
DataInputStream dis = new
DataInputStream(socket.getInputStream())
int idtmp = dis.readInt();
if (idtmp != id) ...('error')
String name = dis.readUTF();
User user = new User(id, name);
return User;
}
}
3
改动客户端,在rpc2的基础上,把stub改成代理方式,stub不是new出来的,而是使用他的静态方法getStub,这个版本相对于上个版本的优点还不能直观体现,因为硬编码部分还没有改动完成,rpc4将通用化这个动态代理
// client
IUserService stub = Stub.getStub()
...(stub.findUserById(123))
// stub
InvocationHandler h = new InvocationHandler() {
@override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = new Socket('127.0.0.1', 8088)
ByteArrayOutputStream baos = new ByteArrayOutputStream()
DataoutputStream dos = new DataoutputStream(baos);
dos.write(123); // 缺少灵活性
// 发送查询id
socket.getOutputStream().write(baos.toByteArray()
socket.getOutputStream().flush()
// 接收服务器返回结果
DataInputStream dis = new
DataInputStream(socket.getInputStream())
int idtmp = dis.readInt();
if (idtmp != id) ...('error')
String name = dis.readUTF();
User user = new User(id, name);
return User;
}
};
Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[]{IUserService.class}, h);
...(o.getClass.getName());
...(o.getClass().getInterface()[0]);
return (IUserService) o;
4
这个版本彻底改变了客户端与服务端的通信方式:
客户端不仅仅传递参数给服务器,还要传输调用的方法名,参数类型,因此可以适用不同的方法的调用而不用改变代码,stub中使用反射拿到需要传递的上述信息,sub也不用改变代码,顺便这个版本改用了Object Input/output Stream,注意这个时候要确保传输的对象可序列化,服务端,客户端,stub均有改动
总之,这个版本在同一个服务类型(IUserService)之下,调用任何方法都不需要改动stub和服务端代码,只需要改动IUserService接口
// client
IUserService service = Stub.getStub();
...(service.findUserById(123));
// server
public static process(Socket socket) .. {
DataInputStream osi= new DataInputStream(socket.getInputStream())
DataOutputStream oos = new DataOutputStream(socket.getOutputStream())
int id = dis.readInt()
// 为了适应客户端变化而做的改动
String method = ois.readObject();
Class[] parameterTypes = (Class[])ois.readObject()
Object[] paramters = (Object[])ois.readObject();
// 服务类型暂时还是写死的,不够灵活
IUserService service = new IUserServiceImpl();
Method method = service.getClass().getMethod(methodName, parameterTypes);
User user = (User)method.invoke(service, parameters);
User user = service.findUserById(id);
dos.writeObject(user);
dos.flush()
}
// stub
InvocationHandler h = new InvocationHandler() {
@override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Socket socket = new Socket('127.0.0.1', 8088)
ByteArrayOutputStream baos = new ByteArrayOutputStream()
DataoutputStream oos = new DataoutputStream(baos);
// 通用化的改动
oos.writeUTF(method.getMethodName());
oos.writeObject(method.getParameterTypes());
oos.writeObject(args);
oos.flush();
// 接收服务器返回结果
DataInputStream ois = new
DataInputStream(socket.getInputStream())
User user = (User)ois.readObject();
return user;
}
};
Object o = Proxy.newProxyInstance(IUserService.class.getClassLoader(), new Class[]{IUserService.class}, h);
...(o.getClass.getName());
...(o.getClass().getInterface()[0]);
return (IUserService) o;
5
这个版本在之前通用化的基础上,连服务类型都能改变,变成通用的,主要将服务类型作为参数传入getStub,为了验证server和stub的通用性,这里client调用了两个不同服务的不同接口,都可以正常运行
IUserService service = (IUserService)Stub.getStub(IUserService.class)
IProductService service2 = (IProductService)Stub.getStub(IProductService.class)
...(service.findUserById(123))
...(service2.findProductByName('bob'))
// server
public static process(Socket socket) .. {
DataInputStream osi= new DataInputStream(socket.getInputStream())
DataOutputStream oos = new DataOutputStream(socket.getOutputStream())
int id = dis.readInt()
// 为了适应客户端变化而做的改动
String clazzName = ois.readUTF();
String method = ois.readObject();
Class[] parameterTypes = (Class[])ois.readObject()
Object[] paramters = (Object[])ois.readObject();
// 服务类型暂时还是写死的,不够灵活
// IUserService service = new IUserServiceImpl();
// 本来是硬编码new出来的,现在变成从注册表查询到服务类,如果使用spring还可以直接根据配置植入bean然后根据bean查找
Object service = registerTable.get(clazzName).newInstance()
Method method = service.getClass().getMethod(methodName, parameterTypes);
User user = (User)method.invoke(service, parameters);
User user = service.findUserById(id);
dos.writeObject(user);
dos.flush()
}
// stub
// 添加了服务类型的传输
public class Stub {
static Object getStub(class c) {
...
// 增加服务类型传输
oos.writeUTF(c.getName())
}
。。。
Object o = Proxy.newProxyInstance(c.getClassLoader(), new Class[]{c}, h);//这里要写成通用的c,而不是固定的接口
}
RPC序列化框架
序列化:对象 -> 二进制
对象 -> json格式/xml格式 ->二进制
- 1 java.io.Serializable
- 2 Hessian
- 3 google protobuf 复杂 效率高
- 4 facebook Thrift 复杂 效率高
- 5 kyro
- 6 fst
- 7 json 序列化框架 Jackson, google Gson, Ali FastJson
- 8 xmlRpc(xstream)
...
Hession
public class HelloHession {
public static void main()... {
User u = new User(1, 'zhangsan')
byte[] bytes = seriablize(u);
...(bytes.length)
User u1 = (User)deserialize(bytes);
}
public static byte[] seralize(object o) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Hession2Output output = new Hession2Output(baos)
output.writeObject(o)
output.flush()
byte[] bytes = baos.toByteArray()
baos.close()
output.close()
return bytes;
}
public static byte[] deseralize(object o) {
ByteArrayInStream bais = new ByteArrayInStream();
Hession2Input input = new Hession2Input(baos)
Object o = input.readObject();
bais.close();
input.close();
return o;
}
}
通讯协议
- http
- http2.0(gRPC) 双工
- TCP
- 同步/异步 , 阻塞/非阻塞
- WebService