dubbo之SPI

前言

SPI( Service Provider Interface),个人理解其实是OOP中面向接口编程的一种交互机制;接口定义与具体实现完全剥离,模块、应用通过SPI交互,弱化彼此间的耦合关系,提升应用、模块的可伸缩性。简而言之,SPI是为了动态获取接口的自定义实现类实例,也就是说SPI的结果是一个Object(接口的自定义实现类实例),可以用一个公式表示为 接口的自定义实现类实例 = SPI(接口)。目前使用的SPI主要有以下特点:

1、服务提供方定义接口,具体以jar包形式对外提供;
2、接口定义与实现分离,服务提供方只关注接口定义,无需关注具体实现;
3、方便灵活,使用方有更多的自主性,可以根据实际需求自己实现;

了解完SPI,下面来看看常见SPI的实现,JDK的SPI以及Dubbo的SPI(Spring的SPI暂时不做解析)。

名词说明

SPI实例:指代SPI接口的实现类new出的对象,比如接口B,实现类BB,SPI实例即BB.class.newInstance()生成的对象。
SPI扩展类实例:指代实现SPI接口的实现类对象,比如接口B,实现类BB,SPI扩展类实例即BB.class对象

一、JDK的SPI

1、使用

首先来看JDK的SPI,使用方式如下,这里以接口提供方与使用方分开的形式进行描述:

1、服务提供方定义接口,比如 xxx.xxx.xxx.spi.SimpleSpi;
2、服务使用方引入服务提供方对外提供的jar包,比如实现类:xxx.xxx.xxx.spi.ConsumerSimpleSpi;
3、classPath下新建 META-INF/services/xxx.xxx.xxx.spi.SimpleSpi文件(每行一个实现类),内容如下:
  xxx.xxx.xxx.spi.ConsumerSimpleSpi1
    xxx.xxx.xxx.spi.ConsumerSimpleSpi2
4、使用比较简单,主要分两步:初始化ServiceLoader、遍历使用
  ServiceLoader<SimpleSpi> simpleSpis = ServiceLoader.load(SimpleSpi.class);
    for(SimpleSpi simpleApi : simpleSpis){
      simpleSpis.sayHello();;
  }

2、实现

JDK的SPI的核心逻辑在ServiceLoader,通过静态方法load对外暴露,load内部调用私有构造方法;构造方法内部重点关注reload方法;这个名字有一点误导,内部其实只做了providers(HashMap)的clear操作和LazyIterator的初始化,并没有进行实质性加载。

private ServiceLoader(Class<S> svc, ClassLoader cl) {
    service = Objects.requireNonNull(svc, "Service interface cannot be null");
    loader = (cl == null) ? ClassLoader.getSystemClassLoader() : cl;
    acc = (System.getSecurityManager() != null) ? AccessController.getContext() : null;
    reload();
}

public void reload() {
    providers.clear();
    lookupIterator = new LazyIterator(service, loader);
}

LazyIterator,顾名思义延迟迭代器,真正的加载在迭代时一次性完成(一个接口的所有实现全部加载),缓存至ServiceLoader.providers。核心逻辑如下:

// 判断是否有实现,若有则缓存全限定名(例如:xxx.xxx.xxx.spi.ConsumerSimpleSpi1)至nextName,为下一步实例化作准备
private boolean hasNextService() {
  //非核心逻辑省略 Enumeration<URL> configs = null,
  // 位于LazyIterator,用于存放META-INF/services/下的文件URL
    if (configs == null) {
        String fullName = PREFIX + service.getName();
        if (loader == null)
            configs = ClassLoader.getSystemResources(fullName);
        else
            configs = loader.getResources(fullName);
    }
  
  // 解析 META-INF/services/目录下文件内容,拿到具体的service实现类名
    while ((pending == null) || !pending.hasNext()) {
        if (!configs.hasMoreElements()) {
            return false;
        }
        pending = parse(service, configs.nextElement());
    }
    nextName = pending.next();
    return true;
}

