dubbo阅读(二) -- 组件装载器和代理实现

dubbo是组件式编程的,耦合度很低。其中用到的加载组建的器件就是ExtensionLoader,姑且叫他组件装载器,ExtensionLoader支持了dubbo动态选择接口的特定实现。

一、功能分析

ExtensionLoader的基本用法:

ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);

Protocol protocol = extensionLoader.getAdaptiveExtension();// , getExtension("inplementName");
Protocol protocol = extensionLoader.getDefaultExtension();// 返回默认选择的Protocol实现
Protocol protocol = extensionLoader.getExtension("rmi");//选择指定的Protocol实现:rmi

总体流程如下:

  • 1.getExtensionLoader中的参数对应的接口类实现需要带有SPI注解,value是需要选择的默认实现类的name。同时如果需要其接口可以被代理,需要在对应的接口上加上注解Adaptive。举例:

接口Protocol的定义如下:


package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;

/**
 * Protocol. (API/SPI, Singleton, ThreadSafe)
 */
@SPI("dubbo")
public interface Protocol {

    int getDefaultPort();

    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    void destroy();

}

根据以上配置,Protocol的子类的export和refer两个方法支持被代理。getDefaultExtension返回dubbo协议实现。

  • 2. 对一个接口的若干实现类,用一个配置文件存储起来。路径目录:META-INF/dubbo/internal,META-INF/dubbo,META-INF/,文件名是接口名。每个实现类用Properties的方式存储。举例:

接口com.alibaba.dubbo.rpc.Protocol的实现存储在文件dubbo-2.5.10/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol中,内容如下:

registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol

redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
qos=com.alibaba.dubbo.qos.protocol.QosProtocolWrapper


二、实现分析

2.1 获取加载器


    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
        if (type == null)
            throw new IllegalArgumentException("Extension type == null");
        if (!type.isInterface()) {
            throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
        }
        if (!withExtensionAnnotation(type)) {
            throw new IllegalArgumentException("Extension type(" + type +
                    ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
        }

        ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        if (loader == null) {
            EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
            loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
        }
        return loader;
    }


    private ExtensionLoader(Class<?> type) {
        this.type = type;
        objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    }

如果有缓存了,从缓存获取。否则创建一个新的加载器,对象objectFactory的作用是,当满足条件1.获取的对象不是ExtensionFactory的子类。2.获取的对象对应的类有setProtocol(Protocol p)/setCluster(Cluster c)等方法(set+带SPI的接口),将获取对应的adaptive实例为入参,调用该方法注入(这里应该有循环依赖的问题)。具体实现是ExtensionLoader#injectExtension

2.2 获取实例

ExtensionLoader#getDefaultExtension为例:


    public T getDefaultExtension() {
        getExtensionClasses();
        if (null == cachedDefaultName || cachedDefaultName.length() == 0
                || "true".equals(cachedDefaultName)) {
            return null;
        }
        return getExtension(cachedDefaultName);
    }



    private Map<String, Class<?>> getExtensionClasses() {
        Map<String, Class<?>> classes = cachedClasses.get();
        if (classes == null) {
            synchronized (cachedClasses) {
                classes = cachedClasses.get();
                if (classes == null) {
                    classes = loadExtensionClasses();
                    cachedClasses.set(classes);
                }
            }
        }
        return classes;
    }


        private Map<String, Class<?>> loadExtensionClasses() {
        final SPI defaultAnnotation = type.getAnnotation(SPI.class);
        if (defaultAnnotation != null) {
            String value = defaultAnnotation.value();
            if (value != null && (value = value.trim()).length() > 0) {
                String[] names = NAME_SEPARATOR.split(value);
                if (names.length > 1) {
                    throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                            + ": " + Arrays.toString(names));
                }
                if (names.length == 1) cachedDefaultName = names[0];
            }
        }

        Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
        loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
        loadFile(extensionClasses, DUBBO_DIRECTORY);
        loadFile(extensionClasses, SERVICES_DIRECTORY);
        return extensionClasses;
    }


    private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
        String fileName = dir + type.getName();
        try {
            Enumeration<java.net.URL> urls;
            ClassLoader classLoader = findClassLoader();
            if (classLoader != null) {
                urls = classLoader.getResources(fileName);
            } else {
                urls = ClassLoader.getSystemResources(fileName);
            }
            if (urls != null) {
                while (urls.hasMoreElements()) {
                    java.net.URL url = urls.nextElement();
                    try {
                        BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
                        try {
                            String line = null;
                            while ((line = reader.readLine()) != null) {
                                final int ci = line.indexOf('#');
                                if (ci >= 0) line = line.substring(0, ci);
                                line = line.trim();
                                if (line.length() > 0) {
                                    try {
                                        String name = null;
                                        int i = line.indexOf('=');
                                        if (i > 0) {
                                            name = line.substring(0, i).trim();
                                            line = line.substring(i + 1).trim();
                                        }
                                        if (line.length() > 0) {
                                            Class<?> clazz = Class.forName(line, true, classLoader);
                                            if (!type.isAssignableFrom(clazz)) {
                                                throw new IllegalStateException("Error when load extension class(interface: " +
                                                        type + ", class line: " + clazz.getName() + "), class "
                                                        + clazz.getName() + "is not subtype of interface.");
                                            }
                                            if (clazz.isAnnotationPresent(Adaptive.class)) {
                                                if (cachedAdaptiveClass == null) {
                                                    cachedAdaptiveClass = clazz;
                                                } else if (!cachedAdaptiveClass.equals(clazz)) {
                                                    throw new IllegalStateException("More than 1 adaptive class found: "
                                                            + cachedAdaptiveClass.getClass().getName()
                                                            + ", " + clazz.getClass().getName());
                                                }
                                            } else {
                                                try {
                                                    clazz.getConstructor(type);
                                                    Set<Class<?>> wrappers = cachedWrapperClasses;
                                                    if (wrappers == null) {
                                                        cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                                        wrappers = cachedWrapperClasses;
                                                    }
                                                    wrappers.add(clazz);
                                                } catch (NoSuchMethodException e) {
                                                    clazz.getConstructor();
                                                    if (name == null || name.length() == 0) {
                                                        name = findAnnotationName(clazz);
                                                        if (name == null || name.length() == 0) {
                                                            if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                                    && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                                                name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                                            } else {
                                                                throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                                            }
                                                        }
                                                    }
                                                    String[] names = NAME_SEPARATOR.split(name);
                                                    if (names != null && names.length > 0) {
                                                        Activate activate = clazz.getAnnotation(Activate.class);
                                                        if (activate != null) {
                                                            cachedActivates.put(names[0], activate);
                                                        }
                                                        for (String n : names) {
                                                            if (!cachedNames.containsKey(clazz)) {
                                                                cachedNames.put(clazz, n);
                                                            }
                                                            Class<?> c = extensionClasses.get(n);
                                                            if (c == null) {
                                                                extensionClasses.put(n, clazz);
                                                            } else if (c != clazz) {
                                                                throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                                            }
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    } catch (Throwable t) {
                                        IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                                        exceptions.put(line, e);
                                    }
                                }
                            } // end of while read lines
                        } finally {
                            reader.close();
                        }
                    } catch (Throwable t) {
                        logger.error("Exception when load extension class(interface: " +
                                type + ", class file: " + url + ") in " + url, t);
                    }
                } // end of while urls
            }
        } catch (Throwable t) {
            logger.error("Exception when load extension class(interface: " +
                    type + ", description file: " + fileName + ").", t);
        }
    }



    private T createExtension(String name) {
        Class<?> clazz = getExtensionClasses().get(name);
        if (clazz == null) {
            throw findException(name);
        }
        try {
            T instance = (T) EXTENSION_INSTANCES.get(clazz);
            if (instance == null) {
                EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
                instance = (T) EXTENSION_INSTANCES.get(clazz);
            }
            injectExtension(instance);
            Set<Class<?>> wrapperClasses = cachedWrapperClasses;
            if (wrapperClasses != null && wrapperClasses.size() > 0) {
                for (Class<?> wrapperClass : wrapperClasses) {
                    instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                }
            }
            return instance;
        } catch (Throwable t) {
            throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                    type + ")  could not be instantiated: " + t.getMessage(), t);
        }
    }


  • 1.loadFile:从三个特定目录加载接口type的配置文件。读取到类路径,并尝试用Class.forName加载它,保存到对象extensionClasses中。有两个特殊情况:
    (1).判断它是否带有注解Adaptive,有的话则这个子类本身是个代理类,当调用getAdaptiveExtension直接返回这个代理类的对象(Adaptive注解在方法和类名上的含义不一样)。只有AdaptiveCompiler类和AdaptiveExtensionFactory类带有这个类注解,因为他们就是干代理这件事情的。
    (2).如果读取到的子类有用type类型对象初始化自身的构造函数,那么说明这是一个装饰类,专门保存。装饰器类是个特殊的接口实现,它被用作扩展其修饰的对象的功能(和代理的功能类似)。

  • 2.createExtension:根据name获取到对应的Class,生成实例,注入其他SPI对象,同时将装配器都装配上,返回实例。

2.3 ExtensionLoader动态代理获取动态对象

核心实现在ExtensionLoader#createAdaptiveExtensionClass->createAdaptiveExtensionClassCode

    private Class<?> createAdaptiveExtensionClass() {
        String code = createAdaptiveExtensionClassCode();
        ClassLoader classLoader = findClassLoader();
        com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        return compiler.compile(code, classLoader);
    }

ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension()动态地获取代理对象。其中传入的接口的被代理方法需要注解上Adaptive。同时对代理方法的参数需要满足如下一条:

1.入参中有一个参数是com.alibaba.dubbo.common.URL类型的。
2.入参中有一个参数有返回的类型是com.alibaba.dubbo.common.URL类型的method。实际上通常方法命名为getUrl()

这样保证了代理方法能获取到一个URL对象,解析URL里面的信息(例如 url.getParameter("cluster", "failover"); ,选取了url中key为cluster对应的value对应的对象,如果value找不到,则选取默认值failover; 获取Protocol接口类型的对象做了特异性处理,调用url.getProtocol()获取)实例名称extName,最后调用ExtensionLoader#getExtension(String name)动态获取对象.
(例如:ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);)。

2.4 ProxyFactory接口获取代理对象

dubbo 默认配置的代理工厂接口ProxyFactory的实现是JavassistProxyFactory,通过其getProxy方法获取到代理实例。查看代码:

//com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory.java

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

getProxy接受两个参数,invoker 和interfaces.其中interfaces中携带了要生成的代理方法,代理方法中会去调用invoker执行实际的业务逻辑。具体实现是com.alibaba.dubbo.common.bytecode.Proxy.java#getProxy,内部使用ClassGenerator生成代理对象。

Proxy#getProxy生成动态代理类的主要流程如下:

  • 定义变量handler,同时生成方法newInstance,参数为invoker,这样生成一个将handler初始化为invoker的代理对象。

  • 定义静态变量public static java.lang.reflect.Method[] methods;,将参数interfaces拥有的方法都放入methods中。实现interfaces的所有方法,实际调用handler.invoke(this,methods[i],params);

章节2.3和2.4的都是代理,但是还是有区别的。2.3的动态代理是组件加载器的功能,根据url选取一个本地实现,这个实现是具体的。 2.4中的代理实际应用是返回consumer的bean对象,将接口都实现成转发到invoker处理,通过invoker将请求通过网络发给producer处理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容