GRPC java 分布式调用链跟踪实践

Opentracing基本模型

image.png

如图,在跟踪链中有以下几个比较重要的数据结构和概念:

span:标识一次分布式调用,其自身包含了id,parentId(指向上级Span的id), traceIds,服务名称等重要属性,其应尽量保持精简;
trace:标识整个请求链,即一些列Span的组合。其自身的ID将贯穿整个调用链,其中的每个Span都必须携带这个traceId,因此traceId将在整个调用链中传递;
cs:客户端发起请求,标志Span的开始;
sr:服务端接收到请求,并开始处理内部事务,其中sr - cs则为网络延迟和时钟抖动;
ss:服务端处理完请求,返回响应内容,其中ss - sr则为服务端处理请求耗时;
cr:客户端接收到服务端响应内容,标志着Span的结束,其中cr - ss则为网络延迟和时钟抖动。

客户端调用时间=cr-cs
服务端处理时间=sr-ss

分布式系统调用跟踪的基本架构要求

低侵入性,高性能,高可用容错,低丢失率等。

基于GRPC的分布式系统调用跟踪实践

创建TraceContext

TraceContext通过Threadlocal对span进行保存,并且将traceid和spanid向底层服务传递,zebra对线程上下文传递进行了封装,具体参照GRPC如何实现公共参数与业务参数分离传递下面是TraceContext具体代码

public class TraceContext{

   private static final String SPAN_LIST_KEY = "spanList";

   public static final String TRACE_ID_KEY = "traceId";

   public static final String SPAN_ID_KEY = "spanId";

   public static final String ANNO_CS = "cs";

   public static final String ANNO_CR = "cr";

   public static final String ANNO_SR = "sr";

   public static final String ANNO_SS = "ss";

   private TraceContext(){}

   public static void setTraceId(String traceId) {
       RpcContext.getContext().set(TRACE_ID_KEY, traceId);
   }

   public static String getTraceId() {
       return (String) RpcContext.getContext().get(TRACE_ID_KEY);
   }

   public static String getSpanId() {
       return (String) RpcContext.getContext().get(SPAN_ID_KEY);
   }

   public static void setSpanId(String spanId) {
       RpcContext.getContext().set(SPAN_ID_KEY, spanId);
   }

   @SuppressWarnings("unchecked")
   public static void addSpan(Span span){
       ((List<Span>)RpcContext.getContext().get(SPAN_LIST_KEY)).add(span);
   }

   @SuppressWarnings("unchecked")
   public static List<Span> getSpans(){
       return (List<Span>) RpcContext.getContext().get(SPAN_LIST_KEY);
   }

   public static void clear(){
       RpcContext.getContext().remove(TRACE_ID_KEY);
       RpcContext.getContext().remove(SPAN_ID_KEY);
       RpcContext.getContext().remove(SPAN_LIST_KEY);
   }

   public static void start(){
       clear();
       RpcContext.getContext().set(SPAN_LIST_KEY, new ArrayList<Span>());
   }
}

创建TraceAgent

TraceAgent将span信息上传至kafka,代码如下:

public class TraceAgent {
   private GrpcProperties grpcProperties;
   private KafkaSender sender;
   private AsyncReporter<zipkin2.Span> report;

   public TraceAgent() {
       grpcProperties = SpringContextUtils.getBean(GrpcProperties.class);
       sender = KafkaSender.newBuilder().bootstrapServers(grpcProperties.getCallChainUpdAddr()).topic("zipkin").encoding(Encoding.JSON).build();
       report = AsyncReporter.builder(sender).build();
   }

   public void send(final List<Span> spans){
       spans.forEach(item ->{
           report.report(item);
       });
   }
}

创建ZebraClientTracing

ZebraClientTracing用于记录调用端的span信息,具体代码如下:

