☆聊聊Dubbo(七):自定义Filter实践

0 前言

在现行微服务的趋势下,一次调用的过程中涉及多个服务节点,产生的日志分布在不同的服务器上,虽说可以使用ELK技术将分散的日志,汇总到es中,但是如何将这些日志贯穿起来,则是一个关键问题。

如果需要查看一次调用的全链路日志,则一般的做法是通过在系统边界中产生一个 traceId,向调用链的后续服务传递 traceId,后续服务继续使用 traceId打印日志,并再向其他后续服务传递 traceId,此过程简称,traceId透传

在使用HTTP协议作为服务协议的系统里,可以统一使用一个封装好的http client做traceId透传。但是dubbo实现traceId透传就稍微复杂些了。根据上节讲的《☆聊聊Dubbo(六):核心源码-Filter链原理》,一般情况下,会自定义Filter来实现traceId透传,但还有两种比较特殊的实现方式:(1)重新实现dubbo内部的相关类;(2)基于RpcContext实现;

1 基于重写实现

1.1 源码分析

Dubbo Consumer与Provider调用过程

Proxy 是 Dubbo 使用javassist为consumer 端service生成的动态代理instance。

Implement 是provider端的service实现instance。

traceId透传,即要求Proxy 和 Implement具有相同的traceId。Dubbo具有良好的分层特征,transport的对象是RPCInvocation

所以,重写的重点逻辑实现,就是Proxy将traceId放入RPCInvocation,交由Client进行序列化和TCP传输,Server反序列化得到RPCInvocation,取出traceId,交由Implement即可。

下面为consumer端 JavassistProxyFactory 的代码分析:

public class JavassistProxyFactory extends AbstractProxyFactory {

    /**
     * Spring容器启动时,该代理工厂类方法会为Consumer生成Service代理类
     * invoker和interfaces都是从Spring配置文件中读取出来 
     */
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // 生成Service代理类的每个方法的字节码,都调用了InvokerInvocationHandler.invoke(...)方法,
        // 做实际RpcInvocation包装、序列化、TCP传输、反序列化结果
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper类不能正确处理带$的类名
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

下面为consumer端 InvokerInvocationHandler 的代码分析:

public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;
    
    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;
    }

    /**
     * 真正调用RPC时,各个Service代理的字节码里调用了这个通用的invoke
     * proxy就是之前生成的代理对象,第二个参数是方法名,第三个参数是参数列表
     * 知道了(1)哪个接口(2)哪个方法(3)参数是什么,就完全可以映射到Provider端实现并获取返回值
     */
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // 因为到这里,还是consumer端的业务线程,所以在这里取ThreadLocal里的traceId,
        // 再放入RpcInvocation的attachment,那么Provider就可以从收到的RpcInvocation实例取出透传的traceId
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }
}

下面为Provider端 DubboProtocol 的代码分析:

    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
        
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要处理高版本调用低版本的问题
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1){
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods){
                            if (inv.getMethodName().equals(method)){
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod){
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }

                // Provider收到报文之后,从线程池中取出一个线程,反序列化出RpcInvocation、并调用实现类的对应方法
                // 所以,此处就是Provider端的实现类的线程,取出traceId,放入ThreadLocal中
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

1.2 具体实现

package com.alibaba.dubbo.rpc.proxy;

/**
 * traceId工具类这个类是新添加的
 */
public class TraceIdUtil {

    private static final ThreadLocal<String> TRACE_ID = new ThreadLocal<String>();

    public static String getTraceId() {
        return TRACE_ID.get();
    }

    public static void setTraceId(String traceId) {
        TRACE_ID.set(traceId);
    }

}

/**
 * InvokerHandler 这个类 是修改的
 */
public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler){
        this.invoker = handler;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        if ("toString".equals(methodName) && parameterTypes.length == 0) {
            return invoker.toString();
        }
        if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
            return invoker.hashCode();
        }
        if ("equals".equals(methodName) && parameterTypes.length == 1) {
            return invoker.equals(args[0]);
        }
        // 这里将cosumer 端的traceId放入RpcInvocation
        RpcInvocation rpcInvocation = new RpcInvocation(method, args);
        rpcInvocation.setAttachment("traceId", TraceIdUtil.getTraceId());
        return invoker.invoke(rpcInvocation).recreate();
    }

}


