skywalking agent 初始化, 数据上报

skywalking agent 初始化

核心概念

  • BootService: 启动各个grpc客户端接口, 包含完整生命周期, prepare -> boot -> onComplete -> shutdown, 包含 TraceSegmentServiceClient, ServiceManagementClient 等等
  • ServiceManager: 管理各种 BootService

入口类: SkyWalkingAgent, 通过使用 javaagent 的引导 SkyWalkingAgent#premain 方法进入, 并调用 ServiceManager.INSTANCE.boot(); 来启动各种 BootService

public static void premain(String agentArgs, Instrumentation instrumentation) throws PluginException {
    ...
    try {
    // 简单的调用各个booService 进行 prepare -> boot -> onComplete 操作
        ServiceManager.INSTANCE.boot();
    } catch (Exception e) {
        LOGGER.error(e, "Skywalking agent boot failure.");
    }
    // 注册关闭钩子
    Runtime.getRuntime()
            .addShutdownHook(new Thread(ServiceManager.INSTANCE::shutdown, "skywalking service shutdown thread"));
    ...
}

java agent

使用 javaagent 需要几个步骤:

  1. 定义一个 MANIFEST.MF 文件,必须包含 Premain-Class 选项,通常也会加入Can-Redefine-Classes 和 Can-Retransform-Classes 选项。
  2. 创建一个Premain-Class 指定的类,类中包含 premain 方法,方法逻辑由用户自己确定。
  3. 将 premain 的类和 MANIFEST.MF 文件打成 jar 包。
  4. 使用参数 -javaagent:/jar包路径=[agentArgs 参数] 启动要代理的方法。

==premain== 方法用于 main 执行之前的预处理, 用于类的增强

字段如下描述, args 参数通过 -javaagent:xxx.jar==yyy 传入字符串, 如果需要修改字节码, 必须使用方式1, JVM 会优先加载 1 签名的方法,加载成功忽略 2,如果1 没有,加载 2 方法

// 1 
public static void premain(String args, Instrumentation instrumentation)
// 2
public static void premain(String args)

探针 , 服务端的通信流程

上报信息分为 "注册通信" , "数据上报" 两部分

agent 包中和服务端的相关 grpc 通信服务类

  • EventReportServiceClient: 服务事件上报(服务启动, 关闭)
  • LogReportServiceClient: 日志上报
  • ServiceManagementClient: 服务实例信息, 心跳上报
  • TraceSegmentServiceClient: TraceSegment 上报
  • JVMMetricsSender: jvm 信息上报

注册通信

用于上报服务的相关信息, 包含 服务名称, 实例信息, 核心逻辑在 ServiceManagementClient#run,

心跳定时器初始化在 prepare 中完成, 30 秒 执行一次

@Override
public void boot() {
    heartbeatFuture = Executors.newSingleThreadScheduledExecutor(
            new DefaultNamedThreadFactory("ServiceManagementClient")
    ).scheduleAtFixedRate(
            new RunnableWithExceptionProtection(
                    this,
                    t -> LOGGER.error("unexpected exception.", t)
            ), 0, Config.Collector.HEARTBEAT_PERIOD,
            TimeUnit.SECONDS
    );
}

上报主体逻辑, 心跳部分 keepAlive 每30秒执行1次, 而实例信息上报 30(collector.heartbeat_period) * 10(collector.properties_report_period_factor) 一次, 这有效避免了服务端信息丢失时, 无法收集的情况

if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {
    managementServiceBlockingStub
        .withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
        .reportInstanceProperties(InstanceProperties.newBuilder()
                                                    // 服务名称则是通过 agent.config 配置文件写入
                                                    .setService(Config.Agent.SERVICE_NAME)
                                                    // 实例名称如果没配置的话, 则会在 ServiceInstanceGenerator#prepare 中自动生成
                                                    .setServiceInstance(Config.Agent.INSTANCE_NAME)
                                                    // 服务属性通过 agent.config 配置文件写入
                                                    .addAllProperties(OSUtil.buildOSInfo(
                                                        Config.OsInfo.IPV4_LIST_SIZE))
                                                    .addAllProperties(SERVICE_INSTANCE_PROPERTIES)
                                                    .addAllProperties(LoadedLibraryCollector.buildJVMInfo())
                                                    .build());
} else {
    final Commands commands = managementServiceBlockingStub.withDeadlineAfter(
        GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
    ).keepAlive(InstancePingPkg.newBuilder()
                               .setService(Config.Agent.SERVICE_NAME)
                               .setServiceInstance(Config.Agent.INSTANCE_NAME)
                               .build());

    ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
}

实例属性的 Protobuf 消息定义

message InstanceProperties {
    string service = 1;
    string serviceInstance = 2;
    repeated KeyStringValuePair properties = 3;
}

message KeyStringValuePair {
    string key = 1;
    string value = 2;
}

// grpc 服务定义
service ManagementService {
    // 上报实例消息
    rpc reportInstanceProperties (InstanceProperties) returns (Commands) {
    }

    // 保持心跳
    rpc keepAlive (InstancePingPkg) returns (Commands) {

    }
}

Endpoint收集部分呢, 8.7.0 未找到书中提及的 Endpoint 发送, 看起来是和 Trace 一起发送解析了, 这块到服务端再看下

jvm 信息上报

主体代码入口 JVMService#run, 执行流程

  1. JVMService#boot 中定义了名称为 JVMService-produce 的 Executor(每 1 秒执行一次) , 对应生产者
  2. 定义 JVMService-consume 的 Executor(每 1 秒执行一次) , 类为 JVMMetricsSender, 对应消费者
  3. JVMService#run 收集 jvm 信息 封装成 JVMMetric, 塞到 JVMMetricsSender#LinkedBlockingQueue 中完成生产行为
  4. JVMMetricsSender#run 将消息通过 grpc发送到服务端, 完成消费行为

数据上报

TraceSegment 的上报通过 TraceSegmentServiceClient 完成, 这块上个部分已大体有说明了, 这里主要关注下 初始化部分

 @Override
public void boot() {
    lastLogTime = System.currentTimeMillis();
    segmentUplinkedCounter = 0;
    segmentAbandonedCounter = 0;
    // 初始化, channel 为 5, buffer 为300 的 DataCarrier, 使用失败重试策略
    carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
    // 初始化 ConsumerThread 进行 TraceSegment 发送
    carrier.consume(this, 1);
}

@Override
public void onComplete() {
    // 添加监听器到 TracingContext, 用于 Trace 完成时通知执行 consume
    TracingContext.ListenerManager.add(this);
}

@Override
public void shutdown() {
    TracingContext.ListenerManager.remove(this);
    carrier.shutdownConsumers();
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容