@Component
public class ZebraClientTracing {
   public Span startTrace(String method) {
       String id = IdUtils.get() + "";
       String traceId = null;
       if (null == TraceContext.getTraceId()) {
           TraceContext.start();
           traceId = id;
       } else {
           traceId = TraceContext.getTraceId();
       }
       long timestamp = System.currentTimeMillis() * 1000;
       // 注册本地信息
       Endpoint endpoint = Endpoint.newBuilder().ip(NetUtils.getLocalHost()).serviceName(EtcdRegistry.serviceName)
               .port(50003).build();
       // 初始化span
       Span consumerSpan = Span.newBuilder().localEndpoint(endpoint).id(id).traceId(traceId)
               .parentId(TraceContext.getSpanId() + "").name(EtcdRegistry.serviceName).timestamp(timestamp)
               .addAnnotation(timestamp, TraceContext.ANNO_CS).putTag("method", method)
               .putTag("pkgId", RpcContext.getContext().getAttachment("pkg")).build();
       // 将tracing id和spanid放到上下文
       RpcContext.getContext().get().put(TraceContext.TRACE_ID_KEY, consumerSpan.traceId());
       RpcContext.getContext().get().put(TraceContext.SPAN_ID_KEY, String.valueOf(consumerSpan.id()));
       return consumerSpan;
   }

   public void endTrace(Span span, Stopwatch watch,int code) {
       span = span.toBuilder().addAnnotation(System.currentTimeMillis() * 1000, TraceContext.ANNO_CR)
               .duration(watch.stop().elapsed(TimeUnit.MICROSECONDS)).putTag("code", code+"").build();
       TraceAgent traceAgent = new TraceAgent();
       traceAgent.send(TraceContext.getSpans());
   }
}

创建ZebraServerTracing

ZebraServerTracing用于记录服务端的span信息,具体代码如下:

@Component
public class ZebraServerTracing {
   public Span startTrace(String method) {
       String traceId = (String) RpcContext.getContext().get(TraceContext.TRACE_ID_KEY);
       String parentSpanId = (String) RpcContext.getContext().get(TraceContext.SPAN_ID_KEY);

       String id = IdUtils.get() + "";
       TraceContext.start();
       TraceContext.setTraceId(traceId);
       TraceContext.setSpanId(parentSpanId);

       long timestamp = System.currentTimeMillis() * 1000;
       Endpoint endpoint = Endpoint.newBuilder().ip(NetUtils.getLocalHost()).serviceName(EtcdRegistry.serviceName)
               .port(50003).build();
       Span providerSpan = Span.newBuilder().id(id).parentId(parentSpanId).traceId(traceId)
               .name(EtcdRegistry.serviceName).timestamp(timestamp).localEndpoint(endpoint)
               .addAnnotation(timestamp, TraceContext.ANNO_SR).putTag("method", method)
               .putTag("pkgId", RpcContext.getContext().getAttachment("pkg"))
               .build();
       TraceContext.addSpan(providerSpan);
       return providerSpan;
   }

   public void endTrace(Span span, Stopwatch watch,int code) {
       span = span.toBuilder().addAnnotation(System.currentTimeMillis() * 1000, TraceContext.ANNO_SS)
               .duration(watch.stop().elapsed(TimeUnit.MICROSECONDS)).putTag("code", code+"").build();
       TraceAgent traceAgent = new TraceAgent();
       traceAgent.send(TraceContext.getSpans());
   }
}

创建grpc client拦截器

public class HeaderClientInterceptor implements ClientInterceptor {

    private static final Logger log = LogManager.getLogger(HeaderClientInterceptor.class);
    private final ZebraClientTracing clientTracing;
    
    public static ClientInterceptor instance() {
        return new HeaderClientInterceptor();
    }

