插件开发的基本概念
核心对象概念: Span, Trace Segment, ContextCarrier, ContextSnapshot
span
span 概念上可以理解为一次方法调用, rpc调用, db访问, 和 Google Dapper 的 OpenTracing 类似
Skywalking 中 Span 的分类依据是否"跨进程"
- 不跨线程: LocalSpan 可以分为
- 一次普通方法调用
- 跨线程后记录异步线程执行信息
- 跨进程: RemoteSpan, 可以细分为
- EntrySpan: "接收端 或 入口端", 例如SOA服务端 或 服务端入口端点, 例如: Controller 入口, RPC 服务端, 消息队列的消费者,
- ExitSpan: "发送端 或 出口端", 代表应用服务的客户端, SOA 客户端, 消息队列生产者, 数据库查询请求, RPC 组件请求
Trace Segment
Trace Segment 是在一个线程中归属同一个操作的所有 span 的聚合, 这些span都有唯一的 segmentId
核心属性如下
- TraceSegmentId: 雪花算法生成的id
- Refs: 父级引用, 对于 RPC 调用只有一个, 但对于消息队列或批处理框架则可能会有多个(多个生产者)
- Spans: 此 segments 的集合
- relatedGlobalTraceId: 此 Trace Segment 的 traceId, 8.x 后只有一个了, 之前是 DistributedTraceIds
- ignore: 为 true 则不会上传 skywalking 的collector
- isSizeLimited: 从属于此 Trace Segment 数量限制, 默认300, 超过时会变成 NoopSpan
跨进程支持 ContextCarrier
skywalking 需要解决一个重要问题的是"跨进程调用链" 的收集
例如有 A -> B 的一个 RPC 调用, ContextCarrier 的处理过程如下
==客户端==
- A 创建一个空的 ContextCarrier
- 通过 ContextManager#createExitSpan 创建一个 ExitSpan
- 将 ContextCarrier.items 所有元素放到调用过程的请求信息中, 如 HTTP HEADER, DUBBO attachments, kafka 消息的 header 中
==服务端==
- B 收到请求后 ContextCarrier 传输到 服务端后, 提取所有相关信息
- 通过 ContextManager#createEntrySpan 绑定服务端B 和客户端A
跨线程支持 ContextSnapshot
跨进程, 跨线程处理非常类似, 都需要传播上下文, 线程之间则不需要序列化数据
过程如下
- 使用 ContextManager#capture 获取 ContextSnapshot 对象
- 子线程通过 方法参数或由现有参数访问 ContextSnapshot
- 在子线程中使用 ContextManager#continue
ContextManager 处理机制
处理过程分为, 收集, 发送
收集过程
核心组件为: ThreadLocal<TracingContext> CONTEXT
, 用于收集 Span, 发送span 到skywalking 服务端
ContextManager#createEntrySpan 中通过 TracingContext#createEntrySpan 方法添加 Span, 其中 TracingContext 中维护了一个 LinkedList<AbstractSpan> 的栈
TracingContext#createEntrySpan 逻辑
@Override
public AbstractSpan createEntrySpan(final String operationName) {
// 判断是否超出限制
if (isLimitMechanismWorking()) {
NoopSpan span = new NoopSpan();
return push(span);
}
...
// 如果存在则提取父级span, 并开始 span 计时
final AbstractSpan parentSpan = peek();
if (parentSpan != null && parentSpan.isEntry()) {
...
entrySpan = parentSpan;
return entrySpan.start();
} else {
entrySpan = new EntrySpan(
spanIdGenerator++, parentSpanId,
operationName, owner
);
entrySpan.start();
// 添加到栈中 activeSpanStack.addLast(span);
return push(entrySpan);
}
}
发送 Span 逻辑
触发点: ContextManager#stopSpan(), 最终调用到 TracingContext#finish 方法, 主要逻辑如下
private void finish() {
if (isRunningInAsyncMode) {
asyncFinishLock.lock();
}
try {
// Span 调用 stop 时,会对 activeSpanStack进行 pop 操作 移除元素, 因此可以用于判断是否完成
boolean isFinishedInMainThread = activeSpanStack.isEmpty() && running;
if (isFinishedInMainThread) {
/*
* Notify after tracing finished in the main thread.
*/
TracingThreadListenerManager.notifyFinish(this);
}
// 异步处理部分
...
} finally {
if (isRunningInAsyncMode) {
asyncFinishLock.unlock();
}
}
}
这块可以看到, 最终使用的的是 TracingContext.ListenerManager 进行处理, 它维护多个 TracingContextListener, 在 TraceSegment 完成时进行调用, 其中最重要实现类为 TraceSegmentServiceClient
TraceSegmentServiceClient 实现了 IConsumer<TraceSegment>, 通过轻量级队列进行消息的消费
private volatile DataCarrier<TraceSegment> carrier;
消费逻辑如下, 通过 GRPC stream调用, 将 TraceSegment 转换为 grpc 用的 SegmentObject 发送到服务端
public void consume(List<TraceSegment> data) {
if (CONNECTED.equals(status)) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<SegmentObject> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
ServiceManager.INSTANCE.findService(CommandService.class)
.receiveCommand(commands);
}
@Override
public void onError(
Throwable throwable) {
status.finished();
...
}
@Override
public void onCompleted() {
status.finished();
}
});
try {
for (TraceSegment segment : data) {
SegmentObject upstreamSegment = segment.transform();
upstreamSegmentStreamObserver.onNext(upstreamSegment);
}
} catch (Throwable t) {
LOGGER.error(t, "Transform and send UpstreamSegment to collector fail.");
}
upstreamSegmentStreamObserver.onCompleted();
status.wait4Finish();
segmentUplinkedCounter += data.size();
} else {
segmentAbandonedCounter += data.size();
}
printUplinkStatus();
}
来源
- Apache Skywalking 实战