使用Prometheus

动机

最近需要将流程的耗时接入可观测系统,方便后续优化性能。为了统一技术栈,决定使用prometheus来接入。特别的是,我需要使用自定义的时间戳,由于PushGateway不支持自定义的时间戳,只能使用 pull 的方式了。

整体流程

1、构建 Collector
2、注册到 Register(也实现了 Gatherer 接口),其中会调用 CollectorDescribe方法。
3、通过http请求到 /metrics,使用 promhttp.Handler 来处理请求。
4、调用 RegisterGather方法,其中会调用 注册的CollectorCollect方法。
5、将 Metric 按名称分组为 dto.MetricFamily,编码后写入 ResponseWriter,返回给请求端。

实现细节

实现 Collector

type timestampedCollector struct {
    bufferChan chan prometheus.Metric
    desc       *prometheus.Desc
}

func newTimestampedCollector(bufferSize int, desc *prometheus.Desc) *timestampedCollector {
    metrics := make(chan prometheus.Metric, bufferSize)
    return &timestampedCollector{
        bufferChan: metrics,
        desc:       desc,
    }
}

func (t *timestampedCollector) add(m prometheus.Metric) {
    logger.Infof("add metric:%s", util.ToJson(m.Desc()))
    t.bufferChan <- m
}

func (t *timestampedCollector) Describe(c chan<- *prometheus.Desc) {
    c <- t.desc
}

func (t *timestampedCollector) Collect(c chan<- prometheus.Metric) {
    for {
        select {
        case m := <-t.bufferChan:
            c <- m
        case <-time.After(time.Second * 1):
            return
        }
    }
}

prometheus中有2种Collector,一种是uncheckedCollectors,还有一种是checkedCollectors,就是看你实现的CollectorDescribe方法中有没有添加prometheus.Desc

注册到Registry

构建prometheus.Desc

var totalDesc = prometheus.NewDesc("emr_exclude_bootstrap_duration_seconds",
    "流程去掉执行引导操作的时间",
    []string{"service", "region", "nodeCnt", "flowId"},
    nil)

NewDesc生成 prometheus.Desc的id,总的来说就是(fqName + constLabels的值列表) 合并起来的string的hash值

func (v2) NewDesc(fqName, help string, variableLabels ConstrainableLabels, constLabels Labels) *Desc {
        // 构建 Desc
        ......
        labelValues := make([]string, 1, len(constLabels)+1)
    labelValues[0] = fqName
        for _, labelName := range labelNames {
        labelValues = append(labelValues, constLabels[labelName])
    }
        xxh := xxhash.New()
    for _, val := range labelValues {
        xxh.WriteString(val)
        xxh.Write(separatorByteSlice)
    }
    d.id = xxh.Sum64() // 生成 Desc 的 id
}

NewDesc生成 prometheus.Desc的dimHash,总的来说就是(fqName + constLabels的值列表) 合并起来的string的hash值

func (v2) NewDesc(fqName, help string, variableLabels ConstrainableLabels, constLabels Labels) *Desc {
        // 构建 Desc
        ......
        labelNames := make([]string, 0, len(constLabels)+len(d.variableLabels.names))
        for labelName := range constLabels {
        labelNames = append(labelNames, labelName)
    }
      for _, label := range variableLabels {
        labelNames = append(labelNames, "$"+label)
    }
        xxh := xxhash.New()
    xxh.WriteString(help)
    xxh.Write(separatorByteSlice)
    for _, labelName := range labelNames {
        xxh.WriteString(labelName)
        xxh.Write(separatorByteSlice)
    }
    d.dimHash = xxh.Sum64()
}

注册到prometheus

totalCollector = newTimestampedCollector(1000, totalDesc)
if err := handleErr(prometheus.Register(totalCollector)); err != nil {
    panic(err)
}

伪代码如下

func (r *Registry) Register(c Collector) error {
        c.Describe(descChan)
        close(descChan)
        for desc := range descChan {
            1、desc.id不能重复
            2、desc.dimHash + desc.fqName 不能重复
        }
}

使用 promhttp.Handler 来处理请求

func main() {
    http.Handle("/metrics", promhttp.Handler())
        // 暴露自己的指标
    http.ListenAndServe(":13000", nil)
}

使用curl -XGET 'http://localhost:13000/metrics'来验证metric是否暴露成功。其中 http handler的链表结构为:

InstrumentHandlerCounter -- HandlerForTransactional内部方法

暴露指标

promhttp.Handler最核心的就是调用 RegistryGather方法,其中会调用注册的CollectorCollect方法来将指标拉取出来,接着主要是2步
1、聚合

MetricFamily 按照Metric fqName分组,相同Metric fqNameMetric会放入到 MetricFamily 的集合中

2、校验

1、指标名称 + lable的名字 + label的值 + 时间戳 不能重复
2、label不能重复

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容