skywalking metrics 处理

概述

OAP 通信模型指的是后端 OAP 节点之间的分布式计算流模型

为什么不通过 agent 端直接通过服务发现调用 OAP 集群的服务, 转而通过 http 或 grpc 来实现呢, 有以下原因

  1. OAP 集群和业务系统可能是不同的VPC(专有网络 Virtual Private Cloud), 此时简单的服务发现不能很好的工作
  2. OAP 集群支持多种集群管理组件间的切换(etcd, cosul, k8s), 如果在探针端实现, 会使得探针臃肿且对更新使用造成不便
  3. 探针端对服务端的负载均衡则可以通过 envoy, nginx 来实现

Metrics 计算属于分布式聚合计算, 目前 OAP 计算流拆分为两个部分

  1. 数据接收和解析, 进行当前OAP节点内的数据聚合, 使用 OAL 或 硬编码方式
  2. 分布式聚合, 根据一定的路由规则, 将步骤1的数据路由到指定节点, 进行二次汇集, 这也是 OAP 节点需要服务发现的原因

根据以上两个部分, OAP 节点存在以下两种角色

  1. Receiver: 处理步骤1
  2. Aggregator: 处理步骤2

为了减少部署难度, 目前所有节点都会使用 Mixed 节点(包含 Receiver 和 Aggregator), 大规模部署情况下, 可以根据网络流量选择分离, 进行两级部署

Metrics流处理

目前只有 Metrics 类型的数据使用到了这块通信模型, 因为 Metrics 是计算资源消耗最大的分布式计算, 采用的是 hash select 的路由策略, 根据服务id, EndpointId 来选择对应的 OAP Server

路由策略现在不支持配置, 且 Rolling, ForeverFirst 暂无使用, v8.7.0 版本

==Metrics处理流程大致如下==

  1. oap 初始化: 加载 *.oal 生成对应的代码, 并注册到 DispatcherManagerMetricsStreamProcessor
  2. DispatcherManager 负责将 agent 传过来的数据进行转换(1 -> N), 并传输到 MetricsStreamProcessor 进行处理
  3. MetricsStreamProcessor 中提取类型对应的 MetricsAggregateWorker通过 grpc调用远程 或 本地 OAP 节点进行数据的存储

oal初始化和数据到 MetricsStreamProcessor 的流程之前已经有说明, 这里着重说明下 MetricsStreamProcessor 中的处理, 分以下部分

  1. 远程节点的注册和获取
  2. receiver汇集后的数据是如何发送的
  3. hour 和 day 的 向下采样(downsample) 是如何实现的
  4. Aggregator 收到数据后如何处理

远程节点的注册和获取

数据发送必须先涉及 oap Aggregator 节点的选择, 此块由 RemoteClientManager 负责

==获取部分==

初始化过程如下, 在 CoreModuleProider 中启动, 每5秒执行一次远程节点的对比, 不一样时进行重新构建, 远程节点分为 本地节点,远程节点两类(需要初始化grpc)

// RemoteClientManager#start
public void start() {
    Optional.ofNullable(sslContext).ifPresent(DynamicSslContext::start);
    Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::refresh, 1, 5, TimeUnit.SECONDS);
}

void refresh() {
    ...
    try {
        // 获取 clusterModule 模块获取集群节点查询服务
        this.clusterNodesQuery = moduleDefineHolder.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
        ...
        // 或者注册的所有节点
        List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
        instanceList = distinct(instanceList);
        Collections.sort(instanceList);
        ...
        if (!compare(instanceList)) {
            // 重新构建 grpc client
            reBuildRemoteClients(instanceList);
        }

        printRemoteClientList();
    } catch (Throwable t) {
        LOGGER.error(t.getMessage(), t);
    }
}

private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
    // 合并本地client 和远程 client 后进行处理
    ...
    remoteClientCollection.forEach((address, clientAction) -> {
        switch (clientAction.getAction()) {
            case Unchanged:
                newRemoteClients.add(clientAction.getRemoteClient());
                break;
            case Create:
                if (address.isSelf()) {
                    // 节点为本身, 直接添加
                    RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
                    newRemoteClients.add(client);
                } else {
                    // 远程节点, 初始化grpc 连接
                    RemoteClient client;
                    client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout, sslContext);
                    client.connect();
                    newRemoteClients.add(client);
                }
                break;
        }
    });
    Collections.sort(newRemoteClients);
    this.usingClients = ImmutableList.copyOf(newRemoteClients);
    // 后续已关闭节点的 grpc close 操作
    ...
}

==注册部分==

