一、Compile
编译源码请查看源码中的文档 How-to-build.md
-
首先确保你Maven的版本为3.6+,然后源码根目录执行下面命令
mvn clean package -DskipTests
-
编译成功后,将工程导入到 IDEA中,因为里面用到了proto,默认情况下IDE是识别不到这些代码的,如果你使用的是 IDEA工具,请安装下面插件。
确保target目录没有被隐藏,如果隐藏了,可通过 Setting - Editor - File Types 将 target 清除
二、OAP
OAP是SkyWalkingAgent 的服务端,启动一个基于H2存储简单的OAP服务器,运行 oap-server/server-starter中OAPServerStartUp类。
可以看到OAPServerStartUp类又调用了一个 OAPServerBootstrap 的类,这个类才是真正的入口类。
public class OAPServerStartUp {
public static void main(String[] args) {
OAPServerBootstrap.start();
}
}
OAPServerBootstrap.start()方法主要工作就是解析application.yml文件,加载配置,初始化各个模块。
public class OAPServerBootstrap {
public static void start() {
String mode = System.getProperty("mode");
RunningMode.setMode(mode);
ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
ModuleManager manager = new ModuleManager();
try {
// 加载 application.yml 中的配置
ApplicationConfiguration applicationConfiguration = configLoader.load();
// 加载所有 Module 的实现类
manager.init(applicationConfiguration);
// 加载 Prometheus指标收集器
manager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class)
.createGauge("uptime", "oap server start up time", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE)
// Set uptime to second
.setValue(System.currentTimeMillis() / 1000d);
if (RunningMode.isInitMode()) {
log.info("OAP starts up in init mode successfully, exit now...");
System.exit(0);
}
} catch (Throwable t) {
log.error(t.getMessage(), t);
System.exit(1);
}
}
}
上面的 manager.init() 方法,加载模块时,采用 Java SPI 机制,读取每个模块的 META-INF.services 下配置,获取实现类。
注意:值为
-
的模块不会被加载,例如下面模块将不会被加载,如果不注意的话会让你抓狂。详细请看我的文章 《Skywalking:定制化》node-exporter章节。prometheus-fetcher: selector: ${SW_PROMETHEUS_FETCHER:-}
public void init(
ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException {
// 获取所有可用模块列表
String[] moduleNames = applicationConfiguration.moduleList();
// 加载ModuleDefine的实现类
ServiceLoader<ModuleDefine> moduleServiceLoader = ServiceLoader.load(ModuleDefine.class);
// 加载ModuleProvider的实现类
ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);
HashSet<String> moduleSet = new HashSet<>(Arrays.asList(moduleNames));
for (ModuleDefine module : moduleServiceLoader) {
if (moduleSet.contains(module.name())) {
module.prepare(this, applicationConfiguration.getModuleConfiguration(module.name()), moduleProviderLoader);
loadedModules.put(module.name(), module);
moduleSet.remove(module.name());
}
}
// Finish prepare stage
isInPrepareStage = false;
if (moduleSet.size() > 0) {
throw new ModuleNotFoundException(moduleSet.toString() + " missing.");
}
BootstrapFlow bootstrapFlow = new BootstrapFlow(loadedModules);
bootstrapFlow.start(this);
bootstrapFlow.notifyAfterCompleted();
}
三、 Agent
Agent 作为代理端插件 ,协助 OAP 收集应用的日志,作为一个字节码插件 skywalking-agent.jar 完成这个工作,上面提到,当业务应用端部署时除了这个jar包,还需依赖 agent.config配置和其他插件 jar包。
public static void premain(String agentArgs, Instrumentation instrumentation) throws PluginException {
final PluginFinder pluginFinder;
//1. 初始化配置文件 agent.config,会在和skywalking-agent.jar同目录的config下查找
try {
SnifferConfigInitializer.initializeCoreConfig(agentArgs);
} catch (Exception e) {
// try to resolve a new logger, and use the new logger to write the error log here
LogManager.getLogger(SkyWalkingAgent.class)
.error(e, "SkyWalking agent initialized failure. Shutting down.");
return;
} finally {
// refresh logger again after initialization finishes
LOGGER = LogManager.getLogger(SkyWalkingAgent.class);
}
//2. 第一步: 读取agent.config的plugin.mount参数,从plugins,activations两个目录下加载插件
// 第二步: 每个插件下面都有一个skywalking-plugin.def文件(SPI模式),定义了此插件监控的各个点的类
try {
pluginFinder = new PluginFinder(new PluginBootstrap().loadPlugins());
} catch (AgentPackageNotFoundException ape) {
LOGGER.error(ape, "Locate agent.jar failure. Shutting down.");
return;
} catch (Exception e) {
LOGGER.error(e, "SkyWalking agent initialized failure. Shutting down.");
return;
}
//3. 初始化Byte Buddy字节码工具,此工具是新一代的字节码工具,性能优于ASM、javasisst
final ByteBuddy byteBuddy = new ByteBuddy().with(TypeValidation.of(Config.Agent.IS_OPEN_DEBUGGING_CLASS));
AgentBuilder agentBuilder = new AgentBuilder.Default(byteBuddy).ignore(
nameStartsWith("net.bytebuddy.")
.or(nameStartsWith("org.slf4j."))
.or(nameStartsWith("org.groovy."))
.or(nameContains("javassist"))
.or(nameContains(".asm."))
.or(nameContains(".reflectasm."))
.or(nameStartsWith("sun.reflect"))
.or(allSkyWalkingAgentExcludeToolkit())
.or(ElementMatchers.isSynthetic()));
JDK9ModuleExporter.EdgeClasses edgeClasses = new JDK9ModuleExporter.EdgeClasses();
try {
agentBuilder = BootstrapInstrumentBoost.inject(pluginFinder, instrumentation, agentBuilder, edgeClasses);
} catch (Exception e) {
LOGGER.error(e, "SkyWalking agent inject bootstrap instrumentation failure. Shutting down.");
return;
}
try {
agentBuilder = JDK9ModuleExporter.openReadEdge(instrumentation, agentBuilder, edgeClasses);
} catch (Exception e) {
LOGGER.error(e, "SkyWalking agent open read edge in JDK 9+ failure. Shutting down.");
return;
}
if (Config.Agent.IS_CACHE_ENHANCED_CLASS) {
try {
agentBuilder = agentBuilder.with(new CacheableTransformerDecorator(Config.Agent.CLASS_CACHE_MODE));
LOGGER.info("SkyWalking agent class cache [{}] activated.", Config.Agent.CLASS_CACHE_MODE);
} catch (Exception e) {
LOGGER.error(e, "SkyWalking agent can't active class cache.");
}
}
// 4. 初始化Agent工具
agentBuilder.type(pluginFinder.buildMatch())
.transform(new Transformer(pluginFinder))
.with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION)
.with(new RedefinitionListener())
.with(new Listener())
.installOn(instrumentation);
// 5. 启动所有插件服务,每个插件服务必须实现BootService接口类
try {
ServiceManager.INSTANCE.boot();
} catch (Exception e) {
LOGGER.error(e, "Skywalking agent boot failure.");
}
Runtime.getRuntime()
.addShutdownHook(new Thread(ServiceManager.INSTANCE::shutdown, "skywalking service shutdown thread"));
}
上面第5 步中 服务类,如果带@DefaultImplementor注解就是缺省的实现;如果带@OverrideImplementor注解,则会覆盖注解中的指定的某个实现类,从而产生一个新的实现类
@OverrideImplementor(TraceSegmentServiceClient.class)
public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<TraceSegment>, TracingContextListener, KafkaConnectionStatusListener {
private static final ILog LOGGER = LogManager.getLogger(KafkaTraceSegmentServiceClient.class);
ServiceManager类,用来管理所有插件
private Map<Class, BootService> loadAllServices() {
Map<Class, BootService> bootedServices = new LinkedHashMap<>();
List<BootService> allServices = new LinkedList<>();
load(allServices);
for (final BootService bootService : allServices) {
Class<? extends BootService> bootServiceClass = bootService.getClass();
boolean isDefaultImplementor = bootServiceClass.isAnnotationPresent(DefaultImplementor.class);
if (isDefaultImplementor) {//有@DefaultImplementor注解
if (!bootedServices.containsKey(bootServiceClass)) {
bootedServices.put(bootServiceClass, bootService);
} else {
//ignore the default service
}
} else {//有@OverrideImplementor注解
OverrideImplementor overrideImplementor = bootServiceClass.getAnnotation(OverrideImplementor.class);
if (overrideImplementor == null) {
if (!bootedServices.containsKey(bootServiceClass)) {
bootedServices.put(bootServiceClass, bootService);
} else {
throw new ServiceConflictException("Duplicate service define for :" + bootServiceClass);
}
}
发送数据
大概分三类发送数据的客户端:
服务类客户端,如ServiceManagementClient
跟踪类客户端,如TraceSegmentServiceClient
指标类客户端,如MeterSender。
这三类客户端除前面讲的实现了BootService外,还实现了GRPCChannelListener接口,这个接口大概作用就是创建用于和服务端通信的通道。大概流程是在调用ServiceManager.boot()方法时,会调用每个服务的prepare()方法,收集通道监听器,然后遍历每个通道的statusChanged方法,初始化一个XxxServiceStub服务存根类,此类是由Protobuf序列化而成。
//实现了 GRPCChannelListener 类的prepare()、statusChanged()方法
@DefaultImplementor
public class MeterSender implements BootService, GRPCChannelListener {
private static final ILog LOGGER = LogManager.getLogger(MeterSender.class);
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile MeterReportServiceGrpc.MeterReportServiceStub meterReportServiceStub;
@Override
public void prepare() {
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
}
public void send(Map<MeterId, BaseMeter> meterMap, MeterService meterService) {
if (status == GRPCChannelStatus.CONNECTED) {
StreamObserver<MeterData> reportStreamObserver = null;
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
try {
reportStreamObserver = meterReportServiceStub.withDeadlineAfter(
......
}
@Override
public void statusChanged(final GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
meterReportServiceStub = MeterReportServiceGrpc.newStub(channel);
} else {
meterReportServiceStub = null;
}
this.status = status;
}
}
Agent Instance
什么是Agent Instance呢?每通过skywalking-agent.jar启动一个应用,就会在OAP服务上注册一个服务,而这时就会在Agent端产生一个 Agent实例,它负责向OAP上报各种信息。ServiceManagementClient类就是做这些工作的,除了发送应用跟踪信息外,还会向OAP发送心跳等信息。
@DefaultImplementor
public class ServiceManagementClient implements BootService, Runnable, GRPCChannelListener {
private static final ILog LOGGER = LogManager.getLogger(ServiceManagementClient.class);
private static List<KeyStringValuePair> SERVICE_INSTANCE_PROPERTIES;
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile ManagementServiceGrpc.ManagementServiceBlockingStub managementServiceBlockingStub;
private volatile ScheduledFuture<?> heartbeatFuture;
private volatile AtomicInteger sendPropertiesCounter = new AtomicInteger(0);
@Override
public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
managementServiceBlockingStub = ManagementServiceGrpc.newBlockingStub(channel);
} else {
managementServiceBlockingStub = null;
}
this.status = status;
}
@Override
public void prepare() {
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
SERVICE_INSTANCE_PROPERTIES = new ArrayList<>();
for (String key : Config.Agent.INSTANCE_PROPERTIES.keySet()) {
SERVICE_INSTANCE_PROPERTIES.add(KeyStringValuePair.newBuilder()
.setKey(key)
.setValue(Config.Agent.INSTANCE_PROPERTIES.get(key))
.build());
}
}
@Override
public void boot() {
heartbeatFuture = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("ServiceManagementClient")
).scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this,
t -> LOGGER.error("unexpected exception.", t)
), 0, Config.Collector.HEARTBEAT_PERIOD,
TimeUnit.SECONDS
);
}
@Override
public void onComplete() {
}
@Override
public void shutdown() {
heartbeatFuture.cancel(true);
}
@Override
public void run() {
LOGGER.debug("ServiceManagementClient running, status:{}.", status);
if (GRPCChannelStatus.CONNECTED.equals(status)) {
try {
if (managementServiceBlockingStub != null) {
if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {
managementServiceBlockingStub
.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.reportInstanceProperties(InstanceProperties.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.addAllProperties(OSUtil.buildOSInfo(
Config.OsInfo.IPV4_LIST_SIZE))
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
.build());
} else {
final Commands commands = managementServiceBlockingStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).keepAlive(InstancePingPkg.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.build());
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
}
} catch (Throwable t) {
LOGGER.error(t, "ServiceManagementClient execute fail.");
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
}
}
}
}
四、 Plugins
Skywalking中有大量的插件,正是这些插件帮助Skywalking建立起一个庞大的可观察森林。每个插件都有两个必须的类:
- 拦截点类(XxxInstrumentation),在那些方法上干活。
- 拦截器类(XxxInterceptor),具体干活的类。
拦截点
下面以Dubbo插件为例,DubboInstrumentation拦截点实现了三个方法:enhanceClass(增强类)、getConstructorsInterceptPoints(拦截构造方法)、getInstanceMethodsInterceptPoints(拦截实例方法)
public class DubboInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
//被拦截的类,也就是需要增强的类
private static final String ENHANCE_CLASS = "com.alibaba.dubbo.monitor.support.MonitorFilter";
//拦截器类
private static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.dubbo.DubboInterceptor";
@Override
protected ClassMatch enhanceClass() {
return NameMatch.byName(ENHANCE_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return null;
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[] {
new InstanceMethodsInterceptPoint() {
// 获取匹配到的拦截方法,WitnessClass
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("invoke");
}
// 返回拦截器
@Override
public String getMethodsInterceptor() {
return INTERCEPT_CLASS;
}
// 是否要对原方法参数做修改
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}
}
如何解决多版本问题
当应用存在多个版本时,Skywalking为应用的每个版本都写一个相对应插件版本,例如针对 spring mvc不同版本有以下插件:
spring3.x => mvc-annotation-3.x-plugin
spring4.x => mvc-annotation-4.x-plugin
spring5.x => mvc-annotation-5.x-plugin
那么有个问题来了,比如我用spring4.x时,这时会把3个插件都加载上,如何值加载 mvc-annotation-4.x-plugin插件呢?这时用到了一个技术叫 WitnessClass,原理很简单,就是在每个插件中定义 WITHNESS_CLASSES变量,并定义一个能区分出版本独特的类。然后在应用启动时,在加载类集合里是否能找到WITHNESS_CLASSES变量定义的类,如果找对了,那么也找到了对应的版本的插件。
拦截器
DubboInterceptor用来定义对增强类做什么处理
public class DubboInterceptor implements InstanceMethodsAroundInterceptor {
/**
* <h2>Consumer:</h2> The serialized trace context data will
* inject to the {@link RpcContext#attachments} for transport to provider side.
* <p>
* <h2>Provider:</h2> The serialized trace context data will extract from
* {@link RpcContext#attachments}. current trace segment will ref if the serialize context data is not null.
*/
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
MethodInterceptResult result) throws Throwable {
Invoker invoker = (Invoker) allArguments[0];
Invocation invocation = (Invocation) allArguments[1];
RpcContext rpcContext = RpcContext.getContext();
boolean isConsumer = rpcContext.isConsumerSide();
URL requestURL = invoker.getUrl();
AbstractSpan span;
final String host = requestURL.getHost();
final int port = requestURL.getPort();
if (isConsumer) {
final ContextCarrier contextCarrier = new ContextCarrier();
span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
//invocation.getAttachments().put("contextData", contextDataStr);
//@see https://github.com/alibaba/dubbo/blob/dubbo-2.5.3/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java#L154-L161
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue());
if (invocation.getAttachments().containsKey(next.getHeadKey())) {
invocation.getAttachments().remove(next.getHeadKey());
}
}
} else {
ContextCarrier contextCarrier = new ContextCarrier();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {
next = next.next();
next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
}
span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
}
Tags.URL.set(span, generateRequestURL(requestURL, invocation));
span.setComponent(ComponentsDefine.DUBBO);
SpanLayer.asRPCFramework(span);
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
Object ret) throws Throwable {
Result result = (Result) ret;
if (result != null && result.getException() != null) {
dealException(result.getException());
}
ContextManager.stopSpan();
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
Class<?>[] argumentsTypes, Throwable t) {
dealException(t);
}
/**
* Log the throwable, which occurs in Dubbo RPC service.
*/
private void dealException(Throwable throwable) {
AbstractSpan span = ContextManager.activeSpan();
span.log(throwable);
}
/**
* Format operation name. e.g. org.apache.skywalking.apm.plugin.test.Test.test(String)
*
* @return operation name.
*/
private String generateOperationName(URL requestURL, Invocation invocation) {
StringBuilder operationName = new StringBuilder();
String groupStr = requestURL.getParameter(Constants.GROUP_KEY);
groupStr = StringUtil.isEmpty(groupStr) ? "" : groupStr + "/";
operationName.append(groupStr);
operationName.append(requestURL.getPath());
operationName.append("." + invocation.getMethodName() + "(");
for (Class<?> classes : invocation.getParameterTypes()) {
operationName.append(classes.getSimpleName() + ",");
}
if (invocation.getParameterTypes().length > 0) {
operationName.delete(operationName.length() - 1, operationName.length());
}
operationName.append(")");
return operationName.toString();
}
/**
* Format request url. e.g. dubbo://127.0.0.1:20880/org.apache.skywalking.apm.plugin.test.Test.test(String).
*
* @return request url.
*/
private String generateRequestURL(URL url, Invocation invocation) {
StringBuilder requestURL = new StringBuilder();
requestURL.append(url.getProtocol() + "://");
requestURL.append(url.getHost());
requestURL.append(":" + url.getPort() + "/");
requestURL.append(generateOperationName(url, invocation));
return requestURL.toString();
}
}
五、GraphQL
GraphQL 是一种新的API 查询语言,由Facebook开源,对前端提供少许接口,就可以查询整个系统的数据,且可以实现按需返回。
后台需要先定义一个 schema.graphqls
的文件,下面定义了两个接口:
- findAuthorById 返回 Author对象,其中这个对象又关联了 Book对象
- saveBook 保存Book接口,它的输入参数由 input 来定义
#查询接口
type Query {
findAuthorById(id:Long!): Author
}
# 更新接口
type Mutation {
saveBook(input: BookInput!) : Book!
}
# Author 对象
type Author {
#作者Id
id: Long!
#创建时间
createdTime: String
#名
firstName: String
#姓
lastName: String
#该作者的所有书籍
books: [Book]
}
# 输入参数
input BookInput {
title: String!
isbn: String!
pageCount: Int
authorId: Long
}
# Book对象
type Book {
id: Long!
title: String!
isbn: String!
pageCount: Int
author: Author
}
对于查询类接口和 更新类接口继承的父类是不一样的,比如查询是继承GraphQLQueryResolver,更新是继承GraphQLMutationResolver
@Component
@AllArgsConstructor
public class Query implements GraphQLQueryResolver {
private AuthorRepo authorRepo;
public Author findAuthorById(Long id) {
return authorRepo.findAuthorById(id);
}
}
@Component
@AllArgsConstructor
public class Mutation implements GraphQLMutationResolver {
private BookRepo bookRepo;
public Book saveBook(BookInput input) {
Book book = new Book();
book.setTitle(input.getTitle());
book.setIsbn(input.getIsbn());
book.setPageCount(input.getPageCount());
book.setAuthorId(input.getAuthorId());
return bookRepo.save(book);
}
}
后台定义好了GraphQL 接口,前端请求 http://x.x.x.x/graphql ,输入请求参数:
{
findAuthorById(id: 3){
id
firstName
lastName
}
}
在Skywalking中,有一个插件: query-graphql-plugin,就相当于一个graphql,支撑整个 UI的查询,访问前端 Web时,F12看下发现调用的都是 …/graphql 的接口。
六、OpenTelemeTry
在了解OpenTelemeTry之前,先说说 OpenTracing 和 OpenCensus
OpenTracing:是CNCF的第三个项目,其目的是制定一套标准的分布式追踪协议,可谓是Tracing界的slf4j。
OpenCensus: 除了OpenTracing 链路跟踪外,还有一类 Metrics监控指标也是经常用到的,例如cpu、内存、硬盘、网络请求延迟、错误率、用户数、访问数、订单数等各类指标。没错OpenCensus就是将链路跟踪和指标监控都囊括了。
所谓分久必合,OpenTelemetry的横空出世,结束了监控界的纷争乱世,并以“可观察性”全新定义了监控技术,重塑了监控规范,实现了Metrics、Tracing、Logging 的大融合,俨然成了监控界的大佬。
Skywalking收集 VM (操作系统)指标,没有自己实现,是通过 prometheus 的 node-exporter 完成的,并借助OpenTelemeTry 将Skywalking 和 prometheus 整合到一块。
otel-collector-config.yaml
Receiver做为采集端,Exporter作为数据的输出端,Opentelemetry-collector充当一个中间媒介将两者统一起来
receivers:
prometheus:
config:
scrape_configs:
- job_name: 'otel-collector'
scrape_interval: 10s
static_configs:
- targets: [ 'vm-1:9100' ]
- targets: [ 'vm-2:9100' ]
- targets: [ 'vm-3:9100' ]
processors:
batch:
exporters:
opencensus:
endpoint: "oap:11800" # The OAP Server address
insecure: true
# Exports data to the console
logging:
logLevel: debug
service:
pipelines:
metrics:
receivers: [prometheus]
processors: [batch]
exporters: [opencensus,logging]
在Skywalking中是通过 proto文件方式来集成opentelemetry的,当然你可以通过 集成opentelemetry sdk来接收node-exporter的数据。Skywalking重写了 MetricsServiceGrpc.MetricsServiceImplBase的exporer方法,此方法就是专门接收collector发过来的数据,只不过它转了一次转换,将OpenTelemetry 数据格式转换为了Skywalking的meter格式。
@Slf4j
public class OCMetricHandler extends MetricsServiceGrpc.MetricsServiceImplBase implements Handler {
private List<PrometheusMetricConverter> metrics;
@Override public StreamObserver<ExportMetricsServiceRequest> export(
StreamObserver<ExportMetricsServiceResponse> responseObserver) {
return new StreamObserver<ExportMetricsServiceRequest>() {
private Node node;
private Map<String, String> nodeLabels = new HashMap<>();
@Override
public void onNext(ExportMetricsServiceRequest request) {
....
}
@Override public void onError(Throwable throwable) {
}
@Override public void onCompleted() {
responseObserver.onCompleted();
}
};
}
.....
@Override public void active(List<String> enabledRules,
MeterSystem service, GRPCHandlerRegister grpcHandlerRegister) {
List<Rule> rules;
try {
rules = Rules.loadRules("otel-oc-rules", enabledRules);
} catch (ModuleStartException e) {
log.warn("failed to load otel-oc-rules");
return;
}
if (rules.isEmpty()) {
return;
}
this.metrics = rules.stream().map(r ->
new PrometheusMetricConverter(r, service))
.collect(toList());
grpcHandlerRegister.addHandler(this);
}
}
Application
在Skywalking 源码工程中创建一个模块apm-webapp(已经存在的有问题,我就删掉了),然后把skywalking-ui工程下的dist目录拷贝到 resources下面,并改名为public,这样这个工程即可以访问 UI ,又能集成skywalking-agent.jar测试 。此工程已经上传到gitee上,请点击这里下载。
在工程启动 VM 参数添加
-javaagent:skywalking-agent\skywalking-agent.jar -Dskywalking.agent.service_name=webapp -Dskywalking.agent.instance_name=webapp -Dskywalking.collector.backend_service=127.0.0.1:11800
访问 UI