package com.alibaba.dubbo.rpc.protocol.dubbo;

/**
 * dubbo protocol support 重新实现DubboProtocol
 *
 */
public class DubboProtocol extends AbstractProtocol {


    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要处理高版本调用低版本的问题
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1){
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods){
                            if (inv.getMethodName().equals(method)){
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod){
                        logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                // 这里将收到的consumer端的traceId放入provider端的thread local
                TraceIdUtil.setTraceId(inv.getAttachment("traceId"));
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }
    }
}

2 基于RpcContext实现

在具体讲解自定义filter来实现透传traceId的方案前,我们先来研究下RpcContext对象。其RpcContext本质上是个ThreadLocal对象,其维护了一次rpc交互的上下文信息

/*
 * Copyright 1999-2011 Alibaba Group.
 *  
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.rpc;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.utils.NetUtils;

/**
 * Thread local context. (API, ThreadLocal, ThreadSafe)
 * 
 * 注意:RpcContext是一个临时状态记录器,当接收到RPC请求,或发起RPC请求时,RpcContext的状态都会变化。
 * 比如:A调B,B再调C,则B机器上,在B调C之前,RpcContext记录的是A调B的信息,在B调C之后,RpcContext记录的是B调C的信息。
 * 
 * @see com.alibaba.dubbo.rpc.filter.ContextFilter
 * @author qian.lei
 * @author william.liangf
 * @export
 */
public class RpcContext {
    
    private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() {
        @Override
        protected RpcContext initialValue() {
            return new RpcContext();
        }
    };

    /**
     * get context.
     * 
     * @return context
     */
    public static RpcContext getContext() {
        return LOCAL.get();
    }
    
    /**
     * remove context.
     * 
     * @see com.alibaba.dubbo.rpc.filter.ContextFilter
     */
    public static void removeContext() {
        LOCAL.remove();
    }

    private Future<?> future;

    private List<URL> urls;

    private URL url;

    private String methodName;

    private Class<?>[] parameterTypes;

    private Object[] arguments;

    private InetSocketAddress localAddress;

    private InetSocketAddress remoteAddress;

    private final Map<String, String> attachments = new HashMap<String, String>();

    private final Map<String, Object> values = new HashMap<String, Object>();

    // now we don't use the 'values' map to hold these objects
    // we want these objects to be as generic as possible
    private Object request;
    private Object response;

    @Deprecated
    private List<Invoker<?>> invokers;
    
    @Deprecated
    private Invoker<?> invoker;

    @Deprecated
    private Invocation invocation;
    
    protected RpcContext() {
    }

    /**
     * Get the request object of the underlying RPC protocol, e.g. HttpServletRequest
     *
     * @return null if the underlying protocol doesn't provide support for getting request
     */
    public Object getRequest() {
        return request;
    }

    /**
     * Get the request object of the underlying RPC protocol, e.g. HttpServletRequest
     *
     * @return null if the underlying protocol doesn't provide support for getting request or the request is not of the specified type
     */
    @SuppressWarnings("unchecked")
    public <T> T getRequest(Class<T> clazz) {
        return (request != null && clazz.isAssignableFrom(request.getClass())) ? (T) request : null;
    }


    public void setRequest(Object request) {
        this.request = request;
    }

    /**
     * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
     *
     * @return null if the underlying protocol doesn't provide support for getting response
     */
    public Object getResponse() {
        return response;
    }

    /**
     * Get the response object of the underlying RPC protocol, e.g. HttpServletResponse
     *
     * @return null if the underlying protocol doesn't provide support for getting response or the response is not of the specified type
     */
    @SuppressWarnings("unchecked")
    public <T> T getResponse(Class<T> clazz) {
        return (response != null && clazz.isAssignableFrom(response.getClass())) ? (T) response : null;
    }

    public void setResponse(Object response) {
        this.response = response;
    }

