Skywalking-11:Skywalking查询协议——案例分析

以查询 Metrics 信息案例来分析 Skywalking 查询协议

基本概述

Skywalking 查询协议默认基于 GraphQL ,如果有需要也可以自定义扩展,提供一个实现了 org.apache.skywalking.oap.server.core.query.QueryModule 的查询模块即可。

截取 Skywalking UI 发送的请求

  • 请求路径
POST http://127.0.0.1:8080/graphql
  • 请求体
{
  "query": "query queryData($condition: MetricsCondition!, $duration: Duration!) {\n  readMetricsValues: readMetricsValues(condition: $condition, duration: $duration) {\n    label\n    values {\n      values {value}\n    }\n  }}",
  "variables": {
    "duration": {
      "start": "2021-07-03 1320",
      "end": "2021-07-03 1321",
      "step": "MINUTE"
    },
    "condition": {
      "name": "instance_jvm_thread_runnable_thread_count",
      "entity": {
        "scope": "ServiceInstance",
        "serviceName": "business-zone::projectA",
        "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
        "normal": true
      }
    }
  }
}
  • 响应
{
  "data": {
    "readMetricsValues": {
      "values": {
        "values": [
          {
            "value": 22
          },
          {
            "value": 22
          }
        ]
      }
    }
  }
}

Skywalking 源码中找到对应 GraphQL 定义

打开 oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol 目录,使用请求体中的模板关键字 readMetricsValues 搜索
oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol/metrics-v2.graphqls 中找到对应的定义

extend type Query {
    # etc...
    # Read time-series values in the duration of required metrics
    readMetricsValues(condition: MetricsCondition!, duration: Duration!): MetricsValues!
    # etc...
}

输入参数定义

input MetricsCondition {
    # Metrics name, which should be defined in OAL script
    # Such as:
    # Endpoint_avg = from(Endpoint.latency).avg()
    # Then, `Endpoint_avg`
    name: String!
    # Follow entity definition description.
    entity: Entity!
}

input Entity {
    # 1. scope=All, no name is required.
    # 2. scope=Service, ServiceInstance and Endpoint, set neccessary serviceName/serviceInstanceName/endpointName
    # 3. Scope=ServiceRelation, ServiceInstanceRelation and EndpointRelation
    #    serviceName/serviceInstanceName/endpointName is/are the source(s)
    #    destServiceName/destServiceInstanceName/destEndpointName is/are destination(s)
    #    set necessary names of sources and destinations.
    scope: Scope!
    serviceName: String
    # Normal service is the service having installed agent or metrics reported directly.
    # Unnormal service is conjectural service, usually detected by the agent.
    normal: Boolean
    serviceInstanceName: String
    endpointName: String
    destServiceName: String
    # Normal service is the service having installed agent or metrics reported directly.
    # Unnormal service is conjectural service, usually detected by the agent.
    destNormal: Boolean
    destServiceInstanceName: String
    destEndpointName: String
}

# The Duration defines the start and end time for each query operation.
# Fields: `start` and `end`
#   represents the time span. And each of them matches the step.
#   ref https://www.ietf.org/rfc/rfc3339.txt
#   The time formats are
#       `SECOND` step: yyyy-MM-dd HHmmss
#       `MINUTE` step: yyyy-MM-dd HHmm
#       `HOUR` step: yyyy-MM-dd HH
#       `DAY` step: yyyy-MM-dd
#       `MONTH` step: yyyy-MM
# Field: `step`
#   represents the accurate time point.
# e.g.
#   if step==HOUR , start=2017-11-08 09, end=2017-11-08 19
#   then
#       metrics from the following time points expected
#       2017-11-08 9:00 -> 2017-11-08 19:00
#       there are 11 time points (hours) in the time span.
input Duration {
    start: String!
    end: String!
    step: Step!
}

enum Step {
    DAY
    HOUR
    MINUTE
    SECOND
}

返回结果定义

type MetricsValues {
    # Could be null if no label assigned in the query condition
    label: String
    # Values of this label value.
    values: IntValues
}

type IntValues {
    values: [KVInt!]!
}

type KVInt {
    id: ID!
    # This is the value, the caller must understand the Unit.
    # Such as:
    # 1. If ask for cpm metric, the unit and result should be count.
    # 2. If ask for response time (p99 or avg), the unit should be millisecond.
    value: Long!
}

使用 GraphQL IDEA 插件验证 Skywalking UI 的请求

使用“ GraphQLSkywalking 中的应用”一节中的方式,模仿“截取 Skywalking UI 发送的请求”一节中前端发送的请求

  • 请求模板