下面是实例化具体的Service

private S nextService() {
    if (!hasNextService())
        throw new NoSuchElementException();
    //String cn = nextName;
    //通过反射生成具体Service实例
    c = Class.forName(cn, false, loader);
    //中间非核心逻辑省略;
    S p = service.cast(c.newInstance());
    // 放入providers缓存
    providers.put(cn, p);
    return p;
    throw new Error();          // This cannot happen
}

可以看到,首次使用时,最终的加载结果是,在providers有多个SimpleSpi的实现类以<实现类名,实现类实例>的形式缓存下来。JDK的SPI实现流程图:

JDK-SPI (1).jpg

缺点:

1、无法按需加载。无论是否用到,每次会加载接口的所有实现类,
2、不够灵活。虽然内置providers缓存,使用的时候还是需要遍历loader;
3、非线程安全。providers缓存无锁,并发场景存在线程安全问题。

Dubbo的SPI

1、使用

聊完JDK的SPI,下面来看Dubbo的SPI如何实现,以及做了哪些改进。与JDK的SPI类似,也是通过配置文件的形式,指定接口具体实现,同样以上面的 xxx.xxx.xxx.spi.SimpleSpi接口为例,不同的是配置文件内容,比如 xxx.xxx.xxx.spi.SimpleSpi有多个实现类,对应spi配置文件内容如下:

spi1=xxx.xxx.xxx.spi.ConsumerSimpleSpi1
spi2=xxx.xxx.xxx.spi.ConsumerSimpleSpi2

使用也更简单,直接通过ExtensionLoader调用即可,比如要获取xxx.xxx.xxx.spi.ConsumerSimpleSpi1的SPI实例,代码如下:

ConsumerSimpleSpi1 spi1 = ExtensionLoader.getExtensionLoader(SimpleSpi.class).getExtension("spi1");

可以看到,与JDK的SPI相比,Dubbo的SPI使用起来更方便一些,设计相对也更合理。下面我们就来了解Dubbo的SPI具体是如何实现的。

2、实现

Dubbo的SPI完全由ExtensionLoader实现,整个过程可以分为两步:

  1. 获取接口的ExtensionLoader实例
  2. 获取接口的指定SPI实现

第一步比较简单,获取ExtensionLoader实例的过程如下:

1、先从EXTENSION_LOADERS取,若有则直接返回,无则开始创建;
2、通过私有构造方法创建loader;初始化化过程中,若接口是ExtensionFactory,初始化结束,否则继续步骤3;
3、获取ExtensionFactory的loader;
4、调用getAdaptiveExtensio(详细过程见下文),加载ExtensionFactory的spi实现
5、ExtensionLoader创建完毕,放入EXTENSION_LOADERS缓存,并返回创建的loader
getExtensionLoader.jpg

获取接口spi实现相对复杂一点,当前dubbo版本提供四种方式,分别对应四个方法:

  1. ExtensionLoader.getExtension(String name)
  2. ExtensionLoader.getAdaptiveExtension()
  3. ExtensionLoader.getDefaultExtension()
  4. ExtensionLoader.getActivateExtension(URL url, String[] values, String group)
    下面对每个方法做详细解析
    ExtensionLoader.getExtension(String name)
public T getExtension(String name) {
        if (StringUtils.isEmpty(name)) {
            throw new IllegalArgumentException("Extension name == null");
        }
            // name= 'true'会直接返回默认SPI实现
        if ("true".equals(name)) {
            return getDefaultExtension();
        }
            //先从缓存拿holder 缓存定义
            //ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
        Holder<Object> holder = cachedInstances.get(name);
        if (holder == null) {
          //holder为空,则new一个
            cachedInstances.putIfAbsent(name, new Holder<Object>());
            holder = cachedInstances.get(name);
        }
        Object instance = holder.get();
            //DCL锁,防止并发创建
        if (instance == null) {
            synchronized (holder) {
                instance = holder.get();
                if (instance == null) {
                    //核心,不存在,则创建扩展实现
                    instance = createExtension(name);
                    //创建完毕,放入holder缓存
                    holder.set(instance);
                }
            }
        }
        return (T) instance;
    }

