dubbo服务暴露就是一个远程代理,打开网络监听,接受服务调用请求,将服务接口名,IP,port发布到注册中心的过程。通过《dubbo启动过程分析》可以了解到,在spring容器启动时会将容器中所有的bean初始化成单实例(默认),如果bean继承相应的接口,在实例初始化完成后,会调用实现类中某些接口方法。dubbo的初始化也是通过这样一个过程完成的。
ServiceConfig.export->doExport()-> doExportUrls()->doExportUrlsFor1Protocol()
//根据url暴露
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();//获取服务提供者协议名称
if (name == null || name.length() == 0) {
name = "dubbo";//默认是dubbo
}
//获取服务主机名,为空则自动查找本机IP
if (NetUtils.isInvalidLocalHost(host)) {
anyhost = true;
try {
host = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.warn(e.getMessage(), e);
}
-----------------------------------------------
}
//SPI加载协议实现类中的默认端口常量
final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
if (port == null || port == 0) {//protocol没有配置port
port = defaultPort;
}
if (port == null || port <= 0) {
port = getRandomPort(name);//使用本机随机可用端口
if (port == null || port < 0) {
port = NetUtils.getAvailablePort(defaultPort);
putRandomPort(name, port);
}
logger.warn("Use random available port(" + port + ") for protocol " + name);
}
//组织参数
Map<String, String> map = new HashMap<String, String>();
..............................................................
//形成类似dubbo://的统一URL
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
String scope = url.getParameter(Constants.SCOPE_KEY);//获取是scope属性
//配置为none不暴露
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
//导入监控中心信息
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
//首先将URL中dubbo替换为registry,创建远程代理Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//此处protocol为spi加载的适配类,会根据invoker中的protocol不同
//调用不同的具体实现类,此处在SPI分析中已经说明
//Protocol$Adaptor.export()-->dubbofilterwrapper.export()-->dubbolistenerwrapper.export()-->dubboprotocol.export()
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
} else {//本地服务
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
接下来将创建接口实现类的包装类,也就是服务的包装类,调用过程如下:
proxyfactory$adpative.getInvoker()--> StubProxyFactoryWrapper.getInvoker()->JavassistProxyFactory.getInvoker()
调用Javaassit创建服务包装类,生成的类如下,重点看invokeMethod()这个方法,后面invoker.invoke()会掉到此方法去执行业务逻辑。
package com.alibaba.dubbo.common.bytecode;
import com.alibaba.dubbo.demo.provider.DemoServiceImpl;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
public class Wrapper1
extends Wrapper
implements ClassGenerator.DC
{
...........................................................
public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject)
throws InvocationTargetException
{
DemoServiceImpl localDemoServiceImpl;
try
{
localDemoServiceImpl = (DemoServiceImpl)paramObject;
}
catch (Throwable localThrowable1)
{
throw new IllegalArgumentException(localThrowable1);
}
try
{
if ((!"sayHello".equals(paramString)) || (paramArrayOfClass.length == 1)) {
return localDemoServiceImpl.sayHello((String)paramArrayOfObject[0]);
}
}
catch (Throwable localThrowable2)
{
throw new InvocationTargetException(localThrowable2);
}
}
}
最后返回AbstractProxyInvoker的实现,此实现类是一个非常重要的类,包含了服务实现类,服务接口,url,还有刚才生成的服务包装类的引用。
然后执行到如下的方法:
Exporter<?> exporter = protocol.export(invoker);
此处调用的过程为protocol$Adaptive.export()-->ProtocolFilterWrapper.export()--->ProtocolListenerWrapper.export(),两个wrapper什么都没有做,直接放行,最终调到RegistryProtocol.export()
参数为刚才封装好的AbstractProxyInvoker实现类,然后:
ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
下面来分析这个方法,这个方法非常重要,就在此方法中调用DubboProtocol完成服务的发布
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>
//此处打开网络监听
((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return (ExporterChangeableWrapper<T>) exporter;
}
在doLocalExport()中,将provider url与AbstractProxyInvoker实现类封装到InvokerDelegete对象中,然后执行protocol.export(invokerDelegete)方法,此处后面会重点分析,此方法返回一个DubboExporter对象,将此对象与AbstractProxyInvoker实现类封装在ExporterChangeableWrapper对象中,并存储在RegistryProtocol这个类的bounds属性,这个属性是一个线程安全的map,以便以后服务调用使用。下面分析调用dubbo协议打开网络监听的过程。依然是SPI机制,经过ProtocolFilterWrapper.export(),完成对Invoker的包装,Invoker中加入了Filter调用链。
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
//SPI机制加载所有Filter扩展
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
//加入Filter执行链
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
.......................................................
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
.......................................................
};
}
}
return last;
}
然后经过ProtocolListenerWrapper.export()调到DubboProtocol.export(),将结果封装成ListenerExporterWrapper返回。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
...................................................
openServer(url);
return exporter;
}
将前面传递过来的invoker,key,exporterMap封装到DubboExporter中,打开server此处是后续调用netty打开网络监听,最终返回到DubboProtocol处的是HeaderExchangeServer,此对象持有nettyserver,在nettyserver构造方法中,有两个构造参数一个是url,另一个是DecodeHandler,此对象里又封装了HeaderExchangeHandler,又封装了ExchangeHandler。初始化了nettyserver的基本参数如:ip,port,timeout等等。具体调用流程如下:
openServer(url)-->createServer(url)-->Exchangers.bind(url, requestHandler)-->HeaderExchanger.bind()-->NettyTransporter.bind()-->nettyserver.doopen()
在doopen()方法中,就是对netty的初始化操作,设置线程池,绑定decoder、encoder、handler,然后打开端口,进行监听。
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
channel = bootstrap.bind(getBindAddress());
}
打开网络监听后,再次回到RegistryProtocol类中,下面就开始调用注册中心进行服务注册和订阅。首先调用getRegistry(originInvoker),spi机制,初始化时注入的是registryFactory适配类,根据url中注册中心参数获取具体的实现类,此处是ZookeeperRegistryFactory
private Registry getRegistry(final Invoker<?> originInvoker){
URL registryUrl = originInvoker.getUrl();
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
return registryFactory.getRegistry(registryUrl);
}
调用到AbstractRegistryFactory.getRegistry()>ZookeeperRegistryFactory.createRegistry(),最终返回一个 ZookeeperRegistry对象
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
//此处是spi机制注入的适配类
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
返回到RegistryProtocol.export(),获得到了 ZookeeperRegistry,然后调用registry.register(registedProviderUrl)进行注册,进一步跟踪,调用链路为:FailbackRegistry.register()-->AbstractRegistry.register()-->ZookeeperRegistry.doRegister(),在zk上创建一个临时节点,注册完成
protected void doRegister(URL url) {
try {
//在zk上创建一个临时节点
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
回到RegistryProtocol,注册完成之后,下面开始订阅,需要感知zk node的变化,此处使用的zk的watcher机制,首先初始化一个NotifyListener,后面监听变化调用到此对象中的notify方法。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
调用链路:FailbackRegistry.subscribe()-->AbstractRegistry.subscribe()-->ZookeeperRegistry.doSubscribe(),主要是注册主节点和子节点监听
protected void doSubscribe(final URL url, final NotifyListener listener) {
.......................
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
//zkListeners根据key存储了主节点和子节点监听
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {//没有 初始化
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
//根据主节点监听获取子节点监听
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {//没有,new一个
listeners.putIfAbsent(listener, new ChildListener() {
public void childChanged(String parentPath, List<String> currentChilds) {
//节点删除,更新会触发notify方法
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);//创造一个永久节点??
//添加子节点监听器
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
.......................
}
notify(url, listener, urls)调用链比较复杂,链路为:
FailbackRegistry.notify()-->FailbackRegistry.doNotify()-->AbstractRegistry.notify()-->listener.notify(),此处的listener为刚才registryProtocol传递过来的OverrideListener,然后调用OverrideListener.notify()-->RegistryProtocol.doChangeLocalExport(),对修改了url的invoker重新export,至此整个发布过程全部完成。
private <T> void doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl){
String key = getCacheKey(originInvoker);
final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null){
logger.warn(new IllegalStateException("error state, exporter should not be null"));
return ;//不存在是异常场景 直接返回
} else {
final Invoker<T> invokerDelegete = new InvokerDelegete<T>(originInvoker, newInvokerUrl);
//调用dubboprotocol.export()重新发布
exporter.setExporter(protocol.export(invokerDelegete));
}
}