获取过程如上述描述, 注册过程则在 CoreModuleProvider#start 中实行, 根据配置的角色名称, 判断是否为 Aggregator 类型或 Mix 类型, 如果是的话执行注册逻辑

public void start() throws ModuleStartException {
    if (CoreModuleConfig.Role.Mixed.name()
                                   .equalsIgnoreCase(
                                       moduleConfig.getRole())
        || CoreModuleConfig.Role.Aggregator.name()
                                           .equalsIgnoreCase(
                                               moduleConfig.getRole())) {
        RemoteInstance gRPCServerInstance = new RemoteInstance(gRPCServerInstanceAddress);
        this.getManager()
            .find(ClusterModule.NAME)
            .provider()
            .getService(ClusterRegister.class)
            .registerRemote(gRPCServerInstance);
    }
}

receiver 角色的发送

发送部分逻辑 MetricsStreamProcessor#in, 流程如下

  1. 找到持有远程连接 MetricsRemoteWorker 的 MetricsAggregateWorker
  2. MetricsAggregateWorker#in 消息进入轻量级队列
  3. 通过 DataCarrier 指定的Consumer 来消费this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get(name), new AggregatorConsumer());
  4. 链路最终调用到 MetricsAggregateWorker#onWork
private void onWork(List<Metrics> metricsList) {
    metricsList.forEach(metrics -> {
        aggregationCounter.inc();
        // 完成 receiver 阶段数据合并
        mergeDataCache.accept(metrics);
    });

    flush();
}

private void flush() {
    long currentTime = System.currentTimeMillis();
    if (currentTime - lastSendTime > l1FlushPeriod) {
        mergeDataCache.read().forEach(
            data -> {
                if (log.isDebugEnabled()) {
                    log.debug(data.toString());
                }
                // 此处 nextWorker 为持有 RemoteSenderService 的 MetricsRemoteWorker
                nextWorker.in(data);
            }
        );
        lastSendTime = currentTime;
    }
}

MetricsRemoteWorker 使用 RemoteSenderService#send, 进行消息的发送, 最终推送到 GRPCRemoteClient#push, 进入 GRPCRemoteClient 的轻量级队列进行消费处理

// GRPCRemoteClient#push 生产过程
@Override
public void push(String nextWorkerName, StreamData streamData) {
    RemoteMessage.Builder builder = RemoteMessage.newBuilder();
    // 下一个worker名称
    builder.setNextWorkerName(nextWorkerName);
    // 信息序列化
    builder.setRemoteData(streamData.serialize());

    this.getDataCarrier().produce(builder.build());
}

// 消费过程, 发送grpc消息都远程服务端
@Override
public void consume(List<RemoteMessage> remoteMessages) {
    try {
        StreamObserver<RemoteMessage> streamObserver = createStreamObserver();
        for (RemoteMessage remoteMessage : remoteMessages) {
            remoteOutCounter.inc();
            streamObserver.onNext(remoteMessage);
        }
        streamObserver.onCompleted();
    } catch (Throwable t) {
        remoteOutErrorCounter.inc();
        log.error(t.getMessage(), t);
    }
}

receiver角色 使用的 RemoteClient 如果是本地地址, 则不进行grpc调用, 简化流程直接调用到 MetricsPersistentWorker#in 方法处理

Aggregator 接收处理

grpc 服务端的代码入口位于 RemoteServiceHandler#call

// 初始化 workerInstanceGetter, workerInstanceGetter 在 Metrics 的 MetricsStreamProcessor#create 过程中进行了服务注册
workerInstanceGetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceGetter.class);

// 根据 RemoteMessage.nextWorkerName 来获取已注册的 worker, 具体的worker为 
RemoteHandleWorker handleWorker = workerInstanceGetter.get(nextWorkerName);
if (handleWorker != null) {
    AbstractWorker nextWorker = handleWorker.getWorker();
    StreamData streamData = handleWorker.getStreamDataClass().newInstance();
    // 信息反序列化
    streamData.deserialize(remoteData);
    nextWorker.in(streamData);
}

workerInstanceGetter 中注册的是 MetricsPersistentWorker, 用于数据的持久化部分, 这样就和 Receiver 的角色的职责独立出来

MetricsPersistentWorker 负责的职责较多, 例如 警告的处理, 数据的向下采样(天, 小时), 而 MetricsPersistentWorker#in 职责比较简单, 就将数据放入缓存中

// MetricsPersistentWorker#onWork
void onWork(List<INPUT> input) {
    cache.write(input);
}

数据持久化则通过 PersistenceTimer 完成初始化和执行

