详解Dubbo(二):消费端代理

前言

上一篇讲了Dubbo消费端初始化的过程,在应用启动时,Dubbo会扫描classpath下的类,找到@Reference注解后注入生成的远程服务代理。这里面主要涉及到Invoker和Proxy的生成,这篇文章先分析下Proxy的生成过程。

代理工厂

Dubbo中代理实例是通过代理工厂来获得的,代理工厂的接口定义如下:

@SPI("javassist")
public interface ProxyFactory {
    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

    @Adaptive({PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}

其中getProxy()方法就是给消费端使用来获取代理实例的,可以看到要拿到一个Proxy实例需要提供一个Invoker,第二个方法中的generic参数是代表是否生成一个泛化接口的代理,关于泛化调用后面会有专门的文章来讲解。最后一个getInvoker()是Provider端暴露服务的时候使用,这个讲到服务提供方的时候再说。

代理工厂实现

Dubbo提供了2种代理工厂的实现,一种是JDK自带的动态代理,一种是使用javassist直接生成字节码的实现,默认使用的是第二种。
JDK动态代理
实现类是JdkProxyFactory,直接调用的Proxy.newProxyInstance().

public class JdkProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }
}

这里面的interfaces参数是AbstractProxyFactory中添加的,除了包含invoker要调用的interface外,还会额外添加两个接口EchoServiceDestroyable,这两个是Dubbo框架默认会实现的接口。就是说再所有返回的代理中,都额外实现了这两个接口。
JDK代理的逻辑都是在InvokerInvocationHandler中实现的:

@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (parameterTypes.length == 0) {
            if ("toString".equals(methodName)) {
                return invoker.toString();
            } else if ("$destroy".equals(methodName)) {
                invoker.destroy();
                return null;
            } else if ("hashCode".equals(methodName)) {
                return invoker.hashCode();
            }
        } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
            return invoker.equals(args[0]);
        }
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        String serviceKey = invoker.getUrl().getServiceKey();
        rpcInvocation.setTargetServiceUniqueName(serviceKey);
      
        if (consumerModel != null) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        return invoker.invoke(rpcInvocation).recreate();
    }

上面的逻辑比较简单,首先判断下要调用的方法是否属于Object类,或者是调用的toString()$destroy()hashCode()方法,如果是的话,就直接调用Invoker的本地方法,不会发起远程调用。否则,将请求参数封装至·RpcInvocation·中,通过Invoker发送出去。
Javassist动态代理
Javassist是一个字节码生成工具,它可以在程序运行期间动态编译源代码生成class的字节码。Dubbo加载这些类的字节码,然后发起调用。说简单点就是,Dubbo为每个远程接口都生成一份源代码并编译,在请求时直接调用这些编译好的类的方法就可以了,这样显然比动态代理效率来的高。

public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}

代理工厂实现类里直接调用的·Proxy.getProxy()·获取一个proxy的class,然后通过传入InvokerInvocationHandler参数调用newInstance()方法。这个参数跟JDK动态代理回调的handler是同一个。所以这里的核心就是Proxy的class是怎么产生和加载的。注意这里用的Proxy类不是JDK那个,这个是Dubbo定义的。

public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
        ...
        ...
        //参数校验的逻辑省略
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < ics.length; i++) {
            String itf = ics[i].getName();
            ...
            ...
            sb.append(itf).append(';');
        }

        // key是所有要实现的接口用分号连在一起
        String key = sb.toString();
        
        // 查看缓存中是否已经生成过这个代理,如果生成过直接返回,如果生成中则等待
        final Map<String, Object> cache;
        synchronized (PROXY_CACHE_MAP) {
            cache = PROXY_CACHE_MAP.computeIfAbsent(cl, k -> new HashMap<>());
        }

        Proxy proxy = null;
        synchronized (cache) {
            do {
                Object value = cache.get(key);
                if (value instanceof Reference<?>) {
                    proxy = (Proxy) ((Reference<?>) value).get();
                    if (proxy != null) {
                        return proxy;
                    }
                }

                if (value == PENDING_GENERATION_MARKER) {
                    try {
                        cache.wait();
                    } catch (InterruptedException e) {
                    }
                } else {
                    cache.put(key, PENDING_GENERATION_MARKER);
                    break;
                }
            }
            while (true);
        }

        long id = PROXY_CLASS_COUNTER.getAndIncrement();
        String pkg = null;
        ClassGenerator ccp = null, ccm = null;
        try {
            // Java类源代码生成器,封装了javaassist
            ccp = ClassGenerator.newInstance(cl);

            Set<String> worked = new HashSet<>();
            List<Method> methods = new ArrayList<>();

            for (int i = 0; i < ics.length; i++) {
                if (!Modifier.isPublic(ics[i].getModifiers())) {
                    String npkg = ics[i].getPackage().getName();
                    if (pkg == null) {
                        pkg = npkg;
                    } else {
                        if (!pkg.equals(npkg)) {
                            throw new IllegalArgumentException("non-public interfaces from different packages");
                        }
                    }
                }
                //添加类要实现的接口
                ccp.addInterface(ics[i]);
                //添加类要实现的方法
                for (Method method : ics[i].getMethods()) {
                    String desc = ReflectUtils.getDesc(method);
                    if (worked.contains(desc) || Modifier.isStatic(method.getModifiers())) {
                        continue;
                    }
                    if (ics[i].isInterface() && Modifier.isStatic(method.getModifiers())) {
                        continue;
                    }
                    worked.add(desc);

                    int ix = methods.size();
                    Class<?> rt = method.getReturnType();
                    Class<?>[] pts = method.getParameterTypes();

                    StringBuilder code = new StringBuilder("Object[] args = new Object[").append(pts.length).append("];");
                    for (int j = 0; j < pts.length; j++) {
                        code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                    }
                    code.append(" Object ret = handler.invoke(this, methods[").append(ix).append("], args);");
                    if (!Void.TYPE.equals(rt)) {
                        code.append(" return ").append(asArgument(rt, "ret")).append(";");
                    }

                    methods.add(method);
                    ccp.addMethod(method.getName(), method.getModifiers(), rt, pts, method.getExceptionTypes(), code.toString());
                }
            }

            if (pkg == null) {
                pkg = PACKAGE_NAME;
            }

            // 添加构造函数和成员变量
            String pcn = pkg + ".proxy" + id;
            ccp.setClassName(pcn);
            ccp.addField("public static java.lang.reflect.Method[] methods;");
            ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
            ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
            ccp.addDefaultConstructor();
            Class<?> clazz = ccp.toClass();
            clazz.getField("methods").set(null, methods.toArray(new Method[0]));

            // 创建Proxy的子类,覆盖newInstance()方法
            String fcn = Proxy.class.getName() + id;
            ccm = ClassGenerator.newInstance(cl);
            ccm.setClassName(fcn);
            ccm.addDefaultConstructor();
            ccm.setSuperClass(Proxy.class);
            ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
            Class<?> pc = ccm.toClass();
            proxy = (Proxy) pc.newInstance();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        } finally {
            // release ClassGenerator
            if (ccp != null) {
                ccp.release();
            }
            if (ccm != null) {
                ccm.release();
            }
            synchronized (cache) {
                if (proxy == null) {
                    cache.remove(key);
                } else {
                    cache.put(key, new WeakReference<Proxy>(proxy));
                }
                cache.notifyAll();
            }
        }
        return proxy;
    }