    private HeaderClientInterceptor() {
        clientTracing = SpringContextUtils.getBean(ZebraClientTracing.class);
    }

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
            CallOptions callOptions, Channel next) {
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            //判断API网关是否要打开调用链
            boolean isGatewayTracing = "1".equals(RpcContext.getContext().getAttachment(ZebraConstants.ZEBRA_OPEN_TRACING))?true:false;
            boolean isSubTracing = RpcContext.getContext().get(TraceContext.TRACE_ID_KEY)!=null?true:false;
            Stopwatch watch =null;
            Span span =null;
            
            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) {
                if(isSubTracing||isGatewayTracing){
                    span =clientTracing.startTrace(method.getFullMethodName());
                    watch = Stopwatch.createStarted();
                }
                copyThreadLocalToMetadata(headers);
                super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
                    @Override
                    public void onHeaders(Metadata headers) {
                        super.onHeaders(headers);
                    }

                    @Override
                    public void onClose(Status status, Metadata trailers) {
                        super.onClose(status, trailers);
                        if(isSubTracing||isGatewayTracing)
                            clientTracing.endTrace(span, watch,status.getCode().value());
                    }
                }, headers);
            }
        };
    }

    private void copyThreadLocalToMetadata(Metadata headers) {
        Map<String, String> attachments = RpcContext.getContext().getAttachments();
        Map<String, Object> values = RpcContext.getContext().get();
        try {
            if (!attachments.isEmpty()) {
                headers.put(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS, SerializerUtil.toJson(attachments));
            }
            if (!values.isEmpty()) {
                headers.put(GrpcUtil.GRPC_CONTEXT_VALUES, SerializerUtil.toJson(values));
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }
}

创建grpc server拦截器

public class HeaderServerInterceptor implements ServerInterceptor {

    private static final Logger log = LogManager.getLogger(HeaderServerInterceptor.class);

    private final ZebraServerTracing serverTracing;

    public static ServerInterceptor instance() {
        return new HeaderServerInterceptor();
    }

    private HeaderServerInterceptor() {
        serverTracing = SpringContextUtils.getBean(ZebraServerTracing.class);
    }

    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, final Metadata headers,
            ServerCallHandler<ReqT, RespT> next) {
        return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
            boolean isSubTracing = RpcContext.getContext().get(TraceContext.TRACE_ID_KEY) != null ? true : false;
            Stopwatch watch = null;
            Span span = null;

            @Override
            public void request(int numMessages) {
                if (isSubTracing) {
                    span = serverTracing.startTrace(call.getMethodDescriptor().getFullMethodName());
                    watch = Stopwatch.createStarted();
                }
                InetSocketAddress remoteAddress = (InetSocketAddress) call.getAttributes()
                        .get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
                RpcContext.getContext().setAttachment(ZebraConstants.REMOTE_ADDRESS, remoteAddress.getHostString());
                copyMetadataToThreadLocal(headers);
                log.debug("FullMethodName:{},RemoteAddress={},attachments={},context={}",
                        call.getMethodDescriptor().getFullMethodName(), remoteAddress.getHostString(),
                        headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS), headers.get(GrpcUtil.GRPC_CONTEXT_VALUES));
                super.request(numMessages);
            }

            @Override
            public void close(Status status, Metadata trailers) {
                delegate().close(status, trailers);
                if(isSubTracing)
                    serverTracing.endTrace(span, watch,status.getCode().value());
            }

        }, headers);
    }

    private void copyMetadataToThreadLocal(Metadata headers) {
        String attachments = headers.get(GrpcUtil.GRPC_CONTEXT_ATTACHMENTS);
        String values = headers.get(GrpcUtil.GRPC_CONTEXT_VALUES);
        try {
            if (attachments != null) {
                Map<String, String> attachmentsMap = SerializerUtil.fromJson(attachments,
                        new TypeToken<Map<String, String>>() {
                        }.getType());
                RpcContext.getContext().setAttachments(attachmentsMap);
            }
            if (values != null) {
                Map<String, Object> valuesMap = SerializerUtil.fromJson(values, new TypeToken<Map<String, Object>>() {
                }.getType());
                for (Map.Entry<String, Object> entry : valuesMap.entrySet()) {
                    RpcContext.getContext().set(entry.getKey(), entry.getValue());
                }
            }
        } catch (Throwable e) {
            log.error(e.getMessage(), e);
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容