    /**
     * is provider side.
     * 
     * @return provider side.
     */
    public boolean isProviderSide() {
        URL url = getUrl();
        if (url == null) {
            return false;
        }
        InetSocketAddress address = getRemoteAddress();
        if (address == null) {
            return false;
        }
        String host;
        if (address.getAddress() == null) {
            host = address.getHostName();
        } else {
            host = address.getAddress().getHostAddress();
        }
        return url.getPort() != address.getPort() || 
                ! NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(host));
    }

    /**
     * is consumer side.
     * 
     * @return consumer side.
     */
    public boolean isConsumerSide() {
        URL url = getUrl();
        if (url == null) {
            return false;
        }
        InetSocketAddress address = getRemoteAddress();
        if (address == null) {
            return false;
        }
        String host;
        if (address.getAddress() == null) {
            host = address.getHostName();
        } else {
            host = address.getAddress().getHostAddress();
        }
        return url.getPort() == address.getPort() && 
                NetUtils.filterLocalHost(url.getIp()).equals(NetUtils.filterLocalHost(host));
    }

    /**
     * get future.
     * 
     * @param <T>
     * @return future
     */
    @SuppressWarnings("unchecked")
    public <T> Future<T> getFuture() {
        return (Future<T>) future;
    }

    /**
     * set future.
     * 
     * @param future
     */
    public void setFuture(Future<?> future) {
        this.future = future;
    }

    public List<URL> getUrls() {
        return urls == null && url != null ? (List<URL>) Arrays.asList(url) : urls;
    }

    public void setUrls(List<URL> urls) {
        this.urls = urls;
    }

    public URL getUrl() {
        return url;
    }

    public void setUrl(URL url) {
        this.url = url;
    }

    /**
     * get method name.
     * 
     * @return method name.
     */
    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /**
     * get parameter types.
     * 
     * @serial
     */
    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    /**
     * get arguments.
     * 
     * @return arguments.
     */
    public Object[] getArguments() {
        return arguments;
    }

    public void setArguments(Object[] arguments) {
        this.arguments = arguments;
    }

    /**
     * set local address.
     * 
     * @param address
     * @return context
     */
    public RpcContext setLocalAddress(InetSocketAddress address) {
        this.localAddress = address;
        return this;
    }

    /**
     * set local address.
     * 
     * @param host
     * @param port
     * @return context
     */
    public RpcContext setLocalAddress(String host, int port) {
        if (port < 0) {
            port = 0;
        }
        this.localAddress = InetSocketAddress.createUnresolved(host, port);
        return this;
    }

    /**
     * get local address.
     * 
     * @return local address
     */
    public InetSocketAddress getLocalAddress() {
        return localAddress;
    }

    public String getLocalAddressString() {
        return getLocalHost() + ":" + getLocalPort();
    }
    
    /**
     * get local host name.
     * 
     * @return local host name
     */
    public String getLocalHostName() {
        String host = localAddress == null ? null : localAddress.getHostName();
        if (host == null || host.length() == 0) {
            return getLocalHost();
        }
        return host;
    }

    /**
     * set remote address.
     * 
     * @param address
     * @return context
     */
    public RpcContext setRemoteAddress(InetSocketAddress address) {
        this.remoteAddress = address;
        return this;
    }
    
    /**
     * set remote address.
     * 
     * @param host
     * @param port
     * @return context
     */
    public RpcContext setRemoteAddress(String host, int port) {
        if (port < 0) {
            port = 0;
        }
        this.remoteAddress = InetSocketAddress.createUnresolved(host, port);
        return this;
    }

    /**
     * get remote address.
     * 
     * @return remote address
     */
    public InetSocketAddress getRemoteAddress() {
        return remoteAddress;
    }
    
    /**
     * get remote address string.
     * 
     * @return remote address string.
     */
    public String getRemoteAddressString() {
        return getRemoteHost() + ":" + getRemotePort();
    }
    
    /**
     * get remote host name.
     * 
     * @return remote host name
     */
    public String getRemoteHostName() {
        return remoteAddress == null ? null : remoteAddress.getHostName();
    }

    /**
     * get local host.
     * 
     * @return local host
     */
    public String getLocalHost() {
        String host = localAddress == null ? null : 
            localAddress.getAddress() == null ? localAddress.getHostName() 
                    : NetUtils.filterLocalHost(localAddress.getAddress().getHostAddress());
        if (host == null || host.length() == 0) {
            return NetUtils.getLocalHost();
        }
        return host;
    }

    /**
     * get local port.
     * 
     * @return port
     */
    public int getLocalPort() {
        return localAddress == null ? 0 : localAddress.getPort();
    }

    /**
     * get remote host.
     * 
     * @return remote host
     */
    public String getRemoteHost() {
        return remoteAddress == null ? null : 
            remoteAddress.getAddress() == null ? remoteAddress.getHostName() 
                    : NetUtils.filterLocalHost(remoteAddress.getAddress().getHostAddress());
    }

    /**
     * get remote port.
     * 
     * @return remote port
     */
    public int getRemotePort() {
        return remoteAddress == null ? 0 : remoteAddress.getPort();
    }

    /**
     * get attachment.
     * 
     * @param key
     * @return attachment
     */
    public String getAttachment(String key) {
        return attachments.get(key);
    }

    /**
     * set attachment.
     * 
     * @param key
     * @param value
     * @return context
     */
    public RpcContext setAttachment(String key, String value) {
        if (value == null) {
            attachments.remove(key);
        } else {
            attachments.put(key, value);
        }
        return this;
    }

    /**
     * remove attachment.
     * 
     * @param key
     * @return context
     */
    public RpcContext removeAttachment(String key) {
        attachments.remove(key);
        return this;
    }

    /**
     * get attachments.
     * 
     * @return attachments
     */
    public Map<String, String> getAttachments() {
        return attachments;
    }

    /**
     * set attachments
     * 
     * @param attachment
     * @return context
     */
    public RpcContext setAttachments(Map<String, String> attachment) {
        this.attachments.clear();
        if (attachment != null && attachment.size() > 0) {
            this.attachments.putAll(attachment);
        }
        return this;
    }
    
    public void clearAttachments() {
        this.attachments.clear();
    }

    /**
     * get values.
     * 
     * @return values
     */
    public Map<String, Object> get() {
        return values;
    }

    /**
     * set value.
     * 
     * @param key
     * @param value
     * @return context
     */
    public RpcContext set(String key, Object value) {
        if (value == null) {
            values.remove(key);
        } else {
            values.put(key, value);
        }
        return this;
    }

    /**
     * remove value.
     * 
     * @param key
     * @return value
     */
    public RpcContext remove(String key) {
        values.remove(key);
        return this;
    }

    /**
     * get value.
     * 
     * @param key
     * @return value
     */
    public Object get(String key) {
        return values.get(key);
    }

    public RpcContext setInvokers(List<Invoker<?>> invokers) {
        this.invokers = invokers;
        if (invokers != null && invokers.size() > 0) {
            List<URL> urls = new ArrayList<URL>(invokers.size());
            for (Invoker<?> invoker : invokers) {
                urls.add(invoker.getUrl());
            }
            setUrls(urls);
        }
        return this;
    }

    public RpcContext setInvoker(Invoker<?> invoker) {
        this.invoker = invoker;
        if (invoker != null) {
            setUrl(invoker.getUrl());
        }
        return this;
    }

    public RpcContext setInvocation(Invocation invocation) {
        this.invocation = invocation;
        if (invocation != null) {
            setMethodName(invocation.getMethodName());
            setParameterTypes(invocation.getParameterTypes());
            setArguments(invocation.getArguments());
        }
        return this;
    }

    /**
     * @deprecated Replace to isProviderSide()
     */
    @Deprecated
    public boolean isServerSide() {
        return isProviderSide();
    }
    
    /**
     * @deprecated Replace to isConsumerSide()
     */
    @Deprecated
    public boolean isClientSide() {
        return isConsumerSide();
    }
    
    /**
     * @deprecated Replace to getUrls()
     */
    @Deprecated
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public List<Invoker<?>> getInvokers() {
        return invokers == null && invoker != null ? (List)Arrays.asList(invoker) : invokers;
    }

    /**
     * @deprecated Replace to getUrl()
     */
    @Deprecated
    public Invoker<?> getInvoker() {
        return invoker;
    }

    /**
     * @deprecated Replace to getMethodName(), getParameterTypes(), getArguments()
     */
    @Deprecated
    public Invocation getInvocation() {
        return invocation;
    }
    
    /**
     * 异步调用 ,需要返回值,即使步调用Future.get方法,也会处理调用超时问题.
     * @param callable
     * @return 通过future.get()获取返回结果.
     */
    @SuppressWarnings("unchecked")
    public <T> Future<T> asyncCall(Callable<T> callable) {
        try {
            try {
                setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
                final T o = callable.call();
                //local调用会直接返回结果.
                if (o != null) {
                    FutureTask<T> f = new FutureTask<T>(new Callable<T>() {
                        public T call() throws Exception {
                            return o;
                        }
                    });
                    f.run();
                    return f;
                } else {
                    
                }
            } catch (Exception e) {
                throw new RpcException(e);
            } finally {
                removeAttachment(Constants.ASYNC_KEY);
            }
        } catch (final RpcException e) {
            return new Future<T>() {
                public boolean cancel(boolean mayInterruptIfRunning) {
                    return false;
                }
                public boolean isCancelled() {
                    return false;
                }
                public boolean isDone() {
                    return true;
                }
                public T get() throws InterruptedException, ExecutionException {
                    throw new ExecutionException(e.getCause());
                }
                public T get(long timeout, TimeUnit unit)
                        throws InterruptedException, ExecutionException,
                        TimeoutException {
                    return get();
                }
            };
        }
        return ((Future<T>)getContext().getFuture());
    }
    
    /**
     * oneway调用,只发送请求,不接收返回结果.
     * @param callable
     */
    public void asyncCall(Runnable runable) {
        try {
            setAttachment(Constants.RETURN_KEY, Boolean.FALSE.toString());
            runable.run();
        } catch (Throwable e) {
            //FIXME 异常是否应该放在future中?
            throw new RpcException("oneway call error ." + e.getMessage(), e);
        } finally {
            removeAttachment(Constants.RETURN_KEY);
        }
    }
}