上层方法逻辑比较简单的,先从map缓存取holder,然后从holder缓存取SPI扩展实现实例,存在则直接返回,否则执行创建逻辑(这里需要注意,仅会实例化当前使用到的spi实例,并放入缓存;不会创建其他没有用到的spi实例);核心逻辑在createExtension

private T createExtension(String name) {
    //这里会先加载spi配置里的扩展类实现结果缓存在cachedClasses 
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
        throw findException(name);
    }
    try {
        //老规矩,先从缓存取; 缓存定义
        //ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>()
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {
            //反射创建结果对象
            EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        //注入其他依赖spi扩展
        injectExtension(instance);
        //代理类处理
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (CollectionUtils.isNotEmpty(wrapperClasses)) {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        // 创建完毕,返回
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                type + ") couldn't be instantiated: " + t.getMessage(), t);
    }
}

这块逻辑比较复杂,一点一点来看,先来看 getExtensionClasses

private Map<String, Class<?>> getExtensionClasses() {
    //先从缓存拿 Holder<Map<String, Class<?>>> cachedClasses = new Holder<>()
    Map<String, Class<?>> classes = cachedClasses.get();
    //同样的DCL锁,
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                // 核心 加载扩展实现类
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}

// synchronized in getExtensionClasses,只能在同步块中执行
private Map<String, Class<?>> loadExtensionClasses() {
    // 缓存默认扩展实现name;逻辑比较简单,就不列代码了;
    // 直接取SPI注解value参数值,并缓存于cachedDefaultName;
    cacheDefaultExtensionName();        
    //从下面几个目录加载目录下SPI配置文件:
    // 1、 META-INF/dubbo/internal/
    // 2、 META-INF/dubbo/
    // 3、 META-INF/services/ 这里是有优先级的,越靠前,优先级越高
    Map<String, Class<?>> extensionClasses = new HashMap<>();
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    return extensionClasses;
}

private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type) {
        //伪代码描述
        // 1、拼接文件全路径
        // 2、拿到classLoader
        // 3、根据文件路径拿到URL
        // 4、遍历URL集合,加载资源resource
      String fileName = dir + type;
      while (urls.hasMoreElements()) {
          java.net.URL resourceURL = urls.nextElement();
          //核心逻辑,重点关注
          loadResource(extensionClasses, classLoader, resourceURL);
     }
}

private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
     BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), "utf-8"));
     try {
         //伪代码描述
         String line;
         while (还有下一行) {
            //1、循环按行读取resource文件
            //2、解析出需加载类全限定名,结果比如 xxx.xxx.xxx.spi.ConsumerSimpleSpi1
           // 3、加载class到extensionClasses缓存
           loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);             
            } finally {
          reader.close();
     }
 }

loadClass的逻辑比较重要,单独拉出来

private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {
    if (!type.isAssignableFrom(clazz)) {
        throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                type + ", class line: " + clazz.getName() + "), class "
                + clazz.getName() + " is not subtype of interface.");
    }
    if (clazz.isAnnotationPresent(Adaptive.class)) {
        // 扩展类有adaptive注解,则缓存至cachedAdaptiveClass;Class<?> cachedAdaptiveClass = null
        // 重点关注,最多只允许一个SPI扩展类上有Adaptive注解
        cacheAdaptiveClass(clazz);
    } else if (isWrapperClass(clazz)) {
        //是否wrapper,即clazz是唯一的构造方法参数,缓存至cachedWrapperClasses
       // Set<Class<?>> cachedWrapperClasses;
        cacheWrapperClass(clazz);
    } else {
        clazz.getConstructor();
        //老逻辑,暂不关注
        if (StringUtils.isEmpty(name)) {
            name = findAnnotationName(clazz);
            if (name.length() == 0) {
                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
            }
        }
        
        String[] names = NAME_SEPARATOR.split(name);
        if (ArrayUtils.isNotEmpty(names)) {
            // 多个则默认第一个是active; 
            // Map<String, Object> cachedActivates = new ConcurrentHashMap<>,其中name是key
            cacheActivateClass(clazz, names[0]);
            for (String n : names) {
                // 缓存name;
                // ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<>
                cacheName(clazz, n);
                //最后,保存class实例到extensionClasses
                saveInExtensionClass(extensionClasses, clazz, name);
            }
        }
    }
}

