Dubbo具有良好的扩展性,其扩展性依赖于dubbo使用的各种设计模式和扩展点加载机制,本文对Dubbo SPI加载机制进行探讨。
Dubbo SPI概要
Dubbo SPI在Java SPI的基础上进行了扩展,SPI即Service Provider Interface,期初提供给厂商用作插件开发,只是申明了接口,具体实现不在程序中直接定义,实现Java SPI的步骤大致如下:
- 定义接口和方法;
- 编写接口的实现类;
- 在META-INF/services/目录下创建以接口全路径命名的文件,在文件中指明具体实现类的全路径名,多个实现类由分行符分隔;
- 通过java.util.ServiceLoader来加载具体的实现类。
Dubbo SPI对Java SPI进行了改进和扩展,包括:
- Dubbo SPI实现了按需加载扩展类,Java SPI则是一次性加载所有的扩展实现类而造成不必要的资源浪费;
- Dubbo SPI提供了对Spring IOC和Spring AOP的支持。
Dubbo SPI核心原理
Dubbo的扩展机制主要依赖于三个核心注解和一个逻辑类,即
- @SPI
- @Adaptive
- @Activate
- ExtensionLoader
@SPI可以使用在接口、类和枚举类上,通常情况都是用于接口上,标记当前接口是一个扩展点,可以有多个不同的实现,运行时通过配置找到具体的扩展实现类,其源码如下所示,
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface SPI {
String value() default "";
}
其中value代表指定的默认扩展实现类,例如扩展点Compiler指定了默认的扩展实现类为 javassist。
@SPI("javassist")
public interface Compiler {
Class<?> compile(String code, ClassLoader classLoader);
}
@Adaptive表示自适应,其定义如下
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
String[] value() default {};
}
该注解可以用在类上和接口上
- 用于类上表明该实现用硬编码完成不需要运行时动态生成Java代码,后面介绍获取自适应扩展时会详细介绍;
- 用于方法上标示该方法为自适应方法,需要动态生成Java代码,并有Dubbo提供的编译器编译进行编译。
@Activate标示一个扩展点需要自动激活,并指定激活条件,其定义如下
Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Activate {
String[] group() default {};
String[] value() default {};
String[] before() default {};
String[] after() default {};
int order() default 0;
}
其中group和value标示自动激活条件,before、after和order标示激活的顺序。
ExtensionLoader类是整个扩展机制的核心类,负责配置的加载,扩展类class文件的创建和缓存以及扩展类的加载和缓存,ExtensionLoader获取扩展类的入口包括三个:
- getExtension,获取普通的扩展类;
- getAdaptiveExtension,获取自适应的扩展类;
- getActivateExtension,获取自动加载的扩展类。
通过下面的例子来说明Dubbo扩展点的特点
@SPI("defaultBuz")
public interface BusinessService1 {
void doBusiness();
}
@SPI("defaultBuz")
public interface BusinessService2 {
void doBusiness();
}
@SPI("defaultBuz")
public interface BusinessService3 {
void doBusiness();
}
public class BusinessService1Impl implements BusinessService1 {
BusinessService2 businessService2;
BusinessService3 businessService3;
public BusinessService1Impl(BusinessService2 businessService2) {
this.businessService2 = businessService2;
}
public void setBusinessService3(BusinessService3 businessService3) {
this.businessService2 = businessService2;
}
void doBusiness() {
businessService2.doBusiness();
businessService3.doBusiness();
}
}
定义了三个扩展点BusinessService1、BusinessService2和BusinessService3,并提供了BusinessService1的默认实现BusinessService1Impl,该实现中依赖了另外两个扩展点,通过构造方法依赖BusinessService2,通过setter方法依赖BusinessService3,实现了doBusiness方法,即一次调用另外两个扩展点的doBusiness方法。
- 自动包装,加载扩展实现类是如果发现扩展类通过构造函数注入了其他扩展点,如BusinessService1通过构造函数注入了BusinessService2,该扩展实现封装为一个Wrapper扩展点;
- 自动加载,加载机智如果发现扩展实现通过setter方法注入了其他扩展点,如BusinessService1通过setter方法注入的BusinessService3,加载机制会自动加载setter方法的扩展点;
- 自适应,自适应则是通过@Adaptive注解实现的,通过传入的参数动态生成Java代码,调用参数指定的扩展实现进行方法调用;
- 自动激活,自动激活是通过@Activate实现的,通过该注解可以指定自动激活的条件和激活的顺序。
ExtensionLoader源码解析
获取某个扩展点的ExtensionLoader的逻辑比较简单,源码如下所示,首先判断传入的class对象是否符合扩展点的要求,即为接口且接口上有@SPI注解,然后尝试从缓存中获取,缓存中不存在,则通过new创建一个,并放入缓存中。
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
//判断是否为接口
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
//判断接口上是否有@SPI注解
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
//从缓存中获取,EXTENSION_LOADERS为一个ConcurrentHashMap
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
//缓存中不存在,则创建一个ExtensionLoader,并放入缓存
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
上图展示了创建普通扩展类实例的过程,不同的方法我用不同的颜色进行了区分,下面从源码层面进行分析,首先是主逻辑方法getExtension,其大致流程为
- 判断name是否为“true”,为“true”则返回默认扩展类,默认扩展类是SPI注解中value指定的扩展类名称;
- 尝试从缓存中获取扩展类,缓存中存在则直接返回;
- 缓存中不存在,则通过createExtension方法创建扩展类,并放入缓存中
//获取扩展类
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
//name为true,返回默认扩展类
if ("true".equals(name)) {
return getDefaultExtension();
}
//从缓存中获取扩展类
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
//缓存中不存在,调用createExtension方法创建扩展类
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
当ConcurrentMap cachedInstances中不存在扩展类的实例时会通过createExtension创建扩展类实例,方法源码如下所示,其大致过程如下
- 获取扩展类的class对象;
- 从缓存EXTENSION_INSTANCES中获取扩展类的实例,若不存在则通过newInstance创建实例对象;
- 通过injectExtension方法注入当前扩展类中setter方法中依赖的其他扩展点;
- 获取当前扩展类所有的包装扩展类WrapperClass,然后注入其setter方法中依赖的其他扩展点。
private T createExtension(String name) {
//根据name获取class文件
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
//内存中获取扩展类的实例,若不存在则创建
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
//注入依赖的扩展类,此类扩展类是以set方法
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
//注入依赖的扩展类,此类扩展类是以成员变量的身份存在于其他扩展类
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
创建扩展类的示例对象是通过其class对象,然后用newInstance创建的,获取class对象也是先从缓存中获取,缓存中不存在则通过loadExtensionClasses()方法来加载所有的扩展类。
//获取所有扩展类
private Map<String, Class<?>> getExtensionClasses() {
//从缓存中获取
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
//缓存中不存在,则加载所有的扩展类class文件
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
加载所有扩展类loadExtensionClasses方法源码如下所示,该方法首先会获取SPI注解上value指定的默认扩展实现名并缓存起来,然后一次通过文件流的方式加载"META-INF/dubbo/internal/"、"META-INF/dubbo/"和"META-INF/services/"路径下制定的扩展类。
//加载所有扩展类
private Map<String, Class<?>> loadExtensionClasses() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if (value != null && (value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) cachedDefaultName = names[0];
}
}
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
//加载"META-INF/dubbo/internal/"路径下制定的扩展类
loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
//加载"META-INF/dubbo/"路径下制定的扩展类
loadFile(extensionClasses, DUBBO_DIRECTORY);
//加载"META-INF/services/"路径下制定的扩展类
loadFile(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}
在createExtension创建扩展类的同时会调用injectExtension方法注入依赖的扩展类,injectExtension方法的源码如下所示,首先获取当前扩展类以set开头的setter方法并获取setter方法的参数类型,然后通过ExtensionFactory创建依赖的扩展类的实例,通过反射调用set方法,注入依赖的扩展类。
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
//判断方法是否为set开头的方法,并且只有一个参数并且是public的,其实就是一个setter方法
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
Class<?> pt = method.getParameterTypes()[0];
try {
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
//通过ExtensionFactory创建扩展类
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
//通过反射调用set方法,注入依赖的扩展类
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
获取依赖扩展类是通过ExtensionFactory完成的,该工厂提供了一个抽象方法根据type和name获取扩展实现类,其源码如下所示,
@SPI
public interface ExtensionFactory {
<T> T getExtension(Class<T> type, String name);
}
接口ExtensionFactory本身也是一个扩展点,有三个实现类,分别为
- AdaptiveExtensionFactory,有类级别的@Adptive注解,表示默认情况下会加载该扩展实现,处理过程是获取所有ExtensionFactory的实现,依次调用getExtension(Class<T> type, String name)方法直到获取到type和name指定的扩展类为止,若不存在则返回null;
- SpiExtensionFactory,该ExtensionFactory是通过创建ExtensionLoader,通过type和name获取包装扩展类;
- SpringExtensionFactory,上文中提过Dubbo SPI实现了对Spring IOC和AOP的支持,SpringExtensionFactory是这方面很好的体现,该ExtensionFactory缓存Spring的上下文Context,通过name去匹配Context中的bean,找到后判断是否符合类型type,若匹配则返回该bean。
获取自适应扩展类的思路与获取普通扩展类类似,其源码如下所示,具体步骤我在源码的基础上进行了注释,其大致过程为
- 尝试从缓存中获取
- 缓存中不存在,通过createAdaptiveExtension创建扩展类并缓存;
//获取自适应的扩展类
public T getAdaptiveExtension() {
//首先尝试从缓存中获取扩展类实例
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
//缓存中不存在,则创建自适应的扩展类
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
createAdaptiveExtension创建扩展类,并且通过injectExtension注入其依赖的扩展类,其过程也是获取class对象通过newInstance创建实例对象。
//创建自适应的扩展类
private T createAdaptiveExtension() {
try {
//获取自适应的扩展类的class文件实例化,并且注入依赖的其他扩展类
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
//获取自适应扩展类的class文件
private Class<?> getAdaptiveExtensionClass() {
//获取所有扩展类class对象
getExtensionClasses();
//检查缓存,直接返回
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
//缓存不存在则创建自适应扩展类的class文件
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
同样是先从缓存中获取,缓存中不存在则通过createAdaptiveExtensionClass创建class对象
private Class<?> createAdaptiveExtensionClass() {
//动态生成@Adaptive结尾的代码
String code = createAdaptiveExtensionClassCode();
//获取类加载器
ClassLoader classLoader = findClassLoader();
//获取Compiler的自适应扩展类
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
//编译得到class文件
return compiler.compile(code, classLoader);
}
不同于普通扩展类的class文件通过文件流的方式读入内存中,对于应用于接口方法级别的@Adaptive所在的接口会通过createAdaptiveExtensionClassCode方法生成后缀为@Adaptive的源代码,例如Protocol接口,其定义如下
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
Protocol为一个Dubbo扩展点,export和refer两个方法有@Adaptive注解,表明该方法可以根据参数自适应选择正确的扩展实现,createAdaptiveExtensionClassCode返回的源码如下
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
自动生成的代码实现一些基本功能,如异常处理等,对自适应的方法进行了封装,通过URL中的参数创建对应的扩展extension,实际上还是调用具体extension中的方法,相当于装饰者模式。
动态生成的源代码通过Dubbo提供的编译器Compiler编译为class文件,Dubbo编译器Compiler的定义如下
@SPI("javassist")
public interface Compiler {
Class<?> compile(String code, ClassLoader classLoader);
}
可以看到Compiler也是一个扩展点,不禁感叹Dubbo中一切皆可扩展!Compiler指定了默认扩展实现为"javassist",该接口有四个实现者:
- AbstractCompiler(实现通用功能)
- AdaptiveCompiler(使用类级别的@Adaptive)
- JavassistCompiler(默认扩展实现)
- JdkCompiler
后面三种均继承AbstractCompiler,AbstractCompiler实现了异常处理等基本功能。其中AdaptiveCompiler使用了类级别的@Adaptive,使用在类级别上的@Adaptive不会生成以“@Adaptive”结尾的Java源代码,不会根据URL参数动生成代码,而是使用编码生成,Dubbo中只有两处使用了类级别的@Adaptive,即AdaptiveCompiler和AdaptiveExtensionFactory。
自动激活扩展点主要是通过@Activate注解指定激活条件,满足激活条件的扩展点通过调用普通扩展点加载方法getExtension来完成,最后对设置的排序规则进行排序,其源码如下所示
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
getExtensionClasses();
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Activate activate = entry.getValue();
if (isMatchGroup(group, activate.group())) {
T ext = getExtension(name);
if (!names.contains(name)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
&& isActive(activate, url)) {
exts.add(ext);
}
}
}
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
&& !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
if (Constants.DEFAULT_KEY.equals(name)) {
if (!usrs.isEmpty()) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
T ext = getExtension(name);
usrs.add(ext);
}
}
}
if (!usrs.isEmpty()) {
exts.addAll(usrs);
}
return exts;
}
至此三种获取扩展点实现实例的方法介绍完毕,本文并未对自适应扩展实现中源码的生成的过程进行详解,有兴趣的可以研究Dubbo源码。
扩展点使用示例
定义一个扩展点HelloService,通过@SPI指定扩展点的默认实现为"impl2",实现了两个扩展实现HelloServiceImpl1和HelloServiceImpl2如下所示,
@SPI("impl2")
public interface HelloService {
String sayHello();
}
public class HelloServiceImpl1 implements HelloService{
public String sayHello() {
String hello = "hello Impl1";
System.out.println(hello);
return hello;
}
}
public class HelloServiceImpl2 implements HelloService {
public String sayHello() {
String hello = "hello Impl2";
System.out.println(hello);
return hello;
}
}
在目录src/main/resources/META-INF/dubbo下创建com.kkssyy.dubbo.common.service.HelloService文件
impl1=com.kkssyy.dubbo.common.service.HelloServiceImpl1
impl2=com.kkssyy.dubbo.common.service.HelloServiceImpl2
创建ExtensionClient来进行验证
public class ExtensionClient {
public static void main(String[] args) {
ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(HelloService.class);
HelloService helloService = (HelloService) extensionLoader.getExtension("impl1");
HelloService helloService2 = (HelloService) extensionLoader.getDefaultExtension();
//打印hello Impl1
helloService.sayHello();
//打印hello Impl2
helloService2.sayHello();
}
}
helloService为指定的“impl1”扩展实现,该实现控制台打印"hello Impl1",helloService2为默认扩展实现,该实现控制台打印"hello Impl2"。
总结
本文对Dubbo的扩展机智进行了详解,对ExtensionLoader的源码进行了梳理,并对相关的知识进行全面的介绍,从源码中可以看出ExtensionLoader全流程使用了线程安全的ConcurrentHashMap来缓存class和实例对象,提高了多次方法调用的效率,使用了装饰者模式和工厂模式等设计模式提高了代码的扩展性,此外Dubbo中几乎所有的实现都是基于扩展点的,比如加载扩展类的ExtensionFactory和编译器Compiler等,大大提高了框架的可扩展性。
写在最后
源码路上,共勉!!!