Skywalking源码研究之上报和采集

关于Agent端的上报和OAP的采集,Skywalking技术比较成熟,重点关注插件在拦截器中如何进行上报

ContextManager

插件进行数据上报(上传span),主要是通过ContextManager提供的API完成的,重点方法如下

beforeMethod中常用的方法:

  • ContextManager#createEntrySpan 创建入口span
  • ContextManager#createLocalSpan 创建本地span
  • ContextManager#createExitSpan 创建出口span
  • Tags#URL#set 设置span的url属性
  • span#setComponent 设置span的component属性
  • SpanLayer#asRPCFramework 标记span为RPC
  • SpanLayer#asHttp 标记span为http
  • 等等

注意:beforeMethod调用的各种span的create方法并没有实际的进行数据上报,只是暂存在ThreadLocal中

实际触发上报一般是在afterMethod中,常用的方法

  • ContextManager#stopSpan

比较直观,即在方法结束时标记span结束,重点是该方法内部会调用TracingContext#finish方法,这个方法会实际触发数据上报

以Dubbo插件为例,看一下插件对上报API的使用

@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                         MethodInterceptResult result) throws Throwable {
    // dubbo的rpcContext,使用它可以通过网络传递附件,这里就是传递上下文载体
    RpcContext rpcContext = RpcContext.getContext();
    
    if (isConsumer) {// 消费方
        final ContextCarrier contextCarrier = new ContextCarrier();
        // 调用createExitSpan创建出口span
        span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
        // 使用附件传输上下文载体
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            rpcContext.setAttachment(next.getHeadKey(), next.getHeadValue());
        }
    } else { // 服务方
        ContextCarrier contextCarrier = new ContextCarrier();
        // 跟据附件获取上下文载体
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
        }
        // 调用createEntrySpan创建入口span
        span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
        span.setPeer(rpcContext.getRemoteAddressString()); // span设置peer属性
    }

    Tags.URL.set(span, generateRequestURL(requestURL, invocation)); // 设置url属性
    collectArguments(needCollectArguments, argumentsLengthThreshold, span, invocation); // 收集参数
    span.setComponent(ComponentsDefine.DUBBO); // 设置component属性
    SpanLayer.asRPCFramework(span); // 设置layer属性
}

@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                          Object ret) throws Throwable {
    
    // 标记span结束
    ContextManager.stopSpan();
    return ret;
}

数据上报

finish

上文提到ContextManager#stopSpan方法会触发数据上报,因为内部会调用TracingContext#finish方法:

private void finish() {
    // 正在运行的span是否为空,即segment下的span全部完成
    boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running;
    if (isFinishedInMainThread) { // 如果已完成
        // 发布segment完成通知
        TracingThreadListenerManager.notifyFinish(this); 
    }
}

可以看到span在stop之后并不一定会上报,而是整个segment下的span全部完成才触发上报,并且使用发布监听模式通知消费者处理,其中消费者就是TraceSegmentServiceClient

TraceSegmentServiceClient

TraceSegmentServiceClient接收到segment结束信号后,同样不是直接建立网络连接上报,而是将消息发给了skywalking内部的消息队列

@Override
public void afterFinished(TraceSegment traceSegment) {
    if (!carrier.produce(traceSegment)) { // 发布到carrier消息队列
        ...
    }
}

Agent 收集到 Trace 数据后,不是写入外部消息队列( 例如,Kafka )或者日志文件,而是 Agent写入内存消息队列,后台线程【异步】发送给 Collector

这个消息队列就是skywalking自实现的:DataCarrier

同时TraceSegmentServiceClient本身即是消息的发布者,又是消费者

// TraceSegmentServiceClient的初始化方法
public void boot() {
    ...
    carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE); // 创建消息队列
    carrier.consume(this, 1); // 消费者就是this, 即TraceSegmentServiceClient本身
}

而消费的方法就是TraceSegmentServiceClient#consume方法

@Override
public void consume(List<TraceSegment> data) { // 参数就是Segment,因为使用队列,可能是多个
    // 使用Grpc包工具
    final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
    StreamObserver<SegmentObject> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
        Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
    ).collect(new StreamObserver<Commands>() {
        ...
    });
    
    
    try {
        // 循环流式上报segment
        for (TraceSegment segment : data) {
            SegmentObject upstreamSegment = segment.transform();
            upstreamSegmentStreamObserver.onNext(upstreamSegment);
        }
    } catch (Throwable t) {
        LOGGER.error(t, "Transform and send UpstreamSegment to collector fail.");
    }
    // 结束
    upstreamSegmentStreamObserver.onCompleted();
}

这里的upstreamSegmentStreamObserver工具是使用grpc-stub的grpc协议

<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>${grpc.version}</version>
</dependency>

使用流式数据传输的方式实现数据上报,上传的对象SegmentObject

OAP采集

OAP采集器主要在server-receiver-plugin包中,依然是使用插件的方式应对不同数据的采集,比如

  • skywalking-trace-receiver-plugin 分布式链路采集
  • skywalking-jvm-receiver-plugin jvm采集
  • 等等大概10几个采集插件

这里主要分析分布式链路采集插件:skywalking-trace-receiver-plugin

TraceSegmentReportServiceHandler

分布式链路trace的采集主要入口TraceSegmentReportServiceHandler#collect方法,接受流式数据对象SegmentObject(对应了Agent端的上传对象)

@Override
public StreamObserver<SegmentObject> collect(StreamObserver<Commands> responseObserver) {
    // 使用grpc.stub的服务端
    return new StreamObserver<SegmentObject>() {
        // 收到数据的处理
        @Override
        public void onNext(SegmentObject segment) {
            try {
                // 调用segmentParserService.send
                segmentParserService.send(segment);
            } catch (Exception e) {
                ...
            }
        }
        ...
    };
}

具体的Segment信息处理交由segmentParserService#send

SegmentParse

Segment信息的解析器,send方法如下

public void send(SegmentObject segment) {
    final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
    traceAnalyzer.doAnalysis(segment);
}

转交给TraceAnalyzer分析并处理

TraceAnalyzer

其中doAnalysis负责分析Segment对象,方法如下

// 解析SegmentO对象,基本都是订阅发布模式
public void doAnalysis(SegmentObject segmentObject) {
    if (segmentObject.getSpansList().size() == 0) {
        return;
    }

    createSpanListeners();

    notifySegmentListener(segmentObject);

    // 循环所有的span
    segmentObject.getSpansList().forEach(spanObject -> {
        if (spanObject.getSpanId() == 0) {
            // 解析第一个 Span
            notifyFirstListener(spanObject, segmentObject);
        }

        if (SpanType.Exit.equals(spanObject.getSpanType())) {
            // 解析出口Span
            notifyExitListener(spanObject, segmentObject);
        } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
            // 解析入口Span
            notifyEntryListener(spanObject, segmentObject);
        } else if (SpanType.Local.equals(spanObject.getSpanType())) {
            // 解析本地Span
            notifyLocalListener(spanObject, segmentObject);
        } else {
            log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                      .name());
        }
    });
    
    // 通知 Span 监听器们,执行构建各自的数据
    notifyListenerToBuild();
}

总结就是通知 Span 监听器们,去构建各自的数据,经过流式处理,最终存储到存储器。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,233评论 6 495
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,357评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,831评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,313评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,417评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,470评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,482评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,265评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,708评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,997评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,176评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,503评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,150评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,391评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,034评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,063评论 2 352

推荐阅读更多精彩内容