简介
dubbo的服务导出主要分为四个部分:前置工作、导出服务、启动服务和连接注册中心。
首先看服务导出的入口方法:ServiceBean 的 onApplicationEvent。
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
ServiceBean实现了spring的ApplicationListener接口,重写了onApplicationEvent方法,这个方法会在spring容器初始化完成后被调用。这个方法先判断服务是否已经被导出或者被取消导出,如果都没有,则输出一条日志(相应的在我们项目的输出日志中可以看到),然后调用export导出服务。ServiceBean 是 Dubbo 与 Spring 框架进行整合的关键,具有同样作用的类还有 ReferenceBean。
1.前置工作
前置工作包括检查/补全用户配置和根据配置组装url。
Dubbo 使用 URL 作为配置载体,所有的拓展点都是通过 URL 获取配置。
方法间的调用关系如下。
首先看export方法,该方法调用了其父类的方法ServiceConfig.export
public synchronized void export() {
checkAndUpdateSubConfigs();
if (provider != null) {
if (export == null) {
export = provider.getExport();
}
if (delay == null) {
delay = provider.getDelay();
}
}
if (export != null && !export) {
return;
}
if (delay != null && delay > 0) {
delayExportExecutor.schedule(this::doExport, delay, TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
这是个synchronized方法,首先从provider(这是个ProviderConfig)中获取export和delay的值,譬如配置了<dubbo:provider export="false" />,则获取的export值为false,表示不导出服务。然后对export和delay的值进行判断,满足一定条件则调用doExport方法。doExport继而调用了doExportUrls方法。
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
doExportUrls方法主要做了两件事:
第一,加载注册中心的url列表,dubbo允许服务同时注册在多个注册中心;
第二,遍历协议配置列表,在每个协议下导出服务,dubbo支持使用不同的协议导出服务;
先看loadRegistries方法
protected List<URL> loadRegistries(boolean provider) {
// check && override if necessary
List<URL> registryList = new ArrayList<URL>();
if (CollectionUtils.isNotEmpty(registries)) {
for (RegistryConfig config : registries) {
String address = config.getAddress();
if (StringUtils.isEmpty(address)) {
address = Constants.ANYHOST_VALUE;
}
if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {
Map<String, String> map = new HashMap<String, String>();
appendParameters(map, application);
appendParameters(map, config);
map.put("path", RegistryService.class.getName());
appendRuntimeParameters(map);
if (!map.containsKey("protocol")) {
map.put("protocol", "dubbo");
}
List<URL> urls = UrlUtils.parseURLs(address, map);
for (URL url : urls) {
url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
registryList.add(url);
}
}
}
}
}
return registryList;
}
loadRegistries的作用是加载Registry配置,并把配置内容转换成url返回。【再强调一下dubbo是使用url作为配置载体的,因此需要把Registry配置转换并添加到url中】首先遍历Registry配置,获取配置的注册中心地址,若为空则设为"0.0.0.0"。然后构建参数映射集合map,在map中添加Application配置、Registry配置、参数path和protocol(没有则默认为dubbo),之后将map解析为url列表。最后遍历url列表,将url的registry参数设为protocol的值,将protocol参数的值设为"registry"。
然后看doExportUrlsFor1Protocol方法
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = Constants.DUBBO;
}
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
appendRuntimeParameters(map);
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
if (CollectionUtils.isNotEmpty(methods)) {
.......
} // end of methods for
}
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
......
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
// export service
........
}
这个方法的作用是使用某个协议导出服务。首先获取协议配置的名称,若为空则默认dubbo。接着构建参数映射集合map,添加side参数为"provider",添加application配置、application配置、provider配置、protocol配置、service配置和方法配置。然后判断是否是泛化调用,是否带有token。之后就是导出服务啦,详见下一章。
2.导出服务
导出服务包括导出服务到本地和导出服务到远程。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
......
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
.......
}
}
this.urls.add(url);
}
根据url的scope参数值,导出服务分为3种情况:
- 若scope为none,则不导出服务;
- 若scope不等于remote,则导出本地服务;
- 若scope不等于local,则导出远程服务;
2.1 创建Invoker
无论是导出本地服务还是导出远程服务,很关键的一步是创建Invoker。dubbo在服务提供端,以及服务引用端均会使用 Invoker。
Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起 invoke 调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
在代码中正是通过proxyFactory.getInvoker()来创建Invoker的。这里的proxyFactory是通过扩展点机制加载的ProxyFactory$Adaptive,在代理类中通过getExtension("javassist")获取接口的真正实现StubProxyFactoryWrapper实例(debug看一下就知道cachedInstances缓存的是StubProxyFactoryWrapper实例)。StubProxyFactoryWrapper的getInvoker方法调用了JavassistProxyFactory的getInvoker,接下来我们就来看这个方法。
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
首先通过Wrapper获取接口对应的包装类,然后新建一个继承AbstractProxyInvoker的匿名对象【这就是创建的Invoker】。该类重写了AbstractProxyInvoker的doInvoke方法,即调用包装类的invokeMethod。接下来的疑问是如何获得包装类,包装类的invokeMethod是什么。
看获取包装类的方法Wrapper.getWrapper(),这个方法先从缓存中获取包装类,没有则调用makeWrapper方法
private static Wrapper makeWrapper(Class<?> c) {
......
StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");
StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");
StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");
......
// make class
long id = WRAPPER_CLASS_COUNTER.getAndIncrement();
ClassGenerator cc = ClassGenerator.newInstance(cl);
cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);
.......
try {
Class<?> wc = cc.toClass();
......
return (Wrapper) wc.newInstance();
......
}
在makeWrapper中构建了包装类的setPropertyValue、getPropertyValue和invokeMethod方法(注意invokeMethod方法,正是AbstractProxyInvoker的doInvoke方法中调用的),然后通过javassist 构建 Class,并通过反射创建对象。
因此,当我们调用创建的Invoker的doInvoke方法时,方法执行的其实是动态生成的包装类的invokeMethod方法。
2.2 导出本地服务
回到ServiceConfig的doExportUrlsFor1Protocol方法,这里调用了exportLocal来导出本地服务,注意exportLocal方法的入参是url(即在前置工作中组装的url)。
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0);
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
}
}
先判断是否需要导出本地服务:
- 若url的protocol参数是"injvm",说明已经导出本地服务了,无需再次导出。
- 若url的protocol参数不是"injvm"(url的protocol参数可以通过<dubbo:protocol>标签配置或者默认为dubbo),则组装一个名为local的URL。
然后创建Invoker,通过protocol的export方法导出服务。这里的protocol是Protocol$Adaptive,在代理类中通过getExtension("injvm")获得接口的真正实现ProtocolFilterWrapper(通过debug找到的)。这个包装类是ProtocolFilterWrapper(ProtocolListenerWrapper(InjvmProtocol)),包装类的export方法会生成Filter链,然后执行InjvmProtocol的export。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
这个方法其实就是新建了一个InjvmExporter实例。
2.3 导出远程服务
导出远程服务的方法调用关系如下:
再次回到doExportUrlsFor1Protocol方法
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
.....
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
.......
}
导出远程服务分为两种情况:
- registryURLs非空,则给registryURL添加参数,再导出服务;
- registryURLs为空,则直接导出服务;
在doExportUrlsFor1Protocol中通过protocol的export方法将Invoker对象导出为Exporter对象,这里的protocol也是通过扩展点机制获得的Protocol$Adaptive,经过debug可以看到真正的实现类是ProtocolFilterWrapper(ProtocolListenerWrapper(RegistryProtocol))。【为什么是RegistryProtocol?还记得在Adaptive类中是根据Invoker的url的protocol参数获取扩展点实现类的,回忆在loadRegistries方法中我们已经把url的protocol参数的值改为"registry",因此通过getExtension("registry")获得了RegistryProtocol】于是我们来看RegistryProtocol的export方法。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
这个方法主要做了三件事:
(1)导出服务doLocalExport;
(2)注册服务;
(3)订阅服务;
最后返回一个DestroyableExporter实例。注册服务会在第4部分分析,这边先看doLocalExport方法。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
这个方法会根据Invoker生成key,随后根据key从缓存中获取Exporter,为空则同步双重检查,新建ExporterChangeableWrapper实例。关注protocol的export方法(debug可知这里的url的protocol为dubbo),因此分析DubboProtocol的export。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
optimizeSerialization(url);
return exporter;
}
export新建了一个DubboExporter导出服务,然后调用openServer方法启动服务。
3.启动服务
启动服务模块的方法调用关系如下
首先看DubboProtocol的openServer
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
根据url的address在缓存中获取server:
- server为空,调用createServer创建服务器;
- server不为空,则根据 url 中的配置重置服务器。单台机器的单个端口只允许启动一个服务器实例,因此需要重置服务器的一些配置。
createServer方法创建server是通过调用Exchangers.bind实现的。Exchangers.bind是个静态方法,它会根据url获取Exchanger,然后调用该Exchanger的bind方法,这里调用的是HeaderExchanger的bind。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
这里仅需关心Transporters.bind。Transporters.bind也是个静态方法,该方法通过扩展机制获取Transporter接口的实现,然后调用其bind方法。以netty为例,调用NettyTransporter的bind。
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
方法返回一个NettyServer实例。看NettyServer的构造器,调用了其父类AbstractServer的构造器。
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = Constants.ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
构造器中的doOpen方法是个抽象方法,运行时调用的是NettyServer的doOpen。
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
doOpen创建了boss、worker线程池和ServerBootstrap,设置了PipelineFactory,并且绑定到指定的ip和端口上。至此,NettyServer 创建完成。
4.连接注册中心
虽然dubbo不是必须连接注册中心,可以通过服务直连的方式,但是服务直连不利于服务治理,建议还是使用注册中心。连接注册中心的方法调用关系如图。
回到RegistryProtocol的export方法,前面我们提到这个方法做的第二件事就是注册服务,于是看到注册服务的方法register。
public void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
register方法分为两步:
(1)获取Registry实例;
(2)调用Registry的register方法注册服务;
先看获取Registry实例,通过registryFactory的getRegistry获得注册中心,这里实际调用的是抽象类AbstractRegistryFactory的getRegistry方法。这个方法会从缓存中获取Registry实例,如果没有则会双重检查后创建,即调用createRegistry。这是个抽象方法,这边以ZookeeperRegistryFactory的createRegistry为例。
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
createRegistry返回了一个ZookeeperRegistry实例,即创建的注册中心。接着来看ZookeeperRegistry的构造器。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
zkClient = zookeeperTransporter.connect(url);
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
这边重点关注zookeeperTransporter的connect方法,用于初始化zkClient。zk客户端搞定后,创建注册中心的工作完成。看一下zookeeperTransporter.connect(url),实际调用的是AbstractZookeeperTransporter的connect方法。
public ZookeeperClient connect(URL url) {
ZookeeperClient zookeeperClient;
List<String> addressList = getURLBackupAddress(url);
// The field define the zookeeper server , including protocol, host, port, username, password
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
// avoid creating too many connections, so add lock
synchronized (zookeeperClientMap) {
if ((zookeeperClient = fetchAndUpdateZookeeperClientCache(addressList)) != null && zookeeperClient.isConnected()) {
logger.info("find valid zookeeper client from the cache for address: " + url);
return zookeeperClient;
}
zookeeperClient = createZookeeperClient(toClientURL(url));
logger.info("No valid zookeeper client found from cache, therefore create a new client for url. " + url);
writeToClientMap(addressList, zookeeperClient);
}
return zookeeperClient;
}
方法中调用抽象方法createZookeeperClient创建zk客户端,这边假设调用的是默认实现类CuratorZookeeperTransporter的createZookeeperClient方法。
public ZookeeperClient createZookeeperClient(URL url) {
return new CuratorZookeeperClient(url);
}
该方法创建了一个CuratorZookeeperClient实例并返回。
至此,获取注册中心工作完成,接下来看注册服务部分。首先理解对于Zookeeper,注册服务所做的工作就是将服务配置数据写入到 Zookeeper 的某个路径的节点下。
在获取Registry实例部分我们得到的注册中心是ZookeeperRegistry,而这个类的register方法由其父类FailbackRegistry实现,看下这个方法。
public void register(URL url) {
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
addFailedRegistered(url);
}
}
在这个方法中调用doRegister方法真正的去注册服务。若注册失败,则会将注册失败的url存储起来,目的是用于定期重试,可见这里包含了失败重试机制。关注doRegister,这又是个抽象方法,调用ZookeeperRegistry的doRegister方法。
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
这边调用了zk客户端的create方法,实际调用的是AbstractZookeeperClient类的create,用于创建服务节点,其节点路径由 toUrlPath 方法生成。
总结
以上就是服务导出的内容,首先做一些前置工作,包括检查/补全用户配置和组装url。然后导出服务,这边又分为导出本地服务和导出远程服务。导出本地服务相对简单,导出远程服务还包括启动服务器和连接注册中心。