query queryData($condition: MetricsCondition!, $duration: Duration!) {
    readMetricsValues: readMetricsValues(duration: $duration, condition: $condition) {
        label values { values { id value }}
    }
}
  • 请求参数
{
  "duration": {
    "start": "2021-07-03 1400",
    "end": "2021-07-03 1401", 
    "step": "MINUTE"
  },
  "condition": {
    "name": "instance_jvm_thread_runnable_thread_count",
    "entity": {
      "scope": "ServiceInstance",
      "serviceName": "business-zone::projectA",
      "serviceInstanceName": "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
      "normal": true
    }
  }
}
  • 响应结果
{
  "data": {
    "readMetricsValues": {
      "values": {
        "values": [
          {
            "id": "202107031400_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
            "value": 22
          },
          {
            "id": "202107031401_YnVzaW5lc3Mtem9uZTo6cHJvamVjdEE=.1_ZThjZjM0YTFkNTRhNDA1OGE4Yzk4NTA1ODc3NzcwZTJAMTkyLjE2OC41MC4xMTM=",
            "value": 22
          }
        ]
      }
    }
  }
}
file
file

PS:如果不使用模板的方式,写查询语句是会有代码提示的

query queryData {
    readMetricsValues(
        duration: {start: "2021-07-03 1400",end: "2021-07-03 1401", step: MINUTE},
        condition: {
            name: "instance_jvm_thread_runnable_thread_count",
            entity: {
                scope: ServiceInstance,
                serviceName: "business-zone::projectA",
                serviceInstanceName: "e8cf34a1d54a4058a8c98505877770e2@192.168.50.113",
                normal: true
            }
        }
    ) {
        label values{ values{ id value }}
    }
}

如何将 GraphQL Schema 文件加载到程序中

搜索 metrics-v2.graphqls ,在 oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/GraphQLQueryProvider.java 找到加载代码

    // 初始化GraphQL引擎
    @Override
    public void prepare() throws ServiceNotProvidedException, ModuleStartException {
        GraphQLSchema schema = SchemaParser.newParser()
                                           // etc...
                                           .file("query-protocol/metrics-v2.graphqls")
                                           .resolvers(new MetricsQuery(getManager())) // MetricsQuery 是 com.coxautodev.graphql.tools.GraphQLQueryResolver 接口实现类
                                           // etc...
                                           .build()
                                           .makeExecutableSchema();
        this.graphQL = GraphQL.newGraphQL(schema).build();
    }

org.apache.skywalking.oap.query.graphql.resolver.MetricsQuery 类中,找到 readMetricsValues 方法

    /**
     * Read time-series values in the duration of required metrics
     */
    public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
        if (MetricsType.UNKNOWN.equals(typeOfMetrics(condition.getName())) || !condition.getEntity().isValid()) {
            final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
            MetricsValues values = new MetricsValues();
            pointOfTimes.forEach(pointOfTime -> {
                String id = pointOfTime.id(
                    condition.getEntity().isValid() ? condition.getEntity().buildId() : "ILLEGAL_ENTITY"
                );
                final KVInt kvInt = new KVInt();
                kvInt.setId(id);
                kvInt.setValue(0);
                values.getValues().addKVInt(kvInt);
            });
            return values;
        }
        return getMetricsQueryService().readMetricsValues(condition, duration);
    }

    private MetricsQueryService getMetricsQueryService() {
        if (metricsQueryService == null) {
            this.metricsQueryService = moduleManager.find(CoreModule.NAME)
                                                    .provider()
                                                    .getService(MetricsQueryService.class);
        }
        return metricsQueryService;
    }

org.apache.skywalking.oap.server.core.query.MetricsQueryService#readMetricsValues

    /**
     * Read time-series values in the duration of required metrics
     */
    public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration) throws IOException {
        return getMetricQueryDAO().readMetricsValues(
            condition, ValueColumnMetadata.INSTANCE.getValueCName(condition.getName()), duration);
    }

    private IMetricsQueryDAO getMetricQueryDAO() {
        if (metricQueryDAO == null) {
            metricQueryDAO = moduleManager.find(StorageModule.NAME).provider().getService(IMetricsQueryDAO.class);
        }
        return metricQueryDAO;
    }

查看Extend storage文档, IMetricsQueryDAO 为指标查询数据访问对象

# Implement all DAOs
# Here is the list of all DAO interfaces in storage
IServiceInventoryCacheDAO
IServiceInstanceInventoryCacheDAO
IEndpointInventoryCacheDAO
INetworkAddressInventoryCacheDAO
IBatchDAO
StorageDAO
IRegisterLockDAO
ITopologyQueryDAO
IMetricsQueryDAO
ITraceQueryDAO
IMetadataQueryDAO
IAggregationQueryDAO
IAlarmQueryDAO
IHistoryDeleteDAO
IMetricsDAO
IRecordDAO
IRegisterDAO
ILogQueryDAO
ITopNRecordsQueryDAO
IBrowserLogQueryDAO

通过类图,可以看出 IMetricsQueryDAO 实现类有 ESES7InfluxDBSQL 四种

file

如何将 GraphQL 引擎注册到 Jetty 服务

    // 注册GraphQL查询处理器至Jetty服务
    @Override
    public void start() throws ServiceNotProvidedException, ModuleStartException {
        JettyHandlerRegister service = getManager().find(CoreModule.NAME)
                                                   .provider()
                                                   .getService(JettyHandlerRegister.class);
        service.addHandler(new GraphQLQueryHandler(config.getPath(), graphQL));
    }

通过分析 GraphQLQueryProvider 该类,发现就是 QueryModule (查询模块)的 Provider (提供)类

由此,也验证了在“基本概述”一节的说法:

Skywalking 查询协议默认基于 GraphQL ,如果有需要也可以自定义扩展,提供一个实现了 org.apache.skywalking.oap.server.core.query.QueryModule 的查询模块即可。

    @Override
    public String name() {
        return "graphql";
    }

    @Override
    public Class<? extends ModuleDefine> module() {
        return QueryModule.class;
    }
package org.apache.skywalking.oap.query.graphql;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import graphql.ExecutionInput;
import graphql.ExecutionResult;
import graphql.GraphQL;
import graphql.GraphQLError;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.library.server.jetty.JettyJsonHandler;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RequiredArgsConstructor
public class GraphQLQueryHandler extends JettyJsonHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(GraphQLQueryHandler.class);

    private static final String QUERY = "query";
    private static final String VARIABLES = "variables";
    private static final String DATA = "data";
    private static final String ERRORS = "errors";
    private static final String MESSAGE = "message";

    private final Gson gson = new Gson();
    private final Type mapOfStringObjectType = new TypeToken<Map<String, Object>>() {
    }.getType();

    private final String path;

    private final GraphQL graphQL;

    @Override
    public String pathSpec() {
        return path;
    }

    @Override
    protected JsonElement doGet(HttpServletRequest req) {
        throw new UnsupportedOperationException("GraphQL only supports POST method");
    }

    @Override
    protected JsonElement doPost(HttpServletRequest req) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(req.getInputStream()));
        String line;
        StringBuilder request = new StringBuilder();
        while ((line = reader.readLine()) != null) {
            request.append(line);
        }

        JsonObject requestJson = gson.fromJson(request.toString(), JsonObject.class);

        return execute(requestJson.get(QUERY)
                                  .getAsString(), gson.fromJson(requestJson.get(VARIABLES), mapOfStringObjectType));
    }

    private JsonObject execute(String request, Map<String, Object> variables) {
        try {
            ExecutionInput executionInput = ExecutionInput.newExecutionInput()
                                                          .query(request)
                                                          .variables(variables)
                                                          .build();
            // 使用GraphQL引擎获取查询结果
            ExecutionResult executionResult = graphQL.execute(executionInput);
            LOGGER.debug("Execution result is {}", executionResult);
            // 封装返回结果
            Object data = executionResult.getData();
            List<GraphQLError> errors = executionResult.getErrors();
            JsonObject jsonObject = new JsonObject();
            if (data != null) {
                jsonObject.add(DATA, gson.fromJson(gson.toJson(data), JsonObject.class));
            }

            if (CollectionUtils.isNotEmpty(errors)) {
                JsonArray errorArray = new JsonArray();
                errors.forEach(error -> {
                    JsonObject errorJson = new JsonObject();
                    errorJson.addProperty(MESSAGE, error.getMessage());
                    errorArray.add(errorJson);
                });
                jsonObject.add(ERRORS, errorArray);
            }
            return jsonObject;
        } catch (final Throwable e) {
            LOGGER.error(e.getMessage(), e);
            JsonObject jsonObject = new JsonObject();
            JsonArray errorArray = new JsonArray();
            JsonObject errorJson = new JsonObject();
            errorJson.addProperty(MESSAGE, e.getMessage());
            errorArray.add(errorJson);
            jsonObject.add(ERRORS, errorArray);
            return jsonObject;
        }
    }
}

Webapp 网关转发 GraphQL 请求至 OAP

v8.6.0 及之前,网关都是 zuulv8.7.0 及之后替换成了 Spring Cloud Gateway 。因为这块不是这篇文章的重点,这里不再赘述

总结

Skywalking 的查询协议默认使用通用性很强的 GraphQL 实现,客户端可以通过 GraphQL 协议很方便的选取自己需要的数据。
对应 Skywalking 这种模式相对固定、变更不频繁的查询需求来说,还是挺适合的。

参考文档

分享并记录所学所见

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容