public enum PersistenceTimer {
    INSTANCE;
    public void start(ModuleManager moduleManager, CoreModuleConfig moduleConfig) {
        ...
        // moduleConfig.getPersistentPeriod() 默认值 25, 25秒执行一次数据的批量存储
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
                         new RunnableWithExceptionProtection(() -> extractDataAndSave(batchDAO), t -> log
                             .error("Extract data and save failure.", t)), 5, moduleConfig.getPersistentPeriod(),
                         TimeUnit.SECONDS
                     );
        ...
    }
    
    private void extractDataAndSave(IBatchDAO batchDAO) {
        ...
        List<PersistenceWorker<? extends StorageData>> persistenceWorkers = new ArrayList<>();
        persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
        persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());

        CountDownLatch countDownLatch = new CountDownLatch(persistenceWorkers.size());
        persistenceWorkers.forEach(worker -> {
             prepareExecutorService.submit(() -> {
                ...
                 // 预处理阶段
                try (HistogramMetrics.Timer timer = prepareLatency.createTimer()) {
                    if (log.isDebugEnabled()) {
                        log.debug("extract {} worker data and save", worker.getClass().getName());
                    }

                    innerPrepareRequests = worker.buildBatchRequests();

                    worker.endOfRound();
                } catch (Throwable e) {
                    log.error(e.getMessage(), e);
                }

                // 执行阶段
                try (HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer()) {
                    if (CollectionUtils.isNotEmpty(innerPrepareRequests)) {
                        batchDAO.flush(innerPrepareRequests);
                    }
                } catch (Throwable e) {
                    log.error(e.getMessage(), e);
                }
                ...
             });
        }
    }
}

public abstract class PersistenceWorker<INPUT extends StorageData> extends AbstractWorker<INPUT> {
    public List<PrepareRequest> buildBatchRequests() {
        final List<INPUT> dataList = getCache().read();
        return prepareBatch(dataList);
    }
}

因此在 MetricsPersistentWorker 中最重要的阶段为 MetricsPersistentWorker#prepareBatch

@Override
public List<PrepareRequest> prepareBatch(Collection<Metrics> lastCollection) {
    if (persistentCounter++ % persistentMod != 0) {
        return Collections.EMPTY_LIST;
    }

    long start = System.currentTimeMillis();
    if (lastCollection.size() == 0) {
        return Collections.EMPTY_LIST;
    }

    /*
     * Hard coded the max size. This only affect the multiIDRead if the data doesn't hit the cache.
     */
    int maxBatchGetSize = 2000;
    final int batchSize = Math.min(maxBatchGetSize, lastCollection.size());
    List<Metrics> metricsList = new ArrayList<>();
    List<PrepareRequest> prepareRequests = new ArrayList<>(lastCollection.size());
    // 这里使用天和小时的进行向下采样处理
    for (Metrics data : lastCollection) {
        transWorker.ifPresent(metricsTransWorker -> metricsTransWorker.in(data));

        metricsList.add(data);

        if (metricsList.size() == batchSize) {
            flushDataToStorage(metricsList, prepareRequests);
        }
    }

    if (metricsList.size() > 0) {
        flushDataToStorage(metricsList, prepareRequests);
    }

    if (prepareRequests.size() > 0) {
        log.debug(
            "prepare batch requests for model {}, took time: {}, size: {}", model.getName(),
            System.currentTimeMillis() - start, prepareRequests.size()
        );
    }
    return prepareRequests;
}

数据的合并处理

合并流程 MergableBufferedData#accept

public class MergableBufferedData<METRICS extends Metrics> implements BufferedData<METRICS> {
...
    // 此处 METRICS 是泛型, 继承 Metrics
    public void accept(final METRICS data) {
        final String id = data.id();
        final METRICS existed = buffer.get(id);
        // 首次收到数据
        if (existed == null) {
            buffer.put(id, data);
        } else {
        // 进行数据合并, 如果需要丢弃则具体实现返回 false, 一般为true
        final boolean isAbandoned = !existed.combine(data);
        if (isAbandoned) {
            buffer.remove(id);
        }
    }
...
}

Metrics#combine 为实际的合并结果集的方法, 具体有不同的实现, 以下用 AvgFunction 说明

public abstract class AvgFunction extends Metrics implements AcceptableValue<Long>, LongValueHolder {
    ...
     @Entrance
    public final void combine(@SourceFrom long summation, @ConstOne long count) {
        // 合并总量
        this.summation += summation;
        // 合并个数
        this.count += count;
    }

    @Override
    public final boolean combine(Metrics metrics) {
        AvgFunction longAvgMetrics = (AvgFunction) metrics;
        combine(longAvgMetrics.summation, longAvgMetrics.count);
        return true;
    }
    ...
}
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容