至此,SPI配置文件中所有实现类实例加载到cachedClasses,下一步就是从cachedClasses中直接拿到指定的SPI类实例,利用反射创建SPI实例,然后再对SPI实例执行附加操作,比如注入依赖的SPI实现、代理类加工等;代理类的处理比较简单,直接利用反射调用construtor创建代理对象,参数只有一个,即前面的SPI实例;主要看依赖的SPI注入过程

private T injectExtension(T instance) {
    try {
        if (objectFactory != null) {
            //遍历结果实例的所有setter方法
            for (Method method : instance.getClass().getMethods()) {
                if (isSetter(method)) {
                    // 如果有DisableInject注解,跳过
                    if (method.getAnnotation(DisableInject.class) != null) {
                        continue;
                    }
                    // 根据第一个参数判断,如果是基本类型,则跳过
                    Class<?> pt = method.getParameterTypes()[0];
                    if (ReflectUtils.isPrimitives(pt)) {
                        continue;
                    }
                    try {
                        // 解析setter方法属性
                        String property = getSetterProperty(method);
                        //调用set方法, 注入依赖的spi扩展,这里可以去看ExtensionFactory的逻辑
                        Object object = objectFactory.getExtension(pt, property);
                        if (object != null) {
                            //反射调用
                            method.invoke(instance, object);
                        }
                    } catch (Exception e) {
                        logger.error("Failed to inject via method " + method.getName()
                                + " of interface " + type.getName() + ": " + e.getMessage(), e);
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}

代码看的有点懵了吧,画个图,getExtension流程:

getExtension.jpg

加载SPI实现类,loadExtensionClasses流程:


loadExtensionClasses (1).jpg

ExtensionLoader.getAdaptiveExtension

获取Adaptive扩展SPI实例逻辑也比较简单,直接上代码

public T getAdaptiveExtension() {
    //先从缓存取
    Object instance = cachedAdaptiveInstance.get();
    //DCL锁,防并发
    if (instance == null) {
        //创建过程未出现过错误
        if (createAdaptiveInstanceError == null) {
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        //核心逻辑,创建,然后放入缓存
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        } else {
            throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
        }
    }
    return (T) instance;
}

重点关注createAdaptiveExtension方法:

private T createAdaptiveExtension() {
    try {
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}

//injectExtension逻辑上面已经介绍过了,直接来看getAdaptiveExtensionClass
private Class<?> getAdaptiveExtensionClass() {
            //这里会加载SPI配置文件中的扩展类,流程前面已经介绍过了,不再冗述
        getExtensionClasses();
        if (cachedAdaptiveClass != null) {
            return cachedAdaptiveClass;
        }
            //如果加载的SPI扩展类都没有@Adaptive注解,则创建Adaptive扩展类实例
        return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

private Class<?> createAdaptiveExtensionClass() {
            // 这里封装了一个codeGenerator,直接用字符串拼接代码
        String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
        ClassLoader classLoader = findClassLoader();
            // Compiler接口必有Adaptive实现,否则死循环了
        org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
            //使用compiler编译源码,返回class实例
        return compiler.compile(code, classLoader);
}

代码看起来比较简单,同样,画个图:


getAdativeExtension.jpg

有个细节需要注意,所有通过SPI配置文件加载到的SPI扩展类中,最多只允许一个SPI扩展类上有Adaptive注解;若所有加载到的SPI扩展类上都没有@Adaptive注解,且SPI接口的所有方法中,至少有一个方法上有@Adaptive注解,才会通过字节码生成Adaptive扩展类实例;具体逻辑可以参考ExtensionLoader.cacheAdaptiveClass,AdaptiveClassCodeGenerator的hasAdaptiveMethod、generateMethodContent方法

ExtensionLoader.getDefaultExtension()

获取默认SPI扩展实例的逻辑更简单,直接看代码:

public T getDefaultExtension() {
    //直接加载SPI扩展类
    getExtensionClasses();
    //若无默认SPI,则直接返回null;@SPI("true"),也会返回null
    if (StringUtils.isBlank(cachedDefaultName) || "true".equals(cachedDefaultName)) {
        return null;
    }
    // 走getExension逻辑
    return getExtension(cachedDefaultName);
}

ExtensionLoader.getActivateExtension(URL url, String[] values, String group)

字面理解是获取激活状态的扩展类实现,可以理解为是根据URL中的参数,结合对Activate注解,返回符合条件的spi实例,直接看逻辑

public List<T> getActivateExtension(URL url, String[] values, String group) {
    List<T> exts = new ArrayList<T>();
    List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
    //若value不包含 "-default",加载SPI扩展类,并遍历cachedActivates
    //Map<String, Object> cachedActivates ,key - spi名,value - @Activate注解对象
    if (!names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
        getExtensionClasses();
        for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
            String name = entry.getKey();
            Object activate = entry.getValue();
            String[] activateGroup, activateValue;

            if (activate instanceof Activate) {
                activateGroup = ((Activate) activate).group();
                activateValue = ((Activate) activate).value();
            } else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
                activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
                activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
            } else {
                continue;
            }
            // group匹配,根据cachedActivates中缓存的spi名称,拿到扩展spi实现
            if (isMatchGroup(group, activateGroup)) {
                T ext = getExtension(name);
                // 配置的names包含当前name且names不包含"-${name}",且满足active条件
                if (!names.contains(name)
                        && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)
                        && isActive(activateValue, url)) {
                    exts.add(ext);
                }
            }
        }
        // 根据order排序
        Collections.sort(exts, ActivateComparator.COMPARATOR);
    }
    //若names里有'-default'
    List<T> usrs = new ArrayList<T>();
    for (int i = 0; i < names.size(); i++) {
        String name = names.get(i);
        // 不是以'-'开头,且names不包含'-name'
        if (!name.startsWith(Constants.REMOVE_VALUE_PREFIX)
                && !names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
            //name=default,全部返回
            if (Constants.DEFAULT_KEY.equals(name)) {
                if (!usrs.isEmpty()) {
                    exts.addAll(0, usrs);
                    usrs.clear();
                }
            } else {
                T ext = getExtension(name);
                usrs.add(ext);
            }
        }
    }
    if (!usrs.isEmpty()) {
        exts.addAll(usrs);
    }
    return exts;
}

老规矩,画个图:


getActivateExtension (1).jpg

整体上可以看到,Dubbo的SPI在实现和使用上更加灵活、设计也更加合理;同时保证了线程安全。

注:dubbo源码版本2.7.1,欢迎指正

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 前言 Dubbo 为了更好地达到 OCP 原则(即“对扩展开放,对修改封闭”的原则),采用了“微内核+插件”的架构...
    小波同学阅读 4,676评论 0 5
  • 前言:在日常的代码中,我们经常使用简单工厂来生成,某个接口不同实现的实例,但是其实还是有替代方案来完成,比如jav...
    北交吴志炜阅读 4,576评论 0 2
  • 1.jdk SPI介绍 SPI 全称为 Service Provider Interface,是一种服务发现机制。...
    ElevenKing阅读 5,940评论 0 7
  • 本篇其实更应该在Dubbo系列的最早去写,只不过因为当初在读源码的时候没有开始更新博客,所以漏了重要的一个章节,那...
    此鱼不得水阅读 19,669评论 1 7
  • 借用dubbo官网介绍的设计图,如下: ** 说明:图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之...
    德州spark阅读 3,110评论 0 1