前提
Dubbo的SPI是从JDK的SPI 扩展加强而来的。
- JDK中SPI 机制如下(可以查看JDBC的实现)
需要在 classpath 下创建一个目录,该目录命名必须是:META-INF/service
-
在该目录下创建一个 properties 文件,该文件需要满足以下几个条件 :
文件名必须是扩展的接口的全路径名称
文件内部描述的是该扩展接口的所有实现类
文件的编码格式是 UTF-8
通过 java.util.ServiceLoader 的加载机制来发现
- Dubbo 改进了JDK标准的SPI以下问题:
- JDK 标准的 SPI 会一次性实例化扩展点所有实现,如果有扩展实现初始化很耗时,但如果没用上也加载,会很浪费资源。
- 如果扩展点加载失败,连扩展点的名称都拿不到了。比如:JDK 标准的 ScriptEngine,通过
getName()
获取脚本类型的名称,但如果 RubyScriptEngine 因为所依赖的 jruby.jar 不存在,导致 RubyScriptEngine 类加载失败,这个失败原因被吃掉了,和 ruby 对应不起来,当用户执行 ruby 脚本时,会报不支持 ruby,而不是真正失败的原因 - 增加了对扩展点 IoC 和 AOP 的支持,一个扩展点可以直接 setter 注入其它扩展点
Dubbo SPI实现
官网约定描述
在扩展类的 jar 包内 1,放置扩展点配置文件
META-INF/dubbo/接口全限定名
,内容为:配置名=扩展实现类全限定名
,多个实现类用换行符分隔
示例
以扩展 Dubbo 的协议为例,接口声明如下 ,需要定义成可扩展的接口,要增加 @SPI
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
default List<ProtocolServer> getServers() {
return Collections.emptyList();
}
}
在协议的实现 jar 包内放置文本文件:META-INF/dubbo/org.apache.dubbo.rpc.Protocol
,内容为:
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
实现类内容
package org.apache.dubbo.rpc.protocol.dubbo;
import org.apache.dubbo.rpc.Protocol;
public class DubboProtocol extends AbstractProtocol { {
// ...
}
在Dubbo中可以通过三种方式来获取对应的类
// 获取自适应扩展点
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
// 获取指定名称的扩展点
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo");
// 获取激活扩展点
ExtensionLoader.getExtensionLoader(Protocol.class).getActivateExtension(url,"dubbo");
原理
-
获取自适应扩展点
-
什么是自适应扩展点
因为Dubbo每个接口可能都有很多的实现,那在运行的过程中是如何决定选择哪一个实现的方法呢?所以自适应就是为了解决这个问题
首先来看下
@Adaptive
注解,查看接口Protocol
的声明- 标注在类上,表明该类为自定义的适配类
- 标注在方法上,表明需要动态的为该方法创建适配类(会动态生成一个适配类)
-
@Adpative
注解可以指定多个值,如果指定的值在URL中没有找到,则以@SPI中指定的值作为默认的扩展进行返回;如对应的值是 {"key1","key2"},如果key1在URL中有对应的值,则使用key1的值作为扩展名称,如果key1找不到,则使用key2的值进行扩展,如果key2也没有值或是找不到,则使用默认@SPI中指定的值进行加载,否则抛错- 如果没有指定值,则会按接口的名称进行按英文字母进行拼接,用. 进行拼接,如 org.apache.dubbo.xxx.YyyInvokerWrapper,则生成的名称为 yyy.invoker.wrapper
@Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) public @interface Adaptive { /** * Decide which target extension to be injected. The name of the target extension is decided by the parameter passed * in the URL, and the parameter names are given by this method. * <p> * If the specified parameters are not found from {@link URL}, then the default extension will be used for * dependency injection (specified in its interface's {@link SPI}). * <p> * For example, given <code>String[] {"key1", "key2"}</code>: * <ol> * <li>find parameter 'key1' in URL, use its value as the extension's name</li> * <li>try 'key2' for extension's name if 'key1' is not found (or its value is empty) in URL</li> * <li>use default extension if 'key2' doesn't exist either</li> * <li>otherwise, throw {@link IllegalStateException}</li> * </ol> * If the parameter names are empty, then a default parameter name is generated from interface's * class name with the rule: divide classname from capital char into several parts, and separate the parts with * dot '.', for example, for {@code org.apache.dubbo.xxx.YyyInvokerWrapper}, the generated name is * <code>String[] {"yyy.invoker.wrapper"}</code>. * * @return parameter names in URL */ String[] value() default {}; }
-
调用如下方法,获取自适应扩展点
ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
public T getAdaptiveExtension() {
//确认是否有缓存
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError != null) {
throw new IllegalStateException("Failed to create adaptive instance: " +
createAdaptiveInstanceError.toString(),
createAdaptiveInstanceError);
}
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
// 没有就创建一个适配类
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
}
return (T) instance;
}
private T createAdaptiveExtension() {
try {
// 获取适配类并注入依赖
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
private Class<?> getAdaptiveExtensionClass() {
// 从jar包中获取相应的SPI实现的类,这块会在下面进行分析
getExtensionClasses();
// 如果cachedAdaptiveClass不为空,则这个类是@Adaptive 直接标注的类作为扩展实现类
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
以下来看下 getExtensionClasses
的实现
// 获取扩展实现类
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
// 缓存中如果没有,从jar包中加载
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
private Map<String, Class<?>> loadExtensionClasses() {
// 对应的需要适配的接口,缓存默认的扩展点名称,是 @SPI指定的值
cacheDefaultExtensionName();
Map<String, Class<?>> extensionClasses = new HashMap<>();
// 从加载策略中获取,主要有三种
// DubboInternalLoadingStrategy从META-INF/dubbo/internal/目录下搜索
// ServicesLoadingStrategy从META-INF/services/目录搜索
// DubboLoadingStrategy从META-INF/dubbo/目录搜索
for (LoadingStrategy strategy : strategies) {
loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
}
return extensionClasses;
}
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
boolean extensionLoaderClassLoaderFirst, boolean overridden, String... excludedPackages) {
String fileName = dir + type;
try {
Enumeration<java.net.URL> urls = null;
ClassLoader classLoader = findClassLoader();
// try to load from ExtensionLoader's ClassLoader first
if (extensionLoaderClassLoaderFirst) {
ClassLoader extensionLoaderClassLoader = ExtensionLoader.class.getClassLoader();
if (ClassLoader.getSystemClassLoader() != extensionLoaderClassLoader) {
urls = extensionLoaderClassLoader.getResources(fileName);
}
}
if (urls == null || !urls.hasMoreElements()) {
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL resourceURL = urls.nextElement();
// 获取到所有包含这些目录里的相关文件,并缓存对应的Class信息
loadResource(extensionClasses, classLoader, resourceURL, overridden, excludedPackages);
}
}
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", description file: " + fileName + ").", t);
}
}
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader,
java.net.URL resourceURL, boolean overridden, String... excludedPackages) {
try {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
if (ci >= 0) {
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
if (line.length() > 0 && !isExcluded(line, excludedPackages)) {
// 加载类信息
loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name, overridden);
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
}
}
} catch (Throwable t) {
logger.error("Exception occurred when loading extension class (interface: " +
type + ", class file: " + resourceURL + ") in " + resourceURL, t);
}
}
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
boolean overridden) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error occurred when loading extension class (interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + " is not subtype of interface.");
}
// 判断当前类是否有指定 @Adaptive注解,如果有,则缓存Adaptive类
if (clazz.isAnnotationPresent(Adaptive.class)) {
cacheAdaptiveClass(clazz, overridden);
// 如果是当前类的包装类(Wrapper),有以当前接口为唯一参数的构造函数
// 在获取扩展类时,会使用扩展Wrapper类进行封装当前扩展类
// 也可以使用 @Wrapper 注解来标识,具体的类需要使用哪个 Wrapper类
} else if (isWrapperClass(clazz)) {
cacheWrapperClass(clazz);
} else {
clazz.getConstructor();
if (StringUtils.isEmpty(name)) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
String[] names = NAME_SEPARATOR.split(name);
if (ArrayUtils.isNotEmpty(names)) {
cacheActivateClass(clazz, names[0]);
for (String n : names) {
//缓存名称
cacheName(clazz, n);
// 加入到extensionClasses中进行缓存
saveInExtensionClass(extensionClasses, clazz, n, overridden);
}
}
}
}
到此,``getExtensionClasses`结束 ,相关的Class信息已经加载完成
再来看 createAdaptiveExtensionClass
private Class<?> createAdaptiveExtensionClass() {
// 动态创建对应类的适配类,创建类的内容,生成的类名如 Protocol$Adaptive
// Dubbo 主要是以URL作为参数传递,动态获取扩展类也是依赖于URL对象,对于增加了@Adaptive 注解的方法,必须有URL作为参数,或是参数必须有URL的成员
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
// 动态编译
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
// 返回动态创建的适配类
return compiler.compile(code, classLoader);
}
生成的动态适配类
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
// 生成的适配类
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
//依据参数的中的对应参数,获取真实的Protocol实现类
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null) throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public java.util.List getServers() {
throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");
}
}
从源码中,上面有讲到 cachedAdaptiveClass
变量,这个是当一个接口的实现类有 加 @Adaptive
时,会作为默认的适配类,不需要动态再创建了
如 上面的Compiler
的实现类
@Adaptive
public class AdaptiveCompiler implements Compiler {
private static volatile String DEFAULT_COMPILER;
public static void setDefaultCompiler(String compiler) {
DEFAULT_COMPILER = compiler;
}
@Override
public Class<?> compile(String code, ClassLoader classLoader) {
Compiler compiler;
ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);
String name = DEFAULT_COMPILER; // copy reference
if (name != null && name.length() > 0) {
compiler = loader.getExtension(name);
} else {
// 获取默认的方式
compiler = loader.getDefaultExtension();
}
return compiler.compile(code, classLoader);
}
}
-
获取指定名称的扩展点
这种方式和获取适配类的方式,大致加载逻辑类似,但这里是明确了具体的实现类,是通过名称进行获取的,比较特殊的点如下:
private T createExtension(String name, boolean wrap) { Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } injectExtension(instance); // 如果wrap为true时,则会从cachedWrapperClasses已经缓存的包装类进行 // 如获取名为registry的 Protocol 接口的实例,会返回一个 ProtocolFilterWrapper if (wrap) { List<Class<?>> wrapperClassesList = new ArrayList<>(); if (cachedWrapperClasses != null) { wrapperClassesList.addAll(cachedWrapperClasses); wrapperClassesList.sort(WrapperComparator.COMPARATOR); Collections.reverse(wrapperClassesList); } // 判断是否有包装类,如果有确认是否有符合的内容 if (CollectionUtils.isNotEmpty(wrapperClassesList)) { for (Class<?> wrapperClass : wrapperClassesList) { Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class); if (wrapper == null || (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) { instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } } } // 注入依赖后,初始化实例,调用 lifecycle.initlialize() initExtension(instance); return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance (name: " + name + ", class: " + type + ") couldn't be instantiated: " + t.getMessage(), t); } }
-
获取激活扩展点
主要是
@Activate
注解起作用,自动激活给定的值对应的扩展,比如@Activate
可以加载指定的Filter
扩展类,因为Filter
可以有多种实现@Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) public @interface Activate { /** * Activate the current extension when one of the groups matches. The group passed into * {@link ExtensionLoader#getActivateExtension(URL, String, String)} will be used for matching. * * @return group names to match * @see ExtensionLoader#getActivateExtension(URL, String, String) */ String[] group() default {}; /** * Activate the current extension when the specified keys appear in the URL's parameters. * <p> * For example, given <code>@Activate("cache, validation")</code>, the current extension will be return only when * there's either <code>cache</code> or <code>validation</code> key appeared in the URL's parameters. * </p> * * @return URL parameter keys * @see ExtensionLoader#getActivateExtension(URL, String) * @see ExtensionLoader#getActivateExtension(URL, String, String) */ String[] value() default {}; /** * Relative ordering info, optional * Deprecated since 2.7.0 * * @return extension list which should be put before the current one */ @Deprecated String[] before() default {}; /** * Relative ordering info, optional * Deprecated since 2.7.0 * * @return extension list which should be put after the current one */ @Deprecated String[] after() default {}; /** * Absolute ordering info, optional * * @return absolute ordering info */ int order() default 0; }
来看源码
public List<T> getActivateExtension(URL url, String key, String group) { String value = url.getParameter(key); // 从URL中获取对应key的值,可以逗号隔开 return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group); }
public List<T> getActivateExtension(URL url, String[] values, String group) { List<T> activateExtensions = new ArrayList<>(); List<String> names = values == null ? new ArrayList<>(0) : asList(values); if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) { getExtensionClasses();// 在加载对应的接口的具体实现类时,会判断是否有加 `@Activate`注解进行标识,如果有会加入到cachedActivates进行缓存 for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) { String name = entry.getKey(); Object activate = entry.getValue(); String[] activateGroup, activateValue; if (activate instanceof Activate) { activateGroup = ((Activate) activate).group(); activateValue = ((Activate) activate).value(); } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) { activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group(); activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value(); } else { continue; } // 是否符合 // 也会排除类似 -dubbo if (isMatchGroup(group, activateGroup) && !names.contains(name) && !names.contains(REMOVE_VALUE_PREFIX + name) && isActive(activateValue, url)) { activateExtensions.add(getExtension(name)); } } // 进行排序 activateExtensions.sort(ActivateComparator.COMPARATOR); } List<T> loadedExtensions = new ArrayList<>(); for (int i = 0; i < names.size(); i++) { String name = names.get(i); if (!name.startsWith(REMOVE_VALUE_PREFIX) && !names.contains(REMOVE_VALUE_PREFIX + name)) { if (DEFAULT_KEY.equals(name)) { if (!loadedExtensions.isEmpty()) { activateExtensions.addAll(0, loadedExtensions); loadedExtensions.clear(); } } else { loadedExtensions.add(getExtension(name)); } } } if (!loadedExtensions.isEmpty()) { activateExtensions.addAll(loadedExtensions); } return activateExtensions; }
总结
在Dubbo中使用SPI实现动态扩展类,并通过
@Adaptive
实现在运行时动态选择扩展类在Dubbo中,自适应适配类依赖于
URL
对象,在参数或是参数的成员中需要有URL
对象当
@Adaptive
标注在类上,则会是作为自适应扩展类,不会动态生成相应的代理类当
@Adaptive
标注在方法上,则会生成动态的自适应扩展类,如果没有参数会以@SPI
的值作为默认值,并会URL中对应的key的值为参考