前言
Apache Dubbo™ (incubating)是一款高性能Java RPC框架。在平常业务开发过程中使用的越来越频繁,同时也会遇到更多的问题。这就需要我们更多的了解一下dubbo源码,以便更好的处理问题。
看源码的话就会直面一个棘手的问题:不知道从哪下手,找不到切入点。所以,本文准备就dubbo的启动过程做一下宏观的流程分析,希望对大家有所帮助。
问题引入
用过dubbo的同学都知道,我们只需要在xml文件中配置zk、协议、要暴露的服务等信息,发布jar包、然后启动spring。我们的服务就可以被调用了。如下,我们暴露了HelloService。启动spring就可以被远程调用了:
<dubbo:service interface="com.alibaba.dubbo.demo.hello.HelloService" ref="helloService" timeout="300" ></dubbo:service>
直入主题
那么在spring容器启动的过程中,都做了什么操作才使我们的服务可以暴露出去呢?为什么dubbo是透明化接入应用,对应用没有任何 API 侵入的呢?
1.Spring可扩展Schema的支持
Spring框架从2.0版本开始,提供了基于Schema风格的XML扩展机制,允许开发者扩展最基本的spring配置文件,这样我们就可以编写自定义的xml bean解析器然后集成到Spring IoC容器中。
也就是说利用这个机制就可以把我们在xml文件中配置的东西实例化成对象。
使用这种机制需要通过以下几步:
- 设计配置属性和JavaBean
- 编写XSD文件
- 编写NamespaceHandler和BeanDefinitionParser完成解析工作
- 编写spring.handlers和spring.schemas串联起所有部件
接着我们以dubbo的provider为例开始分析
<dubbo:provider registry="test_zk" version="1.0.0" iothreads="300" retries="0"/>
spring启动过程中会去扫描META-INF/spring.schemas,拿到dubbo的扩展配置,然后根据配置找到META-INF/dubbo.xsd文件
http\://dubbo.apache.org/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
至于spring为什么会扫面META-INF/spring.schemas目录呢?答案在PluggableSchemaResolver.java中
public class PluggableSchemaResolver implements EntityResolver {
public static final String DEFAULT_SCHEMA_MAPPINGS_LOCATION = "META-INF/spring.schemas";
private static final Log logger = LogFactory.getLog(PluggableSchemaResolver.class);
private final ClassLoader classLoader;
private final String schemaMappingsLocation;
private volatile Map<String, String> schemaMappings;
public PluggableSchemaResolver(ClassLoader classLoader) {
this.classLoader = classLoader;
this.schemaMappingsLocation = "META-INF/spring.schemas";
}
}
dubbo.xsd文件中定义了我们Bean的标签,和Bean中定义的字段一一对应的;
这一步spring会把dubbo.xsd文件解析成 Dom 树,在解析的自定义标签的时候, spring 会根据标签的命名空间和标签名找到一个解析器。
这个命名空间就是targetNamespace。拿到这个参数去扫面META-INF/spring.handlers。拿到dubbo配置的handler路径
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:tool="http://www.springframework.org/schema/tool"
xmlns="http://dubbo.apache.org/schema/dubbo"
targetNamespace="http://dubbo.apache.org/schema/dubbo">
http\://dubbo.apache.org/schema/dubbo=com.alibaba.dubbo.config.spring.schema.DubboNamespaceHandler
这样就找到了DubboNamespaceHandler,由该解析器来完成对该标签内容的解析,并返回一个 BeanDefinition 。
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
public DubboNamespaceHandler() {
}
public void init() {
this.registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
this.registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
this.registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
this.registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
this.registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
this.registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
this.registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
this.registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
this.registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
this.registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
在这个过程中就会把dubbo自定义的schema配置初始化成Bean对象,并维护在spring容器中。
(深入了解schema机制 可参考:https://docs.spring.io/spring/docs/4.2.x/spring-framework-reference/html/xsd-configuration.html)
2.spring 事件机制
dubbo使用到的配置信息都已经托管在spring容器中了,服务又是怎么暴露的呢?万事俱别,只欠东风,此时就需要一个触发dubbo服务启动的事件。
public void onApplicationEvent(ApplicationEvent event) {
if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
if (isDelay() && ! isExported() && ! isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
}
3.服务暴露
从export()方法开始,才真正进入了dubbo的服务暴露流程,在这个过程中就会涉及到多协议暴露服务、注册zk、暴露本地和远程服务,获取invoker,将invoker转化成exporter等一系列操作。如同官方提供的那样:
接着会到ServiceConfig.export()方法,这里面涉及到dubbo服务延迟暴露的一个点,delay这个参数可以配置在<dubbo:provider/> 或者<dubbo:service/>中,目的是为了延迟注册服务时间(毫秒) ,设为-1时,表示延迟到Spring容器初始化完成时暴露服务。一些特殊的场景,可以通过修改该参数来解决服务刚启动接口响应较慢的案例。
ServiceConfig.doExport()主要是做一些合法性的校验工作:
- application®istry&protocol等有效性检查;
- <dubbo:service>中配置的interface合法性检查:接口不能为空,检查接口类型必需为接口,检查方法是否在接口中存在.(checkInterfaceAndMethods)
- 检查xml配置中interface和ref是否匹配(interfaceClass.isInstance(ref))
- 有效性检查通过后,调用doExportUrls()发布dubbo服务;
在ServiceConfig.doExportUrls()方法,这里会进行多协议暴露服务,由于dubbo不仅支持dubbo协议同时还支持http、webservice、thrift等协议。如果我们配置的service需要同时提供多种服务,那么会根据不同的协议进行循环暴露。
<dubbo:service interface="com.alibaba.dubbo.demo.hello.HelloService" ref="helloService" timeout="300" protocol="dubbo"></dubbo:service>
<dubbo:service interface="com.alibaba.dubbo.demo.hello.HelloService" ref="helloService" timeout="300" protocol="http"></dubbo:service>
在doExportUrlsFor1Protocol中会把所有的相关属性封装到Map中,构造dubbo定义的统一数据模型URL,这个url会贯穿服务暴露和调用的整个流程。
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
接着是根据参数scope判断服务的发布范围。服务的发布范围分为不暴露、本地暴露、远程暴露。
scope的配置规则如下:
- 如果配置scope=none,不发布这个dubbo服务;
- 如果配置scope=local,只本地暴露这个dubbo服务;
- 如果配置remote,只远程暴露这个dubbo服务
- 如果不配置或者不为以上三种,既暴露本地服务,又暴露远程服务;
//配置为none不暴露
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}
那么为什么会有本地暴露呢?因为在dubbo中我们一个服务可能既是Provider,又是Consumer,因此就存在他自己调用自己服务的情况,如果再通过网络去访问,就会白白增加一层网络开销。所以本地暴露和远程暴露的区别如下:
- 本地暴露是暴露在JVM中,不需要网络通信.
-
远程暴露是将ip,端口等信息暴露给远程客户端,调用时需要网络通信.
本地暴露服务的时候url是以injvm开头的,而远程服务是以registry开头的,如图示:
上面代码也可以看出来,本地暴露和远程暴露的本质都是是通过把拼装好的url转换成invoker,然后把invoker转换为exporter。
点开getInvoker方法
/**
* create invoker.
*
* @param <T>
* @param proxy
* @param type
* @param url
* @return invoker
*/
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
这里用到了Adaptive,就会生成动态编译的Adaptive类。这个类就是getInvoker方法的具体实现。
拿到invoker之后,调用protocol.export(invoker)把invoker转换成exporter。
/**
* 暴露远程服务:<br>
* 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
* 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
* 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
*
* @param <T> 服务的类型
* @param invoker 服务的执行体
* @return exporter 暴露服务的引用,用于取消暴露
* @throws RpcException 当暴露服务出错时抛出,比如端口已占用
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
到这里就是服务暴露的总流程。
4.netty服务启动
在invoker->exporter转换的过程中又涉及到了dubbo连接池的创建和netty的初始化。
在createServer方法中利用dubbo SPI机制找到NettyTransporter,new NettyServer()->doOpen().最终我们就看到boss 线程,worker 线程,和 ServerBootstrap。
而 Client 在 Spring getBean 的时候,会创建 Client.当调用远程方法的时候,将数据通过 dubbo 协议编码发送到 NettyServer,然后 NettServer 收到数据后解码,并调用本地方法,并返回数据,完成一次RPC 调用。
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
NettyHandler类它继承了netty框架的SimpleChannelHandler类,重写了messageReceived方法。接收到客户端请求的入口就是messageReceived方法
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
执行了handler的received方法,这个handler其实就是DubboProtocol中的requestHandler,因为在启动netty服务的时候,就将requestHandler对象一直传递到了NettyServer,再通过NettyServer类的构造函数将它保存到了NettyServer类的终极父类AbstractPeer的handler属性上,AbstractPeer类又实现了ChannelHandler接口,重写了received方法。
所以当netty框架接收到请求时执行messageReceived方法里面的handler.received(channel, e.getMessage()); ,其实执行的是AbstractPeer类的received方法,received然后里面又执行了handler.received(ch, msg); 然后received中又调用了reply方法;
在reply方法中,完成了数据的解码,和合法性校验。最终调用本地方法,返回数据,完成一次RPC 调用。
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要处理高版本调用低版本的问题
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
最后
如图,dubbo的的模型十分易懂,但涉及到的东西确实很多。以上只是对第一步:0.start 做了一个简单的流水账分析。
所以,本文只是想做个引子,更多的细节还需要靠大家去挖掘。剩下的只有去debug the universe了。