注:RpcContext里的attachments信息会填入到RpcInvocation对象中, 一起传递过去

因此有人就建议可以简单的把traceId注入到RpcContext中,这样就可以简单的实现traceId的透传了,事实是否如此,先让我们来一起实践一下。

定义Dubbo接口类:

public interface IEchoService {
    String echo(String name);
}

编写服务端代码(Provider):

@Service("echoService")
public class EchoServiceImpl implements IEchoService {
 
    @Override
    public String echo(String name) {
        String traceId = RpcContext.getContext().getAttachment("traceId");
        System.out.println("name = " + name + ", traceId = " + traceId);
        return name;
    }
 
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("spring-dubbo-test-producer.xml");
 
        System.out.println("server start");
        while (true) {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
            }
        }
    } 
}

编写客户端代码(Consumer):

public class EchoServiceConsumer {
 
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext =
                new ClassPathXmlApplicationContext("spring-dubbo-test-consumer.xml");
 
        IEchoService service = (IEchoService) applicationContext
                .getBean("echoService");
 
        // *) 设置traceId
        RpcContext.getContext().setAttachment("traceId", "100001");
        System.out.println(RpcContext.getContext().getAttachments());
        // *) 第一调用
        service.echo("lilei");
 
        // *) 第二次调用
        System.out.println(RpcContext.getContext().getAttachments());
        service.echo("hanmeimei");
    }
}

