open-falcon 聚合器aggregator代码解析

总结:aggregator聚合器就是从falcon_portal.cluster表中取出用户在页面上配置的表达式,然后解析后,通过api拿到对应机器组的所有机器,通过api查询graph数据算出一个值重新打回transfer作为一个新的点。

  • 定时从db中拿出所有的聚合器配置放到一个map中
  • 第一次启动时遍历聚合器map生成workers map 这两个map的key都是id+updatetime
  • 同时下一次拿出db生成map 对workers这个map进行增量更新 和删除操作删除是通过 worker.Quit chan通信的
  • workers这个map 通过 ticker跑cron 运行WorkerRun这个方法
  • WorkerRun这个方法解析分子分母的配置
  • 调用api 根据grp_id拿出所有机器列表
  • 调用graph的last接口拿出所有endpoint的counter 的值然后进行计算
  • 计算后重新打回 一个线程安全的双向链表队列
  • 另外一个goroutine异步pop队列中的值发生给 transfer的http接口(不是给agent用的rpc接口)
  • 机器量很多时获取机器列表和查询最新的值都是瓶颈
  • 我在想如果直接在transfer中直接做数据的聚合速度上不存在瓶颈

下面我们来看下代码:

  1. main.go中核心的两个地方
    //查询db 调api算值 push 到push的队列中
    go cron.UpdateItems()
    //从push队列push到transfer
    sender.StartSender()

2.看下go cron.UpdateItems()

func updateItems() {
        //从db中查询出结果
    items, err := db.ReadClusterMonitorItems()
    if err != nil {
        return
    }
        //对比key(id+uptime),将已经变更的项删除 
    deleteNoUseWorker(items)
    //启动新的worker
    createWorkerIfNeed(items)
}
//看下这个读db的func
func ReadClusterMonitorItems() (M map[string]*g.Cluster, err error){
   ......
   /*看到这个funcreturn的是个map key是 每个聚合项的id和他更新时间的字符串
   value 就是Cluster结构体指针
   type Cluster struct {
    Id          int64
    GroupId     int64
    Numerator   string
    Denominator string
    Endpoint    string
    Metric      string
    Tags        string
    DsType      string
    Step        int
    LastUpdate  time.Time
   }
   */
   M[fmt.Sprintf("%d%v", c.Id, c.LastUpdate)] = &c
   return M, err
}

3.看下 deleteNoUseWorker 和createWorkerIfNeed 这两个func都是围绕 Worker这个struct的进行增删

func deleteNoUseWorker(m map[string]*g.Cluster) {
    del := []string{}
    for key, worker := range Workers {
            //遍历已经创建的work,如果key在新的map中没有了说明这条记录在db中被更改或删除了
        //所以删掉它 给Workers这个map缩容
        if _, ok := m[key]; !ok {
               //将worker 中的Quit chan关闭 会调用ticker.stop 真正关闭 
            worker.Drop()
            del = append(del, key)
        }
    }

    for _, key := range del {
        delete(Workers, key)
    }
}

func createWorkerIfNeed(m map[string]*g.Cluster) {
 
    for key, item := range m {
        if _, ok := Workers[key]; !ok {
                //如果配置中step小于0 丢弃这条
            if item.Step <= 0 {
                log.Println("[W] invalid cluster(step <= 0):", item)
                continue
            }
                        //初始化worker     
            worker := NewWorker(item)
            Workers[key] = worker
            worker.Start()
        }
    }
}

4. 看下Worker这个结构体包含三个域

  • ticker作为一个计时器实现类似cron的功能每隔一段时间执行一次Start 中的func
  • ClusterItem作为每个聚合器的配置
  • Quit是一个chan用来外部关闭 key在新的map中没有了说明这条记录在db中被更改或删除了
type Worker struct {
    Ticker      *time.Ticker
    ClusterItem *g.Cluster
    Quit        chan struct{}
}

func NewWorker(ci *g.Cluster) Worker {
    w := Worker{}
    w.Ticker = time.NewTicker(time.Duration(ci.Step) * time.Second)
    w.Quit = make(chan struct{})
    w.ClusterItem = ci
    return w
}

func (this Worker) Start() {
    go func() {
        for {
            select {
            case <-this.Ticker.C:
                WorkerRun(this.ClusterItem)
            case <-this.Quit:
                if g.Config().Debug {
                    log.Println("[I] drop worker", this.ClusterItem)
                }
                this.Ticker.Stop()
                return
            }
        }
    }()
}

func (this Worker) Drop() {
    close(this.Quit)
}

var Workers = make(map[string]Worker)

到这里我们已经看明白聚合器的流程了:

  • 定时从db中拿出所有的聚合器配置放到一个map中
  • 第一次启动时遍历聚合器map生成workers map 这两个map的key都是id+updatetime
  • 同时下一次拿出db生成map 对workers这个map进行增量更新 和删除操作删除是通过 worker.Quit chan通信的
  • workers这个map 通过 ticker跑cron 运行WorkerRun这个方法

