skywalking agent 初始化
核心概念
- BootService: 启动各个grpc客户端接口, 包含完整生命周期, prepare -> boot -> onComplete -> shutdown, 包含 TraceSegmentServiceClient, ServiceManagementClient 等等
- ServiceManager: 管理各种 BootService
入口类: SkyWalkingAgent, 通过使用 javaagent 的引导 SkyWalkingAgent#premain 方法进入, 并调用 ServiceManager.INSTANCE.boot(); 来启动各种 BootService
public static void premain(String agentArgs, Instrumentation instrumentation) throws PluginException {
...
try {
// 简单的调用各个booService 进行 prepare -> boot -> onComplete 操作
ServiceManager.INSTANCE.boot();
} catch (Exception e) {
LOGGER.error(e, "Skywalking agent boot failure.");
}
// 注册关闭钩子
Runtime.getRuntime()
.addShutdownHook(new Thread(ServiceManager.INSTANCE::shutdown, "skywalking service shutdown thread"));
...
}
java agent
使用 javaagent 需要几个步骤:
- 定义一个 MANIFEST.MF 文件,必须包含 Premain-Class 选项,通常也会加入Can-Redefine-Classes 和 Can-Retransform-Classes 选项。
- 创建一个Premain-Class 指定的类,类中包含 premain 方法,方法逻辑由用户自己确定。
- 将 premain 的类和 MANIFEST.MF 文件打成 jar 包。
- 使用参数 -javaagent:/jar包路径=[agentArgs 参数] 启动要代理的方法。
==premain== 方法用于 main 执行之前的预处理, 用于类的增强
字段如下描述, args 参数通过 -javaagent:xxx.jar==yyy 传入字符串, 如果需要修改字节码, 必须使用方式1, JVM 会优先加载 1 签名的方法,加载成功忽略 2,如果1 没有,加载 2 方法
// 1
public static void premain(String args, Instrumentation instrumentation)
// 2
public static void premain(String args)
探针 , 服务端的通信流程
上报信息分为 "注册通信" , "数据上报" 两部分
agent 包中和服务端的相关 grpc 通信服务类
- EventReportServiceClient: 服务事件上报(服务启动, 关闭)
- LogReportServiceClient: 日志上报
- ServiceManagementClient: 服务实例信息, 心跳上报
- TraceSegmentServiceClient: TraceSegment 上报
- JVMMetricsSender: jvm 信息上报
注册通信
用于上报服务的相关信息, 包含 服务名称, 实例信息, 核心逻辑在 ServiceManagementClient#run,
心跳定时器初始化在 prepare 中完成, 30 秒 执行一次
@Override
public void boot() {
heartbeatFuture = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("ServiceManagementClient")
).scheduleAtFixedRate(
new RunnableWithExceptionProtection(
this,
t -> LOGGER.error("unexpected exception.", t)
), 0, Config.Collector.HEARTBEAT_PERIOD,
TimeUnit.SECONDS
);
}
上报主体逻辑, 心跳部分 keepAlive 每30秒执行1次, 而实例信息上报 30(collector.heartbeat_period
) * 10(collector.properties_report_period_factor
) 一次, 这有效避免了服务端信息丢失时, 无法收集的情况
if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {
managementServiceBlockingStub
.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
.reportInstanceProperties(InstanceProperties.newBuilder()
// 服务名称则是通过 agent.config 配置文件写入
.setService(Config.Agent.SERVICE_NAME)
// 实例名称如果没配置的话, 则会在 ServiceInstanceGenerator#prepare 中自动生成
.setServiceInstance(Config.Agent.INSTANCE_NAME)
// 服务属性通过 agent.config 配置文件写入
.addAllProperties(OSUtil.buildOSInfo(
Config.OsInfo.IPV4_LIST_SIZE))
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
.addAllProperties(LoadedLibraryCollector.buildJVMInfo())
.build());
} else {
final Commands commands = managementServiceBlockingStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).keepAlive(InstancePingPkg.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.build());
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}
实例属性的 Protobuf 消息定义
message InstanceProperties {
string service = 1;
string serviceInstance = 2;
repeated KeyStringValuePair properties = 3;
}
message KeyStringValuePair {
string key = 1;
string value = 2;
}
// grpc 服务定义
service ManagementService {
// 上报实例消息
rpc reportInstanceProperties (InstanceProperties) returns (Commands) {
}
// 保持心跳
rpc keepAlive (InstancePingPkg) returns (Commands) {
}
}
Endpoint收集部分呢, 8.7.0 未找到书中提及的 Endpoint 发送, 看起来是和 Trace 一起发送解析了, 这块到服务端再看下
jvm 信息上报
主体代码入口 JVMService#run, 执行流程
- JVMService#boot 中定义了名称为 JVMService-produce 的 Executor(每 1 秒执行一次) , 对应生产者
- 定义 JVMService-consume 的 Executor(每 1 秒执行一次) , 类为 JVMMetricsSender, 对应消费者
- JVMService#run 收集 jvm 信息 封装成 JVMMetric, 塞到 JVMMetricsSender#LinkedBlockingQueue 中完成生产行为
- JVMMetricsSender#run 将消息通过 grpc发送到服务端, 完成消费行为
数据上报
TraceSegment 的上报通过 TraceSegmentServiceClient 完成, 这块上个部分已大体有说明了, 这里主要关注下 初始化部分
@Override
public void boot() {
lastLogTime = System.currentTimeMillis();
segmentUplinkedCounter = 0;
segmentAbandonedCounter = 0;
// 初始化, channel 为 5, buffer 为300 的 DataCarrier, 使用失败重试策略
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
// 初始化 ConsumerThread 进行 TraceSegment 发送
carrier.consume(this, 1);
}
@Override
public void onComplete() {
// 添加监听器到 TracingContext, 用于 Trace 完成时通知执行 consume
TracingContext.ListenerManager.add(this);
}
@Override
public void shutdown() {
TracingContext.ListenerManager.remove(this);
carrier.shutdownConsumers();
}