执行的结果如下:

服务端输出:
name = lilei, traceId = 100001
name = hanmeimei, traceId = null
 
客户端输出:
{traceId=100001}
{}

从服务端的输出信息中,我们可以惊喜的发现,traceId确实传递过去了,但是只有第一次有,第二次没有。而从客户端对RpcContext的内容输出,也印证了这个现象,同时产生这个现象的本质原因是 RpcContext对象的attachment在一次rpc交互后被清空了

给RpcContext的clearAttachments方法, 设置断点后复现. 我们可以找到如下调用堆栈:

java.lang.Thread.State: RUNNABLE
    at com.alibaba.dubbo.rpc.RpcContext.clearAttachments(RpcContext.java:438)
    at com.alibaba.dubbo.rpc.filter.ConsumerContextFilter.invoke(ConsumerContextFilter.java:50)
    at com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper$1.invoke(ProtocolFilterWrapper.java:91)
    at com.alibaba.dubbo.rpc.protocol.InvokerWrapper.invoke(InvokerWrapper.java:53)
    at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:77)
    at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:227)
    at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:72)
    at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52)
    at com.alibaba.dubbo.common.bytecode.proxy0.echo(proxy0.java:-1)
    at com.test.dubbo.EchoServiceConsumer.main(EchoServiceConsumer.java:20)

