一、demo构建
1. demo项目结构如图所示:
包含api,consumer,provider三个module
api如下所示,提供provider需要暴露的接口
package com.alibaba.dubbo.demo;
import java.util.List;
/**
* Created by Xkane on 2018/9/13.
*/
public interface DemoService {
List<String> getPermissions(Long id);
}
2. provider 发布服务配置文件
3. consumer订阅服务配置文件
二、dubbo框架结构
dubbo框架设计总共分了10层:
1. 服务接口层(Service):
该层是与实际业务逻辑相关,就如上面demo配置的
<!--使用 dubbo 协议实现定义好的 api.PermissionService 接口-->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" protocol="dubbo" />
这个service就是业务方自己定义的接口与其实现。
2.配置层(Config):
该层是将业务方的service信息,配置文件的信息进行收集配置,主要是以ServiceConfig和ReferenceConfig为中心,
ServiceConfig是配置提供方provider提供的配置,当Spring启动的时候会响应的启动provider服务发布和注册的过程,
主要是加入一个ServiceBean继承ServiceConfig在spring注册,同理ReferenceConfig是consumer方的配置,
当消费方consumer启动时,会启动consumer的发布服务订阅服务的过程,当然也是使用一个ReferenceBean继承
ReferenceConfig注册在spring上
3. 服务代理层(Proxy):
对服务接口进行透明代理,生成服务的客户端和服务端,使服务的远程调用就像在本地调用一样,默认使用JavassistProxyFactory,
返回一个Invoker,Invoker则是个可执行核心实体,Invoker的invoke方法通过反射执行service方法。
4. 服务注册层(Registry):
封装服务地址的注册和发现,以服务URL为中心,基于zk。
5. 集群层(Cluster):
封装多个提供者的路由及负载均衡,并桥接注册中心,以Invoker为中心,扩展接口为Cluster、Directory、Router和LoadBalance。
将多个服务提供方组合为一个服务提供方,实现对服务消费方来透明,只需要与一个服务提供方进行交互。
6. 监控层(Monitor):
RPC调用次数和调用时间监控,以Statistics为中心,扩展接口为MonitorFactory、Monitor和MonitorService。
7. 远程调用层(Protocol):
封装RPC调用,provider通过export方法进行暴露服务/consumer通过refer方法调用服务。而Protocol依赖的是Invoker。通过上面说的Proxy获得的Invoker,包装成Exporter。
8.信息交换层(Exchange):
该层封装了请求响应模型,将同步转为异步,信息交换层依赖Exporter,最终将通过网络传输层接收调用请求RequestFuture和ResponseFuture。
9.网络传输层(Transport):
抽象mina和netty为统一接口,以Message为中心,扩展接口为Channel、Transporter、Client、Server和Codec。
10.数据序列化层:
该层无需多言,将数据序列化反序列化。
三、dubbo发布过程
发布过程的一些动作
暴露本地服务
暴露远程服务
启动netty
连接zookeeper
到zookeeper注册
监听zookeeper
流程图:
dubbo发布服务涉及到的相关类。
上图展示了部分服务发布过程中需要使用到的类和接口,其中:
spring适配涉及到的类:DubboNamespaceHandler、DubboBeanDefinitionParser、ServiceBean;
配置信息存储:ServicdConfig、RegistryConfig、MonitorConfig、ProtocolConfig、ProviderConfig等;
应用协议:Protocol、DubboProtocol、HessianProtocol、ThriftProtocol、RmiProtocol、AbstractProxyProtocol、AbstractProtocol等;
Server相关:Exchanger、HeaderExchanger、ExchangeServer、HeaderExchangeServer、Transporters、Transporter、NettyTransporter、NettyServer等;
- 通过demo配置文件查看
<!--定义了提供方应用信息,用于计算依赖关系;在 dubbo-admin 或 dubbo-monitor 会显示这个名字,方便辨识-->
<dubbo:application name="demotest-provider" owner="programmer" organization="dubbox"/>
<!--使用 zookeeper 注册中心暴露服务,注意要先开启 zookeeper-->
<dubbo:registry address="zookeeper://localhost:2181"/>
<!-- 用dubbo协议在20880端口暴露服务 -->
<dubbo:protocol name="dubbo" port="20880" />
<!--使用 dubbo 协议实现定义好的 api.PermissionService 接口-->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" protocol="dubbo" />
<!--具体实现该接口的 bean-->
<bean id="demoService" class="com.alibaba.dubbo.demo.impl.DemoServiceImpl"/>
是通过dubbo的schema service进行注入的,找到DubboNameSpaceHandler
,Dubbo命名空间处理器,找到<dubbo:service>
标签解析行:
此时,进入DubboNameSpaceHandler
类,初始化配置文件,spring容器通过DubboBeanDefinitionParser
类的parse方法来解xml文件中的标签,生成ServiceConfig等配置对象;
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
//<dubbo:application>标签解析行
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
//<dubbo:registry>标签解析行,本例中使用 zookeeper 注册中心暴露服务
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
//<dubbo:protocol>标签解析行,例如,demo中用dubbo协议在20880端口暴露服务
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
//<dubbo:service>标签解析行
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
}
}
从上述代码中可以看到 registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
会把<dubbo:service>
标签会被解析成 ServiceBean
。ServiceBean
实现了 InitializingBean
,在类加载完成之后会调用afterPropertiesSet()
方法。在afterPropertiesSet()
方法中,依次解析以下标签信息:
<dubbo:provider>
<dubbo:application>
<dubbo:module>
<dubbo:registry>
<dubbo:monitor>
<dubbo:protocol>
在 Spring 容器初始化的时候会调用 onApplicationEvent
方法,即spring的事件机制(event),监听Spring 容器初始化
。ServiceBean
重写了 onApplicationEvent
方法,实现了服务暴露的功能。
dubbo暴露服务有两种情况,一种是设置了延迟暴露(比如delay=”5000”),另外一种是没有设置延迟暴露或者延迟设置为-1(delay=”-1”):
1、设置了延迟暴露: dubbo在Spring实例化bean(initializeBean
)的时候会对实现了InitializingBean
的类进行回调,回调方法是afterPropertySet()
,如果设置了延迟暴露,dubbo在这个方法中进行服务的发布。
2、没有设置延迟或者延迟为-1: dubbo会在Spring实例化完bean之后,在刷新容器最后一步发布ContextRefreshEvent
事件的时候,通知实现了ApplicationListener
的类进行回调onApplicationEvent
,dubbo会在这个方法中发布服务。
ServiceConfig.export()
的流程图如下:
<1> export()方法
export()
进行服务发布,是发布核心。由上面代码可知,如果设置了 delay 参数,Dubbo 的处理方式是启动一个守护线程在 sleep 指定时间后再
doExport()
。从export代码中可以看到,export先判断是否需要延迟暴露,如果设置了延迟delay!=null
,则通过一个后台线程Thread.sleep(delay)
延迟调用doExport()
方法;反之如果不存爱delay,直接调用doExport()
方法。
<2> doExport()
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
if (exported) {
return;
}
exported = true;
if (interfaceName == null || interfaceName.length() == 0) {
throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!");
}
checkDefault();
if (provider != null) {
if (application == null) {
application = provider.getApplication();
}
if (module == null) {
module = provider.getModule();
}
if (registries == null) {
registries = provider.getRegistries();
}
if (monitor == null) {
monitor = provider.getMonitor();
}
if (protocols == null) {
protocols = provider.getProtocols();
}
}
if (module != null) {
if (registries == null) {
registries = module.getRegistries();
}
if (monitor == null) {
monitor = module.getMonitor();
}
}
if (application != null) {
if (registries == null) {
registries = application.getRegistries();
}
if (monitor == null) {
monitor = application.getMonitor();
}
}
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
generic = true;
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
checkInterfaceAndMethods(interfaceClass, methods);
checkRef();
generic = false;
}
if(local !=null){
if(local=="true"){
local=interfaceName+"Local";
}
Class<?> localClass;
try {
localClass = ClassHelper.forNameWithThreadContextClassLoader(local);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if(!interfaceClass.isAssignableFrom(localClass)){
throw new IllegalStateException("The local implemention class " + localClass.getName() + " not implement interface " + interfaceName);
}
}
if(stub !=null){
if(stub=="true"){
stub=interfaceName+"Stub";
}
Class<?> stubClass;
try {
stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub);
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
if(!interfaceClass.isAssignableFrom(stubClass)){
throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + interfaceName);
}
}
checkApplication();
checkRegistry();
checkProtocol();
appendProperties(this);
checkStubAndMock(interfaceClass);
if (path == null || path.length() == 0) {
path = interfaceName;
}
doExportUrls();
}
doExport
方法先执行一系列的检查方法,然后调用doExportUrls
方法。检查方法会检测dubbo的配置是否在Spring配置文件中声明,没有的话读取properties文件初始化。
doExportUrls
方法先调用loadRegistries
加载注册信息,组装注册中心url信息,如源码中config.properties中读取链接信息,组装Provider注册链接串; 然后遍历调用doExportUrlsFor1Protocol
方法。 里面有个for循环,代表一个服务可以有多个通信协议,例如tcp、http、dubbo等协议,dubbo支持多种协议,默认使用的是dubbo协议。
debug图中可以看到,
<dubbo:protocol>
的name="dubbo"
为dubbo协议。可以在配置文件中进行配置协议:
<dubbo:protocol name="dubbo" port="20880" />
其他几种协议对比:
1 dubbo协议
连接个数:单连接
连接方式:长连接
传输协议:TCP
传输方式:NIO异步传输
序列化:Hessian二进制序列化
适用范围:传入传出参数数据包较小(建议小于100K),消费者比提供者个数多,单一消费者无法压满提供者,尽量不要用dubbo协议传输大文件或超大字符串。
适用场景:常规远程服务方法调用
注:
1)、dubbo默认采用dubbo协议,dubbo协议采用单一长连接和NIO异步通讯,适合于小数据量大 并发的服务调用,以及服务消费者机器数远大于服务提供者机器数的情况
2)、他不适合传送大数据量的服务,比如传文件,传视频等,除非请求量很低。
3)、为防止被大量连接撑挂,可在服务提供方限制大接收连接数,以实现服务提供方自我保护
<dubbo:protocol name="dubbo" accepts="1000" />
2 rmi协议
Java标准的远程调用协议。
连接个数:多连接
连接方式:短连接
传输协议:TCP
传输方式:同步传输
序列化:Java标准二进制序列化
适用范围:传入传出参数数据包大小混合,消费者与提供者个数差不多,可传文件。
适用场景:常规远程服务方法调用,与原生RMI服务互操作
1)、RMI协议采用JDK标准的java.rmi.*实现,采用阻塞式短连接和JDK标准序列化方式
3 hessian协议
基于Hessian的远程调用协议。
连接个数:多连接
连接方式:短连接
传输协议:HTTP
传输方式:同步传输
序列化: 表单序列化
适用范围:传入传出参数数据包大小混合,提供者比消费者个数多,可用浏览器查看,可用表单或URL传入参数,暂不支持传文件。
适用场景:需同时给应用程序和浏览器JS使用的服务。
约束
1)、参数及返回值需实现Serializable接口
2)、参数及返回值不能自定义实现List, Map, Number, Date, Calendar等接口,只能用JDK自带的实现,因为hessian会做特殊处理,自定义实现类中的属性值都会丢失。
4 http协议
基于http表单的远程调用协议。参见:[HTTP协议使用说明]
连接个数:多连接
连接方式:短连接
传输协议:HTTP
传输方式:同步传输
序列化:表单序列化
适用范围:传入传出参数数据包大小混合,提供者比消费者个数多,可用浏览器查看,可用表单或URL传入参数,暂不支持传文件。
适用场景:需同时给应用程序和浏览器JS使用的服务。
5 webservice协议
基于WebService的远程调用协议。
连接个数:多连接
连接方式:短连接
传输协议:HTTP
传输方式:同步传输
序列化:SOAP文本序列化
适用场景:系统集成,跨语言调用
6 thrift协议
当前 dubbo 支持的 thrift 协议是对 thrift 原生协议的扩展,在原生协议的基础上添加了一些额外的头信息,比如service name,magic number等。使用dubbo thrift协议同样需要使用thrift的idl compiler编译生成相应的java代码,后续版本中会在这方面做一些增强 。
注意:
Thrift不支持null值,不能在协议中传null
7 memcached协议
可以通过脚本或监控中心手工填写表单注册memcached服务的地址。
8 redis协议
可以通过脚本或监控中心手工填写表单注册redis服务的地址。
继续debug代码,进入方法doExportUrlsFor1Protocol
:
doExportUrlsFor1Protocol()
先将Bean属性转换成URL对象,然后根据不同协议将服务已URL形式发布。如果scope
配置为none
则不暴露,如果服务未配置成remote
,则本地暴露exportLocal
,如果未配置成local
,则远程暴露。
方法第一部分:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//获取是哪种协议,如果未配置,默认为dubbo
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
//获取host
String host = protocolConfig.getHost(); //从配置中获取
if (provider != null && (host == null || host.length() == 0)) {
host = provider.getHost();
}
boolean anyhost = false;
if (NetUtils.isInvalidLocalHost(host)) { //为了确保获得的主机有效,还有相应的验证:
anyhost = true;
try {
host = InetAddress.getLocalHost().getHostAddress();//返回本机的ip地址
} catch (UnknownHostException e) {
logger.warn(e.getMessage(), e);
}
if (NetUtils.isInvalidLocalHost(host)) {
if (registryURLs != null && registryURLs.size() > 0) {
for (URL registryURL : registryURLs) {
try {
Socket socket = new Socket();
try {
//通过获取注册中心地址和端口号,创建socket连接地址
SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
socket.connect(addr, 1000);
host = socket.getLocalAddress().getHostAddress();
break;
} finally {
try {
socket.close();
} catch (Throwable e) {}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
if (NetUtils.isInvalidLocalHost(host)) {
host = NetUtils.getLocalHost();//遍历本地网卡,返回第一个合理的IP
}
}
}
//获取各个协义需要暴露的端口, 按照这样的优先级去获取 :
//Protocol的实现类的默认端口 ——> Protocol的配置端口 ——> 随机端口
Integer port = protocolConfig.getPort();
if (provider != null && (port == null || port == 0)) {
port = provider.getPort();
}
final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
if (port == null || port == 0) {
port = defaultPort;
}
if (port == null || port <= 0) {
port = getRandomPort(name);
if (port == null || port < 0) {
port = NetUtils.getAvailablePort(defaultPort);
putRandomPort(name, port);
}
logger.warn("Use random available port(" + port + ") for protocol " + name);
}
第二部分:收集各类参数,放入map中,在为服务暴露做参数收集准备工作
// paramsMap,存放所有配置参数,下面生成url用。
Map<String, String> map = new HashMap<String, String>();
if (anyhost) {
map.put(Constants.ANYHOST_KEY, "true");
}
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// method子标签配置规则解析,retry次数,参数等。没有使用过,不做解释
if (methods != null && methods.size() > 0) {//服务接口的方法
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry"; //重试次数
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && arguments.size() > 0) {
for (ArgumentConfig argument : arguments) {
//类型自动转换.
if(argument.getType() != null && argument.getType().length() >0){
Method[] methods = interfaceClass.getMethods();
//遍历所有方法
if(methods != null && methods.length > 0){
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
//匹配方法名称,获取方法签名.
if(methodName.equals(method.getName())){
Class<?>[] argtypes = methods[i].getParameterTypes();
//一个方法中单个callback
if (argument.getIndex() != -1 ){
if (argtypes[argument.getIndex()].getName().equals(argument.getType())){
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
}else {
throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType());
}
} else {
//一个方法中多个callback
for (int j = 0 ;j<argtypes.length ;j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())){
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j){
throw new IllegalArgumentException("argument config error : the index attribute and type attirbute not match :index :"+argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
}else if(argument.getIndex() != -1){
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
}else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
if (generic) {
map.put("generic", String.valueOf(true));
map.put("methods", Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if(methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
}
else {
map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
if (! ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put("token", UUID.randomUUID().toString());
} else {
map.put("token", token);
}
}
if ("injvm".equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
第三部分:
这段主要是拼接URL,Dubbo框架是以URL为总线的模式,即运行过程中所有的状态数据信息都可以通过URL来获取,比如当前系统采用什么序列化,采用什么通信,采用什么负载均衡等信息,都是通过URL的参数来呈现的,所以在框架运行过程中,运行到某个阶段需要相应的数据,都可以通过对应的Key
从URL的参数列表中获取。
本例url: dubbo://xxx.xxx.xxx.xxx:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demotest-provider&dubbo=2.5.3&interface=com.alibaba.dubbo.demo.DemoService&methods=getPermissions&organization=dubbox&owner=programmer&pid=35270&side=provider×tamp=1539154732792
// 导出服务
String contextPath = protocolConfig.getContextpath();//获取协议的上下文路径
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
// 根据参数创建url对象
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
// 如果url使用的协议存在扩展,调用对应的扩展来修改原url。目前扩展有override,absent
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
//根据scope的配置决定是作本地暴露还是远程暴露,
//做服务暴露从结果上看就是产生了一个特定服务的 Exporter 类,
//并将其存储在对应的ServiceBean实例的 exporters属性中。
String scope = url.getParameter(Constants.SCOPE_KEY);
//配置为none不暴露
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
//只有远程暴露才需要用到注册中心url
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
//获取监控中心的URL
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
本地暴露
此处demo,为本地暴露,此时,调用exportLocal(url);
生成exporter
:
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(NetUtils.LOCALHOST)
.setPort(0);
//这里的protocol是Protocol$Adpative的实例(spi机制)
//proxyFactory是ProxyFactory$Adpative实例(spi机制)
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");
}
}
上述exportLocal
方法参数url,为:
dubbo://xxx.xxx.xxx.xxx:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demotest-provider&dubbo=2.5.3&interface=com.alibaba.dubbo.demo.DemoService&methods=getPermissions&organization=dubbox&owner=programmer&pid=78965&side=provider×tamp=1539163125437
上述方法中URL local
重新构造url,本地暴露的url是以injvm
开头的,host修改为本地,端口更改为0
,这与远程发布时不同的,url更换为一下内容:
injvm://127.0.0.1/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demotest-provider&dubbo=2.5.3&interface=com.alibaba.dubbo.demo.DemoService&methods=getPermissions&organization=dubbox&owner=programmer&pid=78965&side=provider×tamp=1539163125437
这里的proxyFactory.getInvoker
使用的是JavassistProxyFactory.getInvoker
方法,
对服务接口进行透明代理,生成服务的客户端和服务端,使服务的远程调用就像在本地调用一样,默认使用JavassistProxyFactory
,返回一个Invoker
,Invoker
则是个可执行核心实体,Invoker
的invoke
方法通过反射执行service方法。
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
//proxy 是服务实现类,type是服务接口
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
////利用Wrapper类通过服务接口生成对应的代理类。
//根据传入的 proxy对象的类信息创建对它的包装对象Wrapper, Wrapper类不能正确处理带$的类名
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
//实现抽象类AbstractProxyInvoker抽象方法doInvoke,
//并调用(proxy, type, url)构造函数实例化匿名类
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
最后返回invoker后,使用protocol.export
进行发布:
Protocol
的实现是RegistryProtocol
类,实现了export
方法
类及方法的概念作用:
1、proxyFactory:
就是为了获取一个接口的代理类,例如获取一个远程接口的代理。
它有2个方法,代表2个作用
a、getInvoker:
针对server端,将服务对象,例如DemoServiceImpl
包装成一个Wrapper对象。
b、getProxy:
针对client端,创建接口的代理对象,例如DemoService
的接口。
2、makeWrapper
:它类似spring的BeanWrapper
,它就是包装了一个接口或一个类,可以通过Wrapper对实例对象进行赋值取值以及指定方法的调用。
3、Invoker:
它是一个可执行的对象,能够根据方法名称、参数得到相应的执行结果。它里面有一个很重要的方法Result invoke(Invocation invocation)
,Invocation
是包含了需要执行的方法和参数的重要信息,目前它只有2个实现类RpcInvocation、 MockInvocation
它有3种类型的Invoker
1、本地执行类的Invoker
2、远程通信类的Invoker
3、多个远程通信执行类的Invoker聚合成集群版的Invoker
4、Protocol:
1)export
暴露远程服务(用于服务端),就是将proxyFactory.getInvoker
创建的代理类invoker
对象,通过协议暴露给外部。
2)refer:
引用远程服务(用于客户端)
5、Exporter:
维护invoker的生命周期
6、exchanger:
信息交换层,封装请求响应模式同步转异步
7、transporter:
网络传输层,用来抽象Netty(dubbo默认)或者Mina的统一接口
暴露本地服务与暴露远程服务的区别
a、暴露本地服务:
指暴露在同一个JVM里面,不用通过zk来进行远程通信,例如在同一个服务里面(tomcat),自己调用自己的接口,就无需进行网络IP连接通信。
b、暴露远程服务:
指暴露给远程客户端的IP和端口号,通过网络来实现通信。
远程暴露
在doExportUrlsFor1Protocol
方法中if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) )
条件,如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
public static final String SCOPE_LOCAL = "local";
未完待续
欢迎关注公众号