上面代码中参数校验和缓存部分我们跳过,直接看到后面实际上是使用了ClassGenerator生成了两个类,一个是Proxy的子类,它只有一个newInstance()的方法,接收一个InvocationHandler的参数,这个方法返回的就是真正的接口实现类。具体源代码生成和编译部分的代码就不看了,实际是就是字符串的拼接然后用javaassist进行编译并加载。
下面看下Dubbo的Demo中的远程接口,通过javassist生成的代理类的源代码是个什么样子的。
源接口:

public interface DemoService {

    String sayHello(String name);

    default CompletableFuture<String> sayHelloAsync(String name) {
        return CompletableFuture.completedFuture(sayHello(name));
    }

}

生成的实现类源代码(为了便于查看,做了一下format):

public class org.apache.dubbo.common.bytecode.Proxy0 extends org.apache.dubbo.common.bytecode.Proxy {
    public Object newInstance(java.lang.reflect.InvocationHandler h){ 
        return new org.apache.dubbo.common.bytecode.proxy0(h); 
    }
}

public class org.apache.dubbo.common.bytecode.proxy0 implements Destroyable, EchoService, DemoService{
    public static java.lang.reflect.Method[] methods;
    private java.lang.reflect.InvocationHandler handler;
    public proxy0(java.lang.reflect.InvocationHandler arg0){
        handler = arg0;
    }

    public void $destroy(){
        Object[] args = new Object[0]; 
        handler.invoke(this, methods[0], args);
    }
    
    public String sayHello(String arg0){
        Object[] args = new Object[1]; 
        args[0] = arg0; 
        return handler.invoke(this, methods[1], args); 
    }
    
    public CompletableFuture sayHelloAsync(String arg0){
        Object[] args = new Object[1]; 
        args[0] = arg0; 
        return handler.invoke(this, methods[2], args); 
    }
    
    public Object $echo(Object arg0){
        Object[] args = new Object[1]; 
        args[0] = ($w)$1; 
        return handler.invoke(this, methods[3], args); 
    }
}

从生成的源码可以看到javassist代理工厂实际上就是生成一个封装类,所有的方法调用都是通过调用handler来实现的,这种方式是Dubbo默认的生成代理的方式。

本地存根

有时候我们想在调用Proxy前后做一些事情,比如记录日志,异常捕获等,这个时候就会用到Dubbo的Stub。
首先看下官方文档对本地存根的描述:

改成远程服务调用后,客户端通常只剩下接口,而实现全在服务器端,但提供方有些时候想在客户端也执行部分逻辑,比如:做 ThreadLocal 缓存,提前验证参数,调用失败后伪造容错数据等等,此时就需要在 API 中带上 Stub,客户端生成 Proxy 实例,会把 Proxy 通过构造函数传给 Stub [1],然后把 Stub 暴露给用户,Stub 可以决定要不要去调 Proxy。

一个Stub的Demo:

package com.foo;
public class BarServiceStub implements BarService {
    private final BarService barService;
    
    // 构造函数传入真正的远程代理对象
    public BarServiceStub(BarService barService){
        this.barService = barService;
    }
 
    public String sayHello(String name) {
        // 此代码在客户端执行, 你可以在客户端做ThreadLocal本地缓存,或预先验证参数是否合法,等等
        try {
            return barService.sayHello(name);
        } catch (Exception e) {
            // 你可以容错,可以做任何AOP拦截事项
            return "容错数据";
        }
    }
}

实际上stub就是一个代理模式的实现,在dubbo调用到达获取Proxy这一步的时候,不是直接返回proxy,而是把proxy实例给到用户自定义的stub类。所以Dubbo要求Stub必须要有一个入参是对应接口的构造函数。这样最终ProxyFactory返回的是用户自定义的Stub,用户在Stub中决定在调用Proxy的前后做一些自定义操作。用户通过@Reference注解的stub属性来设置Stub类的名字。

总结

Dubbo消费端生成Proxy的逻辑还是比较简单的,下一篇会详细解析下消费端Invoker的构造过程。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容