其最直接的调用为Dubbo自带的ConsumerContextFilter,让我们来分析其代码:

@Activate(
    group = {"consumer"},
    order = -10000
)
public class ConsumerContextFilter implements Filter {
    public ConsumerContextFilter() {
    }
 
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext.getContext().setInvoker(invoker).setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort());
        if(invocation instanceof RpcInvocation) {
            ((RpcInvocation)invocation).setInvoker(invoker);
        }
 
        Result var3;
        try {
            var3 = invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
 
        return var3;
    }
}

确实在finally代码片段中,我们发现RpcContext在每次rpc调用后, 都会清空attachment对象

既然我们找到了本质原因,那么解决方法,可以在每次调用的时候,重新设置下traceId,比如像这样(看着感觉吃像相对难看了一点):

// *) 第一调用
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("lilei");
 
// *) 第二次调用
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("hanmeimei");

3 基于Filter实现

先引入一个工具类:

public class TraceIdUtils {
 
    private static final ThreadLocal<String> traceIdCache
            = new ThreadLocal<String>();
 
    public static String getTraceId() {
        return traceIdCache.get();
    }
 
    public static void setTraceId(String traceId) {
        traceIdCache.set(traceId);
    }
 
    public static void clear() {
        traceIdCache.remove();
    }
 
}

然后我们定义一个Filter类:

package com.test.dubbo;
 
public class TraceIdFilter implements Filter {
 
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        String traceId = RpcContext.getContext().getAttachment("traceId");
        if ( !StringUtils.isEmpty(traceId) ) {
            // *) 从RpcContext里获取traceId并保存
            TraceIdUtils.setTraceId(traceId);
        } else {
            // *) 交互前重新设置traceId, 避免信息丢失
            RpcContext.getContext().setAttachment("traceId", TraceIdUtils.getTraceId());
        }
        // *) 实际的rpc调用
        return invoker.invoke(invocation);
    }
}

在resource目录下, 添加META-INF/dubbo目录, 继而添加com.alibaba.dubbo.rpc.Filter文件:

META-INF/dubbo/com.alibaba.dubbo.rpc.Filter文件

编辑(com.alibaba.dubbo.rpc.Filter文件)内容如下:

traceIdFilter=com.test.dubbo.TraceIdFilter

然后我们给dubbo的producer和consumer都配置对应的filter项:

服务端:
<dubbo:service interface="com.test.dubbo.IEchoService" ref="echoService" version="1.0.0"
        filter="traceIdFilter"/>

客户端:
<dubbo:reference interface="com.test.dubbo.IEchoService" id="echoService" version="1.0.0"
        filter="traceIdFilter"/>

服务端的测试代码小改为如下:

@Service("echoService")
public class EchoServiceImpl implements IEchoService {
 
    @Override
    public String echo(String name) {
        String traceId = TraceIdUtils.getTraceId();
        System.out.println("name = " + name + ", traceId = " + traceId);
        return name;
    }
 
}

客户端的测试代码片段为:

// *) 第一调用
RpcContext.getContext().setAttachment("traceId", "100001");
service.echo("lilei");
 
// *) 第二次调用
service.echo("hanmeimei");

同样的代码, 测试结果如下:

服务端输出:
name = lilei, traceId = 100001
name = hanmeimei, traceId = 100001
 
客户端输出:
{traceId=100001}
{}

符合预期,感觉这个方案就非常优雅了。RpcContext的attachment依旧被清空(ConsumerContextFilter在自定义的Filter后执行),但是每次rpc交互前,traceId会被重新注入,保证跟踪线索透传成功。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,233评论 6 495
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,357评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,831评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,313评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,417评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,470评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,482评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,265评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,708评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,997评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,176评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,503评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,150评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,391评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,034评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,063评论 2 352

推荐阅读更多精彩内容