文档目的:
公司目前使用的dubbo版本是2.6.2,看完dubbo官方文档中的一些功能,所以就想知道dubbo调用过程大概是怎么样的。
看源码前提:
一定要先看DUBBO SPI的相关知识:
内容:
一上来必须是看看官方的这三张图
服务引用过程:
取名:图1
调用过程的两张:
取名:图2
取名:图3
图2中红色部分为dubbo服务调用过程。
下面将分析各层的实现和功能点
Interface:为开发定义的接口,使用@Reference引用服务
Proxy:dubbo为我们生成的代理类,应该是在引用过程创建的,这个之后再分析具体实现-ReferenceConfig.get()->ReferenceConfig.createProxy()->ProxyFactory.getProxy(Invoker)
Invoker:属于Cluster层,从图中得到的信息时,先从Register层获得注册了的服务,然后通过路由规则过滤(Router),之后再通过LoadBalance算法得到实际调用服务。具体逻辑定位:com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker。dubbo在外层还是用了com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper来做包装,支持服务降级功能(dubbo中SPI相关的功能XxxWrapper)。看默认的cluster是failover(失败重试其他,默认重试次数:2,失败之后重试两次,参见:com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker
相关功能使用说明:
- 路由规则:https://dubbo.apache.org/zh/docs/v2.7/user/examples/routing-rule/
- 负载均衡算法:https://www.cnblogs.com/wyq178/p/9822731.html
- 服务降级:https://dubbo.apache.org/zh/docs/v2.7/user/examples/service-downgrade/
- Filter:Protocol层。定义暴露服务和引用服务的逻辑。dubbo定义了三个Protocol的包装类,分别是:
- com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper : protocol类型非registry时,使用责任链模式,将定义的com.alibaba.dubbo.rpc.Filter集成到Invoker中
- com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper:protocol类型非registry时,在暴露服务和引用服务时,将ExporterListener和InvokerListener嵌入到ListenerExporterWrapper和ListenerInvokerWrapper以实现对服务暴露,服务撤销和服务引用,引用销毁的事件监听。
- com.alibaba.dubbo.qos.protocol.QosProtocolWrapper:protocol类型为registry时,开启qos服务。详情参见:https://blog.csdn.net/yuanshangshenghuo/article/details/107563319
我们目前关注的是ProtocolFilterWrappe中的逻辑,定义的Filter已经被放入Invoker的执行链中。
- Invoker: 取决于具体配置的dubbo的protocol,图中使用的protocol是dubbo,对应的Invoker是com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.
我之前使用过injvm的protocol,其暴露服务直接维护在内存map中,引用的时候直接从map中取。他在filter层之下,说明使用injvm协议也会走Filter逻辑。
- ExchangeClient:封装请求响应模式,同步转异步
其下之后都是数据传输相关,看可以参见各层功能及可扩展点
这里分析将protocol使用dubbo时的请求和返回的处理流程:
1. 部分初始化逻辑
DubboProtocol在引用服务的时候,会初始化ExchangeClient,
1.1 当url中没有指定:连接数-Constants.CONNECTIONS_KEY或该值为0时,则该应用对提供服务的应用(服务ip+port)会使用同一个ExchangeClient,参见:DubboProtocol.getClients。这种情况会在外层包装一层ReferenceCountExchangeClient,用与计数该ExchangeClient被多少Invoker使用,在销毁的时候来判断最后一个引用执行destory的时候才真正close掉ExchangeClient。
1.2 其中调用Exchanges.connect时,如果url中没有Constants.CODEC_KEY时,则指定Constants.CODEC_KEY=exchange。但是因为DubboProtocol.initClient时已经指定了Constants.CODEC_KEY=dubbo,所以最终的加解密方式为dubbo,对应DubboCodec。这里的说明是为了之后请求和返回值的加解密逻辑进行说明,不然无法确定加解密逻辑。参见:DubboProtocol.initClient。
1.3 创建ExchangerCliient逻辑:DubboProtocol.initClient中Exchanges.connect中有确定Exchanger的逻辑,默认使用的是header,对应HeaderExchangeClient。handler参数是DubboProtocol.requestHandler, 包装成HeaderExchangeHandler后又被包装成DecodeHandler(类型为ChannelHandler)。后续在Transporter.connect创建client时,又被自适应的AllDispatcher.dispatch方法包装成AllChannelHandler->HeartbeatHandler->MultiMessageHandler,参见:new NettyClient()。
1.3.1 在创建AllChannelHandler时,其父类WrappedChannelHandler会使用自适应的ThreadPool来创建线程池,然后放进自适应的DataStore中(componentName=consumer,key=url.getPort()),该线程池在之后的new AbstractClient()时取出赋值给AbstractClient.executor后从DataStore移出。线程名称在new NettyClinet()中的wrapChannelHandler中执行了线程名为:DubboClientHandler-ip:port-xxxx,线程池类型默认值被设置为cached。
1.3.2 创建HeaderExchangeClient时,会创建维持心跳的定时任务放在HeaderExchangeClient.scheduled的静态线程池中执行。线程名:dubbo-remoting-client-heartbeat-xxxx(核心线程数=2)
1.4 创建Client逻辑:HeaderExchanger.connect中会创建client,先选择自适应的Transporter,默认的是com.alibaba.dubbo.remoting.transport.netty.NettyTransporter。NettyTransporter中创建的是NettyClient。
2. DubboInvoker
经过Cluster层和Filter的处理逻辑之后(ClusterInvoker->filter->Invoker),请求来到了DubboInvoker。AbstractInvoker.invoke处理了一些通用逻辑之后,交由DubboInvoker.doInvoke. 其中主要是处理三种调用方式:return=false,async=true以及正常的调用。
3. HeaderExchangeClient
接下来将从正常调用方式来分析,从1.3可知,默认使用的是HeaderExchangeClient。其成员变量channel使用的是HeaderExchangeChannel,HeaderExchangeChannel中的channel是NettyClient。
HeaderExchangeClient.request
HeaderExchangeChannel.request:封装Request,构建DefaultFuture返回。同步转异步。DefaultFuture起了一个守护线程来处理超时的future。详见:DefaultFuture
4. NettyClient
. 1.4中说过使用的client是NettyClient,现在看看NettyClient的创建过程:
new NettyClinet()的时候会执行父类AbstractClient以及父类的父类AbstractEndpoint,基本的逻辑是:
4.1. 确定加解密codec的方式(参见:AbstractEndpoint.getChannelCodec),1.2中说过了codec使用的是DubboCodec。
4.2. 断线重连的线程池DubboClientReconnectTimer-xxxx 静态的线程池(核心线程数为2,定时检查channel是否连接正常,不正常时重新连接创建新的channel),所有继承了AbstractClient的client共享(参见:AbstractClient.initConnectStatusCheckCommand),
4.3. doOpen中主要是初始化ClientBootstrap,学过netty的都基本能看懂,设置一些连接参数及pipeline中加入ChannelHandler,分别是读取时的解密的decoder,写入时的加密的encoder,数据写入和处理都会处理的handler-NettyHandler。
4.4. doConnect中与url对应的服务端建立channel,赋值给NettyClient.channel,细节:NettyClient.channel被定义成volatile,因为连接断开后重连会重新赋值。
4.5 此处还需要关注channelFactory的创建,具体参数理解参照:https://jianshu.com/p/eec2677651c0 中Netty3.x的线程模型。
5. 加密和解密
下面就再看看DubboCodec的加密和解密的逻辑:
5.1 加密:详细说明可以参见:https://www.jianshu.com/p/99a0bc93eeb6
简单描述就是:
5.1.1 先确定序列化的方式,还是通过Dubbo SPI的自适应,找到对应的Serialization,此处默认的是hessian2。
5.1.2 发送的数据结构是header+body的形式,header中会保存body的长度,序列化方式,及其他信息。
header信息,长度为16的byte数组,
头两个字节存魔数
第三个字节:存是否是Request,是否是Event,是否是FLAG_TWOWAY(需要等待返回值),以及序列化的方式serialization.getContentTypeId (后五位,也就是能支持32中序列化方式)
第四个字节:从解密代码来看,是用来返回时存放状态码的。
从第五个字节开始,用8个字节存request的id(Long)
从第13个字节开始,用四个字节将body的长度(Integer)
将request对象序列化后写入ChannelBuffer,还有个校验是检查body长度不能超过8M
Body:其中就包含客户端jar包版本号,调用的服务名,调用服务的version,调用的方法名,参数类型,参数值,attachments。参见下图代码:参见:DubboCodec.encodeRequestData
5.2 解密:
解密的逻辑基本就是按加密的结构来解析就行了。
[图片上传中...(image-b73439-1629862540429-0)]
从header中解析出序列化的方式,request的id,以及状态码。
根据header中的序列化方式解析出返回值,就是反序列化过程。
其中重要的一点就是,真正的反序列化的调用并不是在netty的worker线程,而是在DubboCodec.decodeBody的时候,被包装成DecodeableRpcResult返回了,并在7中的线程池中执行DecodeHandler.received(Channel, message)时,才真正的执行返回值的反序列化。
6. HeaderExchangeClient到加密逻辑
下面要整理出3中HeaderExchangeChannel.request是如何走到5.1的加密逻辑的:
3中说明了HeaderExchangeChannel中的channel是NettyClient,NettyClient.send(Request)->AbstractClient(NettyClient).send(Request, sent)->NettyChannel.write(Message),至此就会走到NettyClient初始化时设置的channelPipeline了,先执行NettyHandler.writeRequested
NettyHandler.writeRequested
->OneToOneEncoder(NettyCodecAdapter).handleDownstream
->OneToOneEncoder(NettyCodecAdapter).doEncode
->NettyCodecAdapter$InternalEncoder.encode 之后请求就发出去了。
之后又处理dubbo自己的逻辑:看完好像是发送请求之后,回写一些数据
调用链:
NettyHandler.writeRequested
->AbstractPeer(NettyClient).sent(Channel, msg)
->AbstractChannelHandlerDelegate(MultiMessageHandler).sent(Channel, message)
->HeartBeatHandler.sent(Channel, message)
->WrappedChannelHandler(AllChannelHandler).sent(Channel, message)
->AbstractChannelHandlerDelegate(DecodeHandler).sent(Channel, message)
->HeaderExchangeHandler.sent(Channel, message)
->DubboProtocol.requestHandler.sent:空方法
HeartBeatHandler中增加了更新NettyChannel中的发送数据的时间戳逻辑
HeaderExchangeHandler中调用DefaultFuture.sent方法更HeaderExchangeChannel.request时创建的DefaultFuture中的发送请求时间。
7. 数据解密之后如何将结果返回给调用线程:
可以debug,也可以通过之前梳理的创建过程来查看调用链。
NettyHandler.messageReceived
->AbstractPeer(NettyClient).received(Channel, msg)
->MultiMessageHandler.received(Channel, msg)
->HeartBeatHandler.received(Channel, msg)
->AllChannelHandler.received(Channel, msg){用1.3.1中创建的线程池执行ChannelEventRunnable任务}
->之后的流程只在线程池中执行的:
DecodeHandler.received(Channel, message) :反序列化RpcResult
->HeaderExchangeHandler.received(Channel, msg)
->DefaultFuture.received(Channel, Response)
->DefaultFuture.doReceived(Response){设置结果,唤醒调用线程,如果有callback,执行callback逻辑}
->之后是调用线程逻辑:(Invoker->filter->ClusterInvoker)
DubboInvoker.doInvoke()
->Filter逻辑
->FailoverClusterInvoker.doInvoke(){处理重试逻辑}
->MockClusterInvoker.invoke(){之前说过的mock逻辑}
HeaderExchangeHandler.received:
HeaderExchangeHandler.handleResponse:
DefaultFuture.received: 可以看出Request和Response是通过Request.id来对应起来的,生成规则参见:com.alibaba.dubbo.remoting.exchange.Request.newId()
8. 部分Filter逻辑:
具体Cosumer端配置了哪些Filter,可以看platform-util的jar中META-INFO/dubbo/com.alibaba.dubbo.rpc.Filter和 dubbo的jar中META-INFO/dubbo/internal/com.alibaba.dubbo.rpc.Filter中的配置
ConsumerRecordFilter
ResponseConsumerFilter (cat打点:Consumer.code和Consumer.success)
DubboAppContextFilter (dubboApplication参数放入attachment)
SentinelDubboConsumerFilter(sentinel 限流)
MonitorFilter: 如果url中带了monitor参数,则上报调用和返回值信息
FutureFilter: 支持下面三种事件的触发, url中指定的onthrow.method,onreturn.method,oninvoke.method ,具体使用待确定
CatTransaction:PigeonCall的cat数据上报,接口调用信息
ConsumerContextFilter:调用前初始化RpcContext中部分信息,调用后清理掉RpcContext中的attachments
com.yupaopao.platform.util.dubbo.filter.ExceptionFilter:对返回值类型是Response的调用,统一处理异常
参考知识:
XxxWrapper装饰类在dubbo中的实现方式:https://www.cnblogs.com/killbug/p/7341968.html
-
各层说明:(来自官网)
config 配置层:对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
proxy 服务代理层:服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
registry 注册中心层:封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
cluster 路由层:封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
monitor 监控层:RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
protocol 远程调用层:封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
exchange 信息交换层:封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
transport 网络传输层:抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
serialize 数据序列化层:可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool
-
2.6.2 Dubbo线程模型:
参见: