使用pinpoint
进行全链路监控时,支持对请求的采样,某条请求是否被采样,取决于整个链路开始的机器。该机器使用特定的采样算法。采样的标志会一直在链路中透传。比如在http里面,会在header里面增加Pinpoint-Sampled
字段,使用不同的值表示是否采样。
s0:此条请求不采样
s1:此条请求采样
下面分为接收端和发送端,分别看下采样的处理。另外在记录span
或者spanEvent
时,需要注意对是否采样的判断。
接收端采样处理
接收端接收到请求之后,会去查看请求里面是否有Pinpoint-Sampled
字段和Pinpoint-TraceID
等字段。分为下面几种情况:
- 有
Pinpoint-Sampled
字段,并且值为s0
,表示此条请求不采样。 - 没有
Pinpoint-Sampled
字段,但是有Pinpoint-TraceID
等字段,表示此条请求被采样。 - 没有
Pinpoint-Sampled
字段,也没有Pinpoint-TraceID
等字段,认为接收该请求的机器,是整条链路的第一个机器,或者链路信息在前面有丢失。
第一种情况,处理比较简单,调用下面的方法,创建DisableTrace
,表示此Trace
不被采样,并绑定到线程上下文中。
final Trace trace = this.traceContext.disableSampling();
traceFactory.disableSampling();
final Trace trace = this.baseTraceFactory.disableSampling();
注:使用缩进表示嵌套关系
第二种情况,调用this.baseTraceFactory.continueTraceObject
里面创建DefaultTrace
或者AsyncTrace
,指定sampling
字段为true
,表示采样。
continueTrace(request, traceHeader)
this.traceContext.continueTraceObject(traceId);
this.baseTraceFactory.continueTraceObject(traceId);
第三种情况,在调用this.baseTraceFactory.newTraceObject()
时,使用配置的sampler
,进行采样判断:如果采样则创建DefaultTrace
对象,并且sampling
字段为true
,如果不采样则创建DisableTrace
对象。
final Trace trace = this.baseTraceFactory.newTraceObject();
public class DefaultBaseTraceFactory implements BaseTraceFactory {
...
@Override
public Trace newTraceObject() {
// TODO need to modify how to inject a datasender
final boolean sampling = sampler.isSampling();
if (sampling) {
final TraceRoot traceRoot = traceRootFactory.newTraceRoot();
final Span span = spanFactory.newSpan(traceRoot);
final Storage storage = storageFactory.createStorage(traceRoot);
final CallStack<SpanEvent> callStack = callStackFactory.newCallStack();
final TraceId traceId = traceRoot.getTraceId();
final SpanRecorder spanRecorder = recorderFactory.newSpanRecorder(span, traceId.isRoot(), sampling);
final WrappedSpanEventRecorder wrappedSpanEventRecorder = recorderFactory.newWrappedSpanEventRecorder(traceRoot);
final ActiveTraceHandle handle = registerActiveTrace(traceRoot);
final DefaultTrace trace = new DefaultTrace(span, callStack, storage, sampling, spanRecorder, wrappedSpanEventRecorder, handle);
return trace;
} else {
return newDisableTrace();
}
}
...
}
下面重点看下SamplingRateSampler
类的代码,该类继承了Sampler
接口,代码比较简单:使用累加的计数,当累加出来的数为配置的samplingRate
的倍数时,表示该请求被采样。
public class SamplingRateSampler implements Sampler {
private final AtomicInteger counter = new AtomicInteger(0);
private final int samplingRate;
public SamplingRateSampler(int samplingRate) {
if (samplingRate <= 0) {
throw new IllegalArgumentException("Invalid samplingRate " + samplingRate);
}
this.samplingRate = samplingRate;
}
@Override
public boolean isSampling() {
int samplingCount = MathUtils.fastAbs(counter.getAndIncrement());
int isSampling = samplingCount % samplingRate;
return isSampling == 0;
}
}
发送端采样处理
请求发送到下游的机器之前,会从当前上下文里面获取Trace
对象,并判断是否sampling
。以httpclient4 plugin
的 拦截器 HttpRequestExecutorExecuteMethodInterceptor
举例:
public class HttpRequestExecutorExecuteMethodInterceptor implements AroundInterceptor {
public void before(Object target, Object[] args) {
....
final Trace trace = traceContext.currentRawTraceObject();
if (trace == null) {
return;
}
final HttpRequest httpRequest = getHttpRequest(args);
final NameIntValuePair<String> host = getHost();
final boolean sampling = trace.canSampled();
if (!sampling) {
if (httpRequest != null) {
this.requestTraceWriter.write(httpRequest);
}
return;
}
final SpanEventRecorder recorder = trace.traceBlockBegin();
TraceId nextId = trace.getTraceId().getNextTraceId();
recorder.recordNextSpanId(nextId.getSpanId());
recorder.recordServiceType(HttpClient4Constants.HTTP_CLIENT_4);
if (httpRequest != null) {
final String hostString = getHostString(host.getName(), host.getValue());
this.requestTraceWriter.write(httpRequest, nextId, hostString);
}
InterceptorScopeInvocation invocation = interceptorScope.getCurrentInvocation();
if (invocation != null) {
invocation.getOrCreateAttachment(HttpCallContextFactory.HTTPCALL_CONTEXT_FACTORY);
}
}
...
}
public class DefaultRequestTraceWriter<T> implements RequestTraceWriter<T> {
...
@Override
public void write(T header, final TraceId traceId, final String host) {
Assert.requireNonNull(traceId, "traceId must not be null");
if (isDebug) {
logger.debug("Set request header. traceId={}, applicationName={}, serverTypeCode={}, applicationNamespace={}", traceId, applicationName, serverTypeCode, applicationNamespace);
}
clientHeaderAdaptor.setHeader(header, Header.HTTP_TRACE_ID.toString(), traceId.getTransactionId());
clientHeaderAdaptor.setHeader(header, Header.HTTP_SPAN_ID.toString(), String.valueOf(traceId.getSpanId()));
clientHeaderAdaptor.setHeader(header, Header.HTTP_PARENT_SPAN_ID.toString(), String.valueOf(traceId.getParentSpanId()));
clientHeaderAdaptor.setHeader(header, Header.HTTP_FLAGS.toString(), String.valueOf(traceId.getFlags()));
clientHeaderAdaptor.setHeader(header, Header.HTTP_PARENT_APPLICATION_NAME.toString(), applicationName);
clientHeaderAdaptor.setHeader(header, Header.HTTP_PARENT_APPLICATION_TYPE.toString(), Short.toString(serverTypeCode));
if (applicationNamespace != null) {
clientHeaderAdaptor.setHeader(header, Header.HTTP_PARENT_APPLICATION_NAMESPACE.toString(), applicationNamespace);
}
if (host != null) {
clientHeaderAdaptor.setHeader(header, Header.HTTP_HOST.toString(), host);
}
}
...
}
如果当前的Trace
的sampling = false
直接在http头部写入Pinpoint-Sampled=s0
, 其他信息都不传递。如果为true,则不设置Pinpoint-Sampled
字段,只设置其他trace相关字段,比如Header.HTTP_TRACE_ID
等。
对于采样的判断:
如果Trace
的sampling=false
,则该Trace
不支持记录span
以及spanEvent
,所以在需要使用span
以及spanEvent
的地方,需要去判断当前Trace
是否被采样, 比如:
final Trace trace = createTrace(request);
if (trace == null) {
return;
}
if (!trace.canSampled()) {
return;
}
....
final Trace trace = this.requestTraceReader.read(request);
if (trace.canSampled()) {
final SpanRecorder recorder = trace.getSpanRecorder();
// record root span
recorder.recordServiceType(this.serviceType);
recorder.recordApi(SERVLET_SYNC_METHOD_DESCRIPTOR);
this.serverRequestRecorder.record(recorder, request);
// record proxy HTTP header.
this.proxyHttpHeaderRecorder.record(recorder, request);
}
return trace;
本文从发送端和接收端两个角度分别简单介绍了pinpoint
的采样原理。