5.下面看下最重要的方法 WorkerRun

func WorkerRun(item *g.Cluster) {
    debug := g.Config().Debug
    /*
    Numerator代表分子    例如 $(cpu.user)+$(cpu.system) 代表求cpu.user和cpu.system的和
    Denominator代表分母  例如 $# 代表所有机器
    */
        //cleanParam去除\r等字符
    numeratorStr := cleanParam(item.Numerator)
    denominatorStr := cleanParam(item.Denominator)
        //判断分子分母是否合法
    if !expressionValid(numeratorStr) || !expressionValid(denominatorStr) {
        log.Println("[W] invalid numerator or denominator", item)
        return
    }
        //判断分子分母是否需要计算
    needComputeNumerator := needCompute(numeratorStr)
    needComputeDenominator := needCompute(denominatorStr)
    //如果分子分母都不需要计算就不需要用到聚合器了
    if !needComputeNumerator && !needComputeDenominator {
        log.Println("[W] no need compute", item)
        return
    }
        //比如分子是这样的: "($(cpu.busy)+$(cpu.idle)-$(cpu.nice))>80"
    //那么parse的返回值为 [cpu.busy cpu.idle cpu.nice] [+ -] >80
    numeratorOperands, numeratorOperators, numeratorComputeMode := parse(numeratorStr, needComputeNumerator)
    denominatorOperands, denominatorOperators, denominatorComputeMode := parse(denominatorStr, needComputeDenominator)

    if !operatorsValid(numeratorOperators) || !operatorsValid(denominatorOperators) {
        log.Println("[W] operators invalid", item)
        return
    }
    /*add retry for gethostname bygid
    这里源码是动过sdk根据group_id查找组里面机器列表
    这里我进行了两点优化:
    1.sdk调用时没有加重试,http失败导致这次没有get到机器所以这个点就不算了导致断点
    2.原来的接口在机器量超过1k时就效率就会很慢 2w+机器需要8s,看了代码是用orm进行了多次查询而且附带了很多别的信息
    这里我只需要group_id对应endpoint_list所以我写了一个新的接口用一条raw_sql进行查询
    测试2w+的机器0.2s就能返回
    */
    retry_limit :=3
    r_s :=0
    var hostnames []string
    for r_s <retry_limit{
        hostnames_tmp, err_tmp := sdk.HostnamesByID(item.GroupId)
        if err_tmp != nil {
            log.Println("[E] get hostlist err",err_tmp)
            r_s+=1
            time.Sleep(time.Second)
        }else{
            hostnames = hostnames_tmp
            break
        }
    }
    //没有机器当然不用算了
    if len(hostnames)==0{
        log.Println("[E] get 0 record hostname item:",item)
        return
    }

    now := time.Now().Unix()

    /*这里是调用graph/lastpoint这个api 查询最近一个点的数据
    1.机器是上面查到的主机列表
    2.counter这里做了合并 把所有要查的metirc都放在一个请求里面查询了
    3.查询的时候在api那边做了for循环 逐个item查询 估计这里也会拖慢速度
    4.查完之后计算下值推到发送队列
    */
    valueMap, err := queryCounterLast(numeratorOperands, denominatorOperands, hostnames, now-int64(item.Step*2), now)
    if err != nil {
        log.Println("[E] get queryCounterLast", err, item)
        return
    }

    ..........
    sender.Push(item.Endpoint, item.Metric, item.Tags, numerator/denominator, item.DsType, int64(item.Step))
}

6.最后看下发送的代码

  • MetaDataQueue是个线程安全的双向链表
  • 上面说的WorkerRun方法中会将转化好的监控项数据PushFront入链表
  • startSender这个goroutine 每200毫秒会将队列中的数据取出发送到transfer的http接口
func Push(endpoint, metric, tags string, val interface{}, counterType string, step_and_ts ...int64) {
    md := MakeMetaData(endpoint, metric, tags, val, counterType, step_and_ts...)
    MetaDataQueue.PushFront(md)
}

const LIMIT = 200

var MetaDataQueue = NewSafeLinkedList()
var PostPushUrl string
var Debug bool

func StartSender() {
    go startSender()
}

func startSender() {
    for {
        L := MetaDataQueue.PopBack(LIMIT)
        if len(L) == 0 {
            time.Sleep(time.Millisecond * 200)
            continue
        }

        err := PostPush(L)
        if err != nil {
            log.Println("[E] push to transfer fail", err)
        }
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,470评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,393评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,577评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,176评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,189评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,155评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,041评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,903评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,319评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,539评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,703评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,417评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,013评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,664评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,818评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,711评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,601评论 2 353