笔者初识Dubbo的时候,只知道它是一个RPC框架,那么什么是RPC呢?
1. RPC是什么
维基百科是这么定义RPC的:
在分布式计算,远程过程调⽤(英语:Remote Procedure Call,缩写为 RPC)是⼀个计算机通信协 议。该协议允许运⾏于⼀台计算机的程序调⽤另⼀个地址空间(通常为⼀个开放⽹络的⼀台计算机)的⼦程序,⽽程序员就像调⽤本地程序⼀样,⽆需额外地为这个交互作⽤编程(⽆需关注细节)。RPC是 ⼀种服务器-客户端(Client/Server)模式,经典实现是⼀个通过发送请求-接受回应进⾏信息交互的系统。如果涉及的软件采⽤⾯向对象编程,那么远程过程调⽤亦可称作远程调⽤或远程⽅法调⽤,例:Java RMI。
所以对于Java程序而言,RPC就是远程方法调用。
1.1 相对于本地方法调用
远程⽅法调⽤和本地⽅法调⽤是相对的两个概念,本地⽅法调⽤指的是进程内部的⽅法调⽤,⽽远程⽅法调⽤指的是两个进程内的⽅法相互调⽤。
1.2 基于网络传输
从上述表述中可以看到,远程方法调用指的是两个进程内的方法调用,如果要实现远程方法调用,基本就是通过网络传输数据进行调用。
所以就有了
- RPC over Http:基于Http协议来传输数据
- PRC over Tcp:基于Tcp协议来传输数据
1.3 传输数据
对于所传输的数据,可以交由RPC的双⽅来协商定义,但基本都会包括:
- 调⽤的是哪个类或接⼝
- 调⽤的是哪个⽅法,⽅法名和⽅法参数类型(考虑⽅法重载)
- 调⽤⽅法的⼊参
2. 什么是Dubbo
2.1 定义
⽬前,官⽹上是这么介绍的:Apache Dubbo 是⼀款⾼性能、轻量级的开源 Java 服务框架。
在⼏个⽉前,官⽹的介绍是:Apache Dubbo 是⼀款⾼性能、轻量级的开源 Java RPC框架。
Dubbo⼀开始的定位就是RPC,专注于两个服务之间的调⽤。但随着微服务的盛⾏,除开服务调⽤之外, Dubbo也在逐步的涉猎服务治理、服务监控、服务⽹关等等,所以现在的Dubbo⽬标已经不⽌是RPC框架了,⽽是和Spring Cloud类似想成为⼀个服务框架。
2.2 基本组成
Dubbo有以下几个模块:注册中心、服务提供者、容器、服务消费者和监控中心组成。
2.2.1 注册中心(Registry)
使用过SpringCloud或者Zookeeper的同学应该有印象,对于一个分布式服务,最重要的组成之一就是注册中心,它可以管理系统的整个服务,包括服务发现与服务注册。
2.2.2 服务提供者(Provider)
前面提到RPC框架需要在网络传输数据,这个数据在Java中可以表示为一个接口,而服务提供者的作用就是把这个接口提供出去,并向注册中心注册自己,这样才可以把服务暴露出去,供服务消费者调用。服务提供者和服务消费者双方需要约定一个协议来处理服务消费者的请求。
2.2.3 容器(Container)
Dubbo的Container模块,是一个独立的容器,因为服务通常不需要Tomcat/JBoss等Web容器的特性,没必要用Web容器去加载服务。Dubbo内置了jetty、spring来启动服务,但是也提供了容器扩展,用户可以自定义容器启动服务,所以也可以使用Tomcat启动服务,但是官方不推荐这么使用。
2.2.4 服务消费者(Consumer)
服务消费者需要向服务提供者发送请求,来获取最终数据。
2.2.5 监控中心
一个分布式服务要想稳定运行,需要在全链路上监控服务的运行状态,那种无监控的服务实在是太可怕了,笔者就遇到过无监控的服务,发现问题、排查问题的过程实在是太痛苦了。
2.2.6 基本调用流程
- 服务容器负责启动,加载,运行服务提供者。
- 服务提供者在启动时,向注册中心注册自己提供的服务。
- 服务消费者在启动时,向注册中心订阅自己所需的服务。
- 注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者。
- 服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
- 服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。
3. 自己实现一个简单的Dubbo
介绍了这么多,是时候展现真正的技术了,我们先自己实现一个简单版本的Dubbo。
3.1 需求分析
首先进行需求分析,我们设计一个框架,希望框架有哪些模块、每个模块的功能是什么,这里可以参考Dubbo的架构来设计。
由于是简易的版本,我们只需要注册中心、服务提供者、服务消费者以及容器这四个模块。
3.2 模块功能分析
3.2.1 注册中心
- 提供注册服务功能
- 提供获取已注册服务功能
3.2.2 服务提供者
- 提供调用接口供服务消费者调用
- 启动时向注册中心注册服务
3.2.3 容器
- 服务容器负责启动,加载,运行服务提供者
3.2.4 服务消费者
- 启动时向注册中心订阅服务
- 调用服务提供者提供的接口,调用该接口
3.3 项目整体结构
整个项目分三个包,consumer、provider以及framework。
- consumer:包含一个消费者启动类Consumer
- provider:包含提供的接口UserService、接口实现类UserServiceImpl以及提供方启动类Provider
- framework:包含protocol包、proxy包以及register包
- protocol:协议层,本次用Http协议,包含HttpClient(Http客户端)、HttpServer(服务)、HttpServerHandler(服务处理)、DispatcherServlet(转发)以及Invocation(传输实体)
- proxy:包含一个代理工厂ProxyFactory
- register:包含一个LocalRegister
3.4 调用流程
- Provider类启动,注册本服务,启动容器
- Consumer类启动,通过动态代理,调用暴露的接口
- ProxyFactory通过HttpClient发送请求,调用服务提供方提供的接口
- 服务提供方接到该接口请求,通过HttpServerHandler处理请求
- HttpServerHandler解析请求参数,获取传输实体Invocation
- 通过传输实体中的参数,用反射获取UserServiceImpl的实例
- 调用UserServiceImpl中的方法,获得返回数据
- 将返回数据写到Response中,返回给服务消费者
- 消费者接到返回数据,呈现出来
3.5 具体实现
整体依赖文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>RPCDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>9.0.12</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.13</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.14</version>
</dependency>
</dependencies>
</project>
3.5.1 provider层
Provider类
package com.rpc.provider;
import com.rpc.framework.protocol.http.HttpServer;
import com.rpc.framework.register.LocalRegister;
public class Provider {
public static void main(String[] args) {
String interfaceName = UserService.class.getName();
LocalRegister.register(interfaceName,UserServiceImpl.class);
//启动服务
HttpServer httpServer = new HttpServer();
httpServer.start("localhost",8080);
}
}
接口UserService
package com.rpc.provider;
public interface UserService {
String hello(String name);
}
实现类UserServiceImpl
package com.rpc.provider;
public class UserServiceImpl implements UserService {
@Override
public String hello(String name) {
return "hello" + name;
}
}
3.5.2 consumer层
Cousumer类
package com.rpc.consumer;
import com.rpc.framework.proxy.ProxyFactory;
import com.rpc.provider.UserService;
public class Consumer {
public static void main(String[] args) {
UserService userService = ProxyFactory.gerProxy(UserService.class);
String result = userService.hello("PRC");
System.out.println(result);
}
}
3.5.3 framework层
3.5.3.1 protocol层
http层里的四个类
package com.rpc.framework.protocol.http;
import lombok.SneakyThrows;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
public class DispatcherServlet extends HttpServlet {
@SneakyThrows
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
new HttpServerHandler().handler(req,resp);
}
}
package com.rpc.framework.protocol.http;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rpc.framework.protocol.Invocation;
import org.apache.http.HttpEntity;
import org.apache.http.ParseException;
import org.apache.http.client.HttpRequestRetryHandler;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.protocol.HttpContext;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.util.EntityUtils;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.nio.charset.Charset;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
public class HttpClient {
public static SSLContext createSSL() {
try {
return new SSLContextBuilder().loadTrustMaterial(null, (certificate, authType) -> true).build();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (KeyManagementException e) {
e.printStackTrace();
} catch (KeyStoreException e) {
e.printStackTrace();
}
return null;
}
/**
* 构建httpclient客户端,设置SSL证书,并设置请求失败的重试次数
*/
private static final CloseableHttpClient httpclient = HttpClients
.custom().setSSLContext(createSSL())
.setRetryHandler(new HttpRequestRetryHandler() {
@Override
public boolean retryRequest(IOException e, int retryCount, HttpContext httpContext) {
if (retryCount <= 3) {
return true;
}
return false;
}
}).build();
/**
* 发送HttpPost请求,参数为map
*
* @param url
* @param invocation
* @return
*/
public static String sendPost(String url, Invocation invocation) {
String jsonString = JSON.toJSONString(invocation);
StringEntity entity = new StringEntity(jsonString, Charset.forName("UTF-8"));
/*entity.setContentEncoding("UTF-8");
entity.setContentType("application/json;charset=UTF-8");*/
HttpPost httppost = new HttpPost(url);
/*
设置请求头 请求体
*/
httppost.setHeader("Content-Type", "application/json;charset=UTF-8");
httppost.setHeader("Accept", "application/json");
httppost.setEntity(entity);
CloseableHttpResponse response = null;
try {
response = httpclient.execute(httppost);
} catch (IOException e) {
e.printStackTrace();
}
HttpEntity httpEntity = response.getEntity();
String result = null;
try {
result = EntityUtils.toString(httpEntity);
} catch (ParseException | IOException e) {
e.printStackTrace();
}
return result;
}
}
package com.rpc.framework.protocol.http;
import org.apache.catalina.*;
import org.apache.catalina.connector.Connector;
import org.apache.catalina.core.StandardContext;
import org.apache.catalina.core.StandardEngine;
import org.apache.catalina.core.StandardHost;
import org.apache.catalina.startup.Tomcat;
/**
* 封装Tomcat启动服务
*
*/
public class HttpServer {
public void start(String hostName, Integer port){
Tomcat tomcat = new Tomcat();
Server server = tomcat.getServer();
Service service = server.findService("Tomcat");
Connector connector = new Connector();
connector.setPort(port);
Engine engine = new StandardEngine();
engine.setDefaultHost(hostName);
Host host = new StandardHost();
host.setName(hostName);
String contextPath = "";
Context context = new StandardContext();
context.setPath(contextPath);
context.addLifecycleListener(new Tomcat.FixContextListener());
host.addChild(context);
engine.addChild(host);
service.setContainer(engine);
service.addConnector(connector);
tomcat.addServlet(contextPath,"dispatcher", new DispatcherServlet());
context.addServletMappingDecoded("/*", "dispatcher");
try {
tomcat.start();
tomcat.getServer().await();
} catch (LifecycleException e) {
e.printStackTrace();
}
}
}
package com.rpc.framework.protocol.http;
import com.alibaba.fastjson.JSONObject;
import com.rpc.framework.protocol.Invocation;
import com.rpc.framework.register.LocalRegister;
import com.rpc.provider.UserService;
import com.rpc.provider.UserServiceImpl;
import org.apache.commons.io.IOUtils;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
public class HttpServerHandler {
public void handler(HttpServletRequest req, HttpServletResponse resp) throws IOException, IllegalAccessException, InstantiationException, InvocationTargetException, NoSuchMethodException {
//处理请求
Invocation invocation = JSONObject.parseObject(req.getInputStream(), Invocation.class);
Class aClass = LocalRegister.get(invocation.getInterfaceName());
try {
Method method = aClass.getMethod(invocation.getMethodName(),invocation.getParamTypes());
String result = (String) method.invoke(aClass.newInstance(), invocation.getParams());
IOUtils.write(result,resp.getOutputStream());
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
}
}
传输实体类
package com.rpc.framework.protocol;
import java.io.Serializable;
public class Invocation implements Serializable {
private String interfaceName;
private String methodName;
private Class[] paramTypes;
private Object[] params;
public Invocation(String interfaceName, String methodName, Class[] paramTypes, Object[] params) {
this.interfaceName = interfaceName;
this.methodName = methodName;
this.paramTypes = paramTypes;
this.params = params;
}
public String getInterfaceName() {
return interfaceName;
}
public void setInterfaceName(String interfaceName) {
this.interfaceName = interfaceName;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public Class[] getParamTypes() {
return paramTypes;
}
public void setParamTypes(Class[] paramTypes) {
this.paramTypes = paramTypes;
}
public Object[] getParams() {
return params;
}
public void setParams(Object[] params) {
this.params = params;
}
}
3.5.3.2 proxy层
ProxyFactory类
package com.rpc.framework.proxy;
import com.rpc.framework.protocol.Invocation;
import com.rpc.framework.protocol.http.HttpClient;
import java.lang.reflect.Proxy;
public class ProxyFactory {
public static <T> T gerProxy(final Class interfaceClass){
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass},
(proxy, method, args) -> {
Invocation invocation = new Invocation(interfaceClass.getName(),
method.getName(),
method.getParameterTypes(),args);
//发送请求
String result = HttpClient.sendPost("http://localhost:8080", invocation);
return result;
});
}
}
3.5.3.3 register层
LocalRegister类
package com.rpc.framework.register;
import java.util.HashMap;
import java.util.Map;
public class LocalRegister {
private static Map<String, Class> map = new HashMap<>();
public static void register(String interfaceName, Class implClass){
map.put(interfaceName, implClass);
}
public static Class get(String interfaceName){
return map.get(interfaceName);
}
}
3.6 验证
启动Provider类,再启动Consumer类
输出
helloPRC
大功告成!!!