aggregator版本
VERSION = "0.0.4"
aggregator组件功能
集群聚合模块。聚合某集群下的所有机器的某个指标的值,提供一种集群视角的监控体验。
aggregator组件逻辑图
aggregator配置操作
main入口函数分析
func main() {
cfg := flag.String("c", "cfg.json", "configuration file")
version := flag.Bool("v", false, "show version")
help := flag.Bool("h", false, "help")
flag.Parse()
if *version {
fmt.Println(g.VERSION)
os.Exit(0)
}
if *help {
flag.Usage()
os.Exit(0)
}
//全局配置文件解析
g.ParseConfig(*cfg) 【参考详细分析】
//mysql client连接初始化
db.Init() 【参考详细分析】
//HTTP API服务监听与处理
go http.Start() 【参考详细分析】
//周期缓存与更新集群监控配置策略
go cron.UpdateItems() 【参考详细分析】
// sdk configuration
sender.Debug = g.Config().Debug
sender.PostPushUrl = g.Config().Api.PushApi
//发送集群聚合监控值至Transfer
//聚合与计算后的数据重新回到了Agent,继续后续的生命周期
sender.StartSender() 【参考详细分析】
//系统信息注册与资源释放
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
fmt.Println()
os.Exit(0)
}()
select {}
}
g.ParseConfig(*cfg) 全局配置解析(同其它组件,请参考其它组件分析)
// GlobalConfig is the top-level aggregator configuration; the struct tags give
// the JSON keys (presumably decoded by g.ParseConfig from the -c file).
type GlobalConfig struct {
Debug bool `json:"debug"` // enables [D]/debug logging, including the item SQL
Http *HttpConfig `json:"http"` // HTTP API server settings (Enabled, Listen)
Database *DatabaseConfig `json:"database"` // MySQL Addr/Idle, refresh Interval and id range Ids
Api *ApiConfig `json:"api"` // PlusApi/PlusApiToken, PushApi and Connect/RequestTimeout
}
db.Init() 数据库Client连接初始化
// Init opens the package-level DB handle for Database.Addr with the "mysql"
// driver, applies the configured idle-connection limit, and verifies
// connectivity with Ping. Any failure is fatal (log.Fatalln exits).
func Init() {
	handle, err := sql.Open("mysql", g.Config().Database.Addr)
	if err != nil {
		log.Fatalln("open db fail:", err)
	}
	DB = handle
	DB.SetMaxIdleConns(g.Config().Database.Idle)

	if err = DB.Ping(); err != nil {
		log.Fatalln("ping db fail:", err)
	}
}
http.Start() HTTP服务监听与处理
# HTTP路由初始化
// init registers this package's HTTP routes at program start, before Start
// begins serving.
func init() {
configCommonRoutes() // routes shared by all components; comparable to the HBS module's
configProcRoutes() // statistics API routes (e.g. /items)
}
# HTTP服务启动
// Start serves the aggregator HTTP API (routes registered in init on the
// default mux) on Http.Listen. It returns immediately when the server is
// disabled or no listen address is configured; otherwise it blocks, and a
// listener failure terminates the process via log.Fatalln.
func Start() {
	httpCfg := g.Config().Http
	if !httpCfg.Enabled {
		return
	}
	addr := httpCfg.Listen
	if addr == "" {
		return
	}

	s := &http.Server{
		Addr: addr,
		// Cap request headers at the library default (1 MiB) instead of
		// 1 GiB, so a single client cannot make the server buffer an
		// arbitrarily large header block.
		MaxHeaderBytes: http.DefaultMaxHeaderBytes,
		// NOTE(review): no ReadHeaderTimeout/ReadTimeout/WriteTimeout is set,
		// so slow clients can hold connections open; consider adding them.
	}
	log.Println("http listening", addr)
	log.Fatalln(s.ListenAndServe())
}
## 统计API "/items":查看当前集群监控配置项
// configProcRoutes registers the statistics endpoints on the default mux.
//
// /items lists the cluster items currently stored in the database, one per
// line (each item's String form); the order is unspecified because the items
// come from a map. A database error is answered with HTTP 500 so clients can
// tell it apart from a (possibly empty) listing.
func configProcRoutes() {
	http.HandleFunc("/items", func(w http.ResponseWriter, r *http.Request) {
		items, err := db.ReadClusterMonitorItems()
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		for _, item := range items {
			w.Write([]byte(item.String()))
			w.Write([]byte("\n"))
		}
	})
}
cron.UpdateItems() 周期缓存与更新集群监控配置策略
# 更新集群监控配置项,公开入口函数方法
// UpdateItems is the long-running refresh loop (started with `go` from main).
// It reconciles the worker set with the database, then sleeps for
// Database.Interval seconds, forever.
//
// A non-positive interval would make the loop re-query the database without
// pausing, so it falls back to 55 seconds, the configured default.
func UpdateItems() {
	const fallbackIntervalSec = 55
	for {
		updateItems()
		interval := g.Config().Database.Interval
		if interval <= 0 {
			interval = fallbackIntervalSec
		}
		time.Sleep(time.Duration(interval) * time.Second)
	}
}
## 更新集群监控配置项
// updateItems reconciles the running workers with the items in the
// database: workers whose key disappeared (item deleted or edited) are
// dropped, and items without a worker get one. If the read fails the worker
// set is left untouched; ReadClusterMonitorItems logs query failures itself.
func updateItems() {
	if items, err := db.ReadClusterMonitorItems(); err == nil {
		deleteNoUseWorker(items)
		createWorkerIfNeed(items)
	}
}
## 从DB加载aggregator配置项到内存
// ReadClusterMonitorItems loads the cluster aggregation items from the
// `cluster` table. When Database.Ids bounds are set (a value of -1 means
// "unbounded" on that side), only ids in that range are read, which lets
// several aggregator instances split the table.
//
// The map key is id concatenated with last_update, so a row whose
// last_update changes (presumably on every edit) gets a new key and hence a
// fresh worker. Rows that fail to scan are logged and skipped.
//
// A non-nil error is returned when the query fails or when row iteration
// stops early (rows.Err); callers must then not treat M as the complete set,
// otherwise live workers would be dropped.
func ReadClusterMonitorItems() (M map[string]*g.Cluster, err error) {
	M = make(map[string]*g.Cluster)
	query := "SELECT `id`, `grp_id`, `numerator`, `denominator`, `endpoint`, `metric`, `tags`, `ds_type`, `step`, `last_update` FROM `cluster`"

	cfg := g.Config()
	ids := cfg.Database.Ids
	if len(ids) != 2 {
		log.Fatalln("ids configuration error")
	}
	// Restrict to this instance's id range.
	switch lo, hi := ids[0], ids[1]; {
	case lo != -1 && hi != -1:
		query = fmt.Sprintf("%s WHERE `id` >= %d and `id` <= %d", query, lo, hi)
	case lo != -1:
		query = fmt.Sprintf("%s WHERE `id` >= %d", query, lo)
	case hi != -1:
		query = fmt.Sprintf("%s WHERE `id` <= %d", query, hi)
	}

	if cfg.Debug {
		log.Println(query)
	}

	rows, err := DB.Query(query)
	if err != nil {
		log.Println("[E]", err)
		return M, err
	}
	defer rows.Close()

	for rows.Next() {
		var c g.Cluster
		if scanErr := rows.Scan(&c.Id, &c.GroupId, &c.Numerator, &c.Denominator, &c.Endpoint, &c.Metric, &c.Tags, &c.DsType, &c.Step, &c.LastUpdate); scanErr != nil {
			// A single bad row is skipped; it must not fail the whole read.
			log.Println("[E]", scanErr)
			continue
		}
		M[fmt.Sprintf("%d%v", c.Id, c.LastUpdate)] = &c
	}
	// Next returns false both at the end and on error; distinguish them.
	if err = rows.Err(); err != nil {
		log.Println("[E]", err)
		return M, err
	}
	return M, nil
}
// Cluster is one cluster aggregation item: a row of the `cluster` table as
// scanned by ReadClusterMonitorItems.
type Cluster struct {
Id int64 // row id; also used for the id-range split between instances
GroupId int64 // host group whose hosts are aggregated (resolved via HostnamesByID)
Numerator string // numerator expression, e.g. "$(cpu.busy)", "$#" or a number
Denominator string // denominator expression, same syntax as Numerator
Endpoint string // endpoint the aggregated value is pushed under
Metric string // metric name of the aggregated value
Tags string // tags of the aggregated value
DsType string // counter type passed to sender.Push
Step int // reporting period in seconds: worker tick interval and pushed step
LastUpdate time.Time // last_update column; part of the worker key
}
// Workers holds one running Worker per cluster item, keyed like the map
// returned by ReadClusterMonitorItems (id + last_update).
// NOTE(review): no lock guards this map; as far as this file shows it is only
// touched from the UpdateItems loop. Keep it that way or add a mutex.
var Workers = map[string]Worker{}
## 如果监控配置已由用户取消或删除,将清理内存中的定时Worker
// deleteNoUseWorker stops and forgets every worker whose key is absent from
// m, i.e. items that were deleted, left this instance's id range, or were
// edited (a new last_update yields a new key).
func deleteNoUseWorker(m map[string]*g.Cluster) {
	// Deleting entries from the map being ranged over is permitted in Go.
	for key, worker := range Workers {
		if _, alive := m[key]; alive {
			continue
		}
		worker.Drop()
		delete(Workers, key)
	}
}
## 创建集群监控Worker
// createWorkerIfNeed creates and starts a worker for every item in m that does
// not have one yet. Items with Step <= 0 are skipped with a warning, because
// the worker's ticker period is Step seconds and time.NewTicker panics on a
// non-positive period.
func createWorkerIfNeed(m map[string]*g.Cluster) {
	for key, item := range m {
		if _, running := Workers[key]; running {
			continue
		}
		if item.Step <= 0 {
			log.Println("[W] invalid cluster(step <= 0):", item)
			continue
		}
		w := NewWorker(item)
		Workers[key] = w
		w.Start()
	}
}
// Worker drives the periodic evaluation of a single cluster item.
type Worker struct {
Ticker *time.Ticker // fires every ClusterItem.Step seconds
ClusterItem *g.Cluster // the item evaluated on each tick
Quit chan struct{} // closed by Drop to stop the worker goroutine
}
### 实例化Worker
// NewWorker builds a worker for ci. Its ticker starts ticking every ci.Step
// seconds right away, but nothing consumes it until Start is called. Quit is
// the stop signal closed by Drop.
func NewWorker(ci *g.Cluster) Worker {
	period := time.Duration(ci.Step) * time.Second
	return Worker{
		Ticker:      time.NewTicker(period),
		ClusterItem: ci,
		Quit:        make(chan struct{}),
	}
}
### 执行与启动Worker
// Start launches the worker goroutine. On every tick it runs WorkerRun for the
// item; once Quit is closed it logs the drop (in debug mode), stops the
// ticker and exits.
func (w Worker) Start() {
	go func() {
		defer w.Ticker.Stop()
		for {
			select {
			case <-w.Quit:
				if g.Config().Debug {
					log.Println("[I] drop worker", w.ClusterItem)
				}
				return
			case <-w.Ticker.C:
				WorkerRun(w.ClusterItem)
			}
		}
	}()
}

// Drop signals the worker goroutine to stop by closing Quit. Call it at most
// once per worker: closing an already-closed channel panics.
func (w Worker) Drop() {
	close(w.Quit)
}
WorkerRun(this.ClusterItem) 启动与执行集群监控
【核心逻辑分析】
// WorkerRun evaluates one cluster aggregation item once and, on success,
// pushes numerator/denominator for item.Endpoint/item.Metric via sender.Push.
//
// Each of numerator and denominator is one of:
//   - a counter expression containing "$(" (e.g. "$(cpu.busy)", "$(a)+$(b)",
//     "$(cpu.busy)>80"), evaluated per host of item.GroupId and summed;
//     a trailing comparison turns each host's value into 1 or 0;
//   - "$#", the number of hosts whose expressions evaluated successfully;
//   - a plain number.
//
// At least one side must be a counter expression. Only points from the last
// 2*item.Step seconds are used, and hosts missing any operand are skipped.
func WorkerRun(item *g.Cluster) {
	debug := g.Config().Debug

	// cleanParam strips spaces, line breaks and tabs before validation.
	numeratorStr := cleanParam(item.Numerator)
	denominatorStr := cleanParam(item.Denominator)
	if !expressionValid(numeratorStr) || !expressionValid(denominatorStr) {
		log.Println("[W] invalid numerator or denominator", item)
		return
	}

	// A side "needs compute" when it references counters ("$(").
	needComputeNumerator := needCompute(numeratorStr)
	needComputeDenominator := needCompute(denominatorStr)
	if !needComputeNumerator && !needComputeDenominator {
		log.Println("[W] no need compute", item)
		return
	}

	numeratorOperands, numeratorOperators, numeratorComputeMode := parse(numeratorStr, needComputeNumerator)
	denominatorOperands, denominatorOperators, denominatorComputeMode := parse(denominatorStr, needComputeDenominator)
	if !operatorsValid(numeratorOperators) || !operatorsValid(denominatorOperators) {
		log.Println("[W] operators invalid", item)
		return
	}

	hostnames, err := sdk.HostnamesByID(item.GroupId)
	if err != nil {
		// Surface API/token failures instead of silently skipping the item.
		log.Println("[E]", err, item)
		return
	}
	if len(hostnames) == 0 {
		if debug {
			log.Println("[W] no host in group, id:", item.Id)
		}
		return
	}

	now := time.Now().Unix()
	// Only points within two reporting periods count as current.
	valueMap, err := queryCounterLast(numeratorOperands, denominatorOperands, hostnames, now-2*int64(item.Step), now)
	if err != nil {
		log.Println("[E]", err, item)
		return
	}

	var numerator, denominator float64
	var validCount int
	for _, hostname := range hostnames {
		var numeratorVal, denominatorVal float64
		if needComputeNumerator {
			v, err := computeSideForHost("numerator", item.Numerator, numeratorOperands, numeratorOperators, numeratorComputeMode, hostname, valueMap, item.Id, debug)
			if err != nil {
				continue
			}
			numeratorVal = v
		}
		if needComputeDenominator {
			v, err := computeSideForHost("denominator", item.Denominator, denominatorOperands, denominatorOperators, denominatorComputeMode, hostname, valueMap, item.Id, debug)
			if err != nil {
				continue
			}
			denominatorVal = v
		}
		if debug {
			log.Printf("[D] hostname:%s numerator:%0.4f denominator:%0.4f per:%0.4f\n", hostname, numeratorVal, denominatorVal, numeratorVal/denominatorVal)
		}
		numerator += numeratorVal
		denominator += denominatorVal
		validCount++
	}

	// Sides that reference no counters are "$#" or a constant.
	if !needComputeNumerator {
		v, ok := constantSideValue(numeratorStr, validCount, item.Id)
		if !ok {
			return
		}
		numerator = v
	}
	if !needComputeDenominator {
		v, ok := constantSideValue(denominatorStr, validCount, item.Id)
		if !ok {
			return
		}
		denominator = v
	}

	// Check validCount first: when no host contributed, a computed denominator
	// is 0 as well, and "validCount == 0" is the accurate reason.
	if validCount == 0 {
		log.Println("[W] validCount == 0, id:", item.Id)
		return
	}
	if denominator == 0 {
		log.Println("[W] denominator == 0, id:", item.Id)
		return
	}
	if debug {
		log.Printf("[D] hostname:all numerator:%0.4f denominator:%0.4f per:%0.4f\n", numerator, denominator, numerator/denominator)
	}
	// Queue the ratio; the sender loop posts it asynchronously.
	sender.Push(item.Endpoint, item.Metric, item.Tags, numerator/denominator, item.DsType, int64(item.Step))
}

// computeSideForHost evaluates one side of an item for a single host. side
// ("numerator"/"denominator") and expr are used only in log messages. When
// debug is on, it logs the value or the error.
func computeSideForHost(side, expr string, operands, operators []string, computeMode, hostname string, valueMap map[string]float64, id int64, debug bool) (float64, error) {
	val, err := compute(operands, operators, computeMode, hostname, valueMap)
	if debug {
		if err != nil {
			log.Printf("[W] [hostname:%s] [%s:%s] id:%d, err:%v", hostname, side, expr, id, err)
		} else {
			log.Printf("[D] [hostname:%s] [%s:%s] id:%d, value:%0.4f", hostname, side, expr, id, val)
		}
	}
	return val, err
}

// constantSideValue resolves a side that references no counters. "$#" is the
// number of hosts that evaluated successfully; anything else must parse as a
// float. On a parse failure it logs the error and returns ok == false.
func constantSideValue(expr string, validCount int, id int64) (float64, bool) {
	if expr == "$#" {
		return float64(validCount), true
	}
	v, err := strconv.ParseFloat(expr, 64)
	if err != nil {
		log.Printf("[E] strconv.ParseFloat(%s) fail %v, id:%d", expr, err, id)
		return 0, false
	}
	return v, true
}
### 分片解析配置分子或分母计算公式
// parseSplitRe splits an expression on runs of '$', '(' and ')', leaving the
// counter names, the operators between them and the trailing comparison.
var parseSplitRe = regexp.MustCompile(`[\$\(\)]+`)

// parse breaks a validated expression into its operands (counter names), the
// +/- operators between them, and the trailing comparison computeMode (e.g.
// ">80"; empty when there is none). It returns zero values when needCompute
// is false or when the expression contains no '$', '(' or ')' at all.
//
// Example: "($(cpu.busy)+$(cpu.idle)-$(cpu.nice))>80" splits into
//
//	["", "cpu.busy", "+", "cpu.idle", "-", "cpu.nice", ">80"]
//
// giving operands [cpu.busy cpu.idle cpu.nice], operators [+ -] and
// computeMode ">80". For "$(cpu.busy)" the last piece is "" (no comparison).
func parse(expression string, needCompute bool) (operands []string, operators []string, computeMode string) {
	if !needCompute {
		return
	}
	items := parseSplitRe.Split(expression, -1)
	count := len(items)
	if count < 2 {
		// Nothing was split off; items[1:count-1] would panic.
		return
	}
	// Between the leading piece and the trailing comparison, operands sit at
	// even offsets and operators at odd offsets.
	for i, val := range items[1 : count-1] {
		if i%2 == 0 {
			operands = append(operands, val)
		} else {
			operators = append(operators, val)
		}
	}
	computeMode = items[count-1]
	return
}
### 通过API按主机组ID查询主机名列表
// HostnamesByID returns the hostnames of all hosts in host group group_id, as
// reported by GET {Api.PlusApi}/api/v1/hostgroup/{id}. On a request or
// decoding error it returns an empty (non-nil) slice together with the error.
//
// NOTE(review): unlike QueryLastPoints, no SetTimeout is applied to this
// request, so a hung API call blocks the calling worker; consider adding one.
func HostnamesByID(group_id int64) ([]string, error) {
	api := g.Config().Api
	uri := fmt.Sprintf("%s/api/v1/hostgroup/%d", api.PlusApi, group_id)
	req, err := requests.CurlPlus(uri, "GET", "aggregator", api.PlusApiToken,
		map[string]string{}, map[string]string{})
	if err != nil {
		return []string{}, err
	}

	type RESP struct {
		HostGroup f.HostGroup `json:"hostgroup"`
		Hosts     []f.Host    `json:"hosts"`
	}
	// Decode into a value rather than a pointer. With a pointer-to-pointer
	// target, a JSON `null` body resets the pointer to nil (assuming ToJson
	// uses encoding/json), and resp.Hosts would then panic.
	var resp RESP
	if err := req.ToJson(&resp); err != nil {
		return []string{}, err
	}

	hosts := make([]string, 0, len(resp.Hosts))
	for _, h := range resp.Hosts {
		hosts = append(hosts, h.Hostname)
	}
	return hosts, nil
}
### 查询最后一次采集点数据
// queryCounterLast fetches the latest point of every numerator and
// denominator counter for every host and returns the values keyed by
// endpoint+counter. Points whose timestamp falls outside [begin, end] are
// treated as stale and omitted.
//
// NOTE(review): the key is a plain concatenation, so "host1"+"cpu" and
// "host"+"1cpu" collide; queryOperands relies on the same scheme. If
// GraphLastResp.Value is a pointer, a host without data could make it nil and
// the timestamp access would panic. Confirm against the cmodel definition.
func queryCounterLast(numeratorOperands, denominatorOperands, hostnames []string, begin, end int64) (map[string]float64, error) {
	counters := make([]string, 0, len(numeratorOperands)+len(denominatorOperands))
	counters = append(append(counters, numeratorOperands...), denominatorOperands...)

	points, err := sdk.QueryLastPoints(hostnames, counters)
	if err != nil {
		return map[string]float64{}, err
	}

	latest := make(map[string]float64, len(points))
	for _, p := range points {
		ts := p.Value.Timestamp
		if ts >= begin && ts <= end {
			latest[p.Endpoint+p.Counter] = float64(p.Value.Value)
		}
	}
	return latest, nil
}
#### API查询最后一次采集点数据
// QueryLastPoints asks POST {Api.PlusApi}/api/v1/graph/lastpoint for the
// latest point of every endpoint x counter pair. It applies the configured
// connect/request timeouts (milliseconds) and decodes the JSON reply into resp.
func QueryLastPoints(endpoints, counters []string) (resp []*cmodel.GraphLastResp, err error) {
	cfg := g.Config()
	uri := fmt.Sprintf("%s/api/v1/graph/lastpoint", cfg.Api.PlusApi)

	var req *httplib.BeegoHttpRequest
	headers := map[string]string{"Content-type": "application/json"}
	if req, err = requests.CurlPlus(uri, "POST", "aggregator", cfg.Api.PlusApiToken,
		headers, map[string]string{}); err != nil {
		return
	}
	req.SetTimeout(
		time.Duration(cfg.Api.ConnectTimeout)*time.Millisecond,
		time.Duration(cfg.Api.RequestTimeout)*time.Millisecond,
	)

	// One query parameter per (endpoint, counter) combination.
	params := make([]*cmodel.GraphLastParam, 0, len(endpoints)*len(counters))
	for _, endpoint := range endpoints {
		for _, counter := range counters {
			params = append(params, &cmodel.GraphLastParam{endpoint, counter})
		}
	}
	payload, err := json.Marshal(params)
	if err != nil {
		return
	}
	req.Body(payload)

	if err = req.ToJson(&resp); err != nil {
		return
	}
	return resp, nil
}
### 计算公式结果
// compute evaluates one parsed expression for hostname.
//
// The operand values are looked up in valMap (keyed by hostname+counter) and
// combined left to right: operator "+" adds and any other operator subtracts
// (operatorsValid is expected to allow only "+" and "-"). With a non-empty
// computeMode (e.g. ">80") the result is 1 when the sum satisfies the
// comparison and 0 otherwise. With an empty computeMode the sum is returned.
//
// Errors: "counter not found" when there are no operands, and "value invalid"
// when some operand has no value for this host.
func compute(operands []string, operators []string, computeMode string, hostname string, valMap map[string]float64) (val float64, err error) {
	if len(operands) == 0 {
		return 0, errors.New("counter not found")
	}
	vals := queryOperands(operands, hostname, valMap)
	if len(vals) != len(operands) {
		return 0, errors.New("value invalid")
	}

	total := vals[0]
	for idx := 1; idx < len(vals); idx++ {
		switch operators[idx-1] {
		case "+":
			total += vals[idx]
		default:
			total -= vals[idx]
		}
	}

	if computeMode == "" {
		return total, nil
	}
	if compareSum(total, computeMode) {
		return 1, nil
	}
	return 0, nil
}
### 计算模式字串解析和逻辑运算,返回真/假结果
// compareModeRe captures the comparison operator and threshold of a compute
// mode such as ">80" or "<=0.5".
var compareModeRe = regexp.MustCompile(`([><=]+)([\d\.]+)`)

// compareSum reports whether sum satisfies the comparison in computeMode.
// Supported operators are ">", "<", "=", ">=" and "<=". A computeMode that
// does not contain an operator followed by a valid number, or that uses any
// other operator, yields false instead of panicking.
func compareSum(sum float64, computeMode string) bool {
	match := compareModeRe.FindStringSubmatch(computeMode)
	if len(match) != 3 {
		return false
	}
	threshold, err := strconv.ParseFloat(match[2], 64)
	if err != nil {
		return false
	}
	switch match[1] {
	case ">":
		return sum > threshold
	case "<":
		return sum < threshold
	case "=":
		return sum == threshold
	case ">=":
		return sum >= threshold
	case "<=":
		return sum <= threshold
	}
	return false
}
### 依据主机名和操作数名称,查询缓存map的监控采集值(原API获取缓存)
// queryOperands returns, in order, the values stored in valMap under
// endpoint+counter for each counter. Counters without an entry are skipped,
// so a shorter result signals missing data (compute relies on this). The
// result is never nil.
func queryOperands(counters []string, endpoint string, valMap map[string]float64) []float64 {
	found := make([]float64, 0, len(counters))
	for i := range counters {
		v, ok := valMap[endpoint+counters[i]]
		if !ok {
			continue
		}
		found = append(found, v)
	}
	return found
}
### 分子/分母表达式合法性校验
// expressionPatterns are the accepted numerator/denominator shapes besides
// "$#", compiled once.
var expressionPatterns = []*regexp.Regexp{
	// e.g. $(cpu.busy)  or  $(cpu.busy)+$(cpu.idle)-$(cpu.nice)
	regexp.MustCompile(`^(\$\([^\(\)]+\)[+-])*\$\([^\(\)]+\)$`),
	// e.g. $(cpu.busy)>=80
	regexp.MustCompile(`^\$\([^\(\)]+\)(>|=|<|>=|<=)\d+(\.\d+)?$`),
	// e.g. ($(cpu.busy)+$(cpu.idle)-$(cpu.nice))>80
	regexp.MustCompile(`^\((\$\([^\(\)]+\)[+-])*\$\([^\(\)]+\)\)(>|=|<|>=|<=)\d+(\.\d+)?$`),
	// e.g. 100 or 0.5: a plain non-negative number
	regexp.MustCompile(`^\d+(\.\d+)?$`),
}

// expressionValid reports whether val (already stripped of whitespace) is an
// accepted numerator/denominator expression: "$#", a +/- chain of $(counter)
// terms, a single $(counter) or a parenthesised chain followed by a
// comparison with a number, or a plain non-negative number.
func expressionValid(val string) bool {
	// Reject full-width parentheses U+FF08/U+FF09, a common typo when typing
	// with a Chinese input method. ASCII parentheses are part of the $(...)
	// syntax and must not be rejected here.
	if strings.ContainsAny(val, "\uff08\uff09") {
		return false
	}
	if val == "$#" {
		return true
	}
	for _, re := range expressionPatterns {
		if re.MatchString(val) {
			return true
		}
	}
	return false
}
## 结构化数据并放入队列顶部
func Push(endpoint, metric, tags string, val interface{}, counterType string, step_and_ts ...int64) {
md := MakeMetaData(endpoint, metric, tags, val, counterType, step_and_ts...) //格式化元数据
MetaDataQueue.PushFront(md) //入队列
}
### 格式化元数据
// MakeMetaData builds the JsonMetaData for one sample.
//
// step_and_ts optionally carries the step (seconds) and the Unix timestamp:
//   - none: step 60, timestamp now
//   - one: step_and_ts[0] as step, timestamp now
//   - two or more: step_and_ts[0] as step, step_and_ts[1] as timestamp;
//     any extra values are ignored. Step and Timestamp are never left at 0.
func MakeMetaData(endpoint, metric, tags string, val interface{}, counterType string, step_and_ts ...int64) *model.JsonMetaData {
	md := model.JsonMetaData{
		Endpoint:    endpoint,
		Metric:      metric,
		Tags:        tags,
		Value:       val,
		CounterType: counterType,
	}
	switch len(step_and_ts) {
	case 0:
		md.Step = 60
		md.Timestamp = time.Now().Unix()
	case 1:
		md.Step = step_and_ts[0]
		md.Timestamp = time.Now().Unix()
	default:
		md.Step = step_and_ts[0]
		md.Timestamp = step_and_ts[1]
	}
	return &md
}
sender.StartSender() 周期性从队列获取集群聚合值并通过Agent HTTP API POST Push到Transfer。
sender.PostPushUrl = g.Config().Api.PushApi
// StartSender launches the background sender loop and returns immediately.
func StartSender() {
	go startSender()
}

// startSender loops forever. It pops a batch (presumably at most LIMIT
// samples) from the back of MetaDataQueue and posts it with PostPush,
// sleeping 200ms whenever the queue is empty. A failed post is logged and
// that batch is not retried.
func startSender() {
	for {
		batch := MetaDataQueue.PopBack(LIMIT)
		if len(batch) > 0 {
			if err := PostPush(batch); err != nil {
				log.Println("[E] push to transfer fail", err)
			}
			continue
		}
		time.Sleep(200 * time.Millisecond)
	}
}
// PostPush sends a batch of samples as a JSON array to PostPushUrl (set in
// main from Api.PushApi; presumably the Agent's :1988 push API, which forwards
// to Transfer, so aggregated values re-enter the normal data pipeline).
//
// It returns an error when encoding or the request fails, when the body
// cannot be read, or when the status is not 200. In Debug mode the response
// body is logged.
// NOTE(review): http.Post uses http.DefaultClient, which has no timeout, so a
// hung endpoint stalls the sender loop indefinitely.
func PostPush(L []*model.JsonMetaData) error {
	payload, err := json.Marshal(L)
	if err != nil {
		return err
	}

	resp, err := http.Post(PostPushUrl, "application/json", bytes.NewBuffer(payload))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	raw, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	content := string(raw)

	if resp.StatusCode == 200 {
		if Debug {
			log.Println("[D] response:", content)
		}
		return nil
	}
	return fmt.Errorf("status code %d != 200, response: %s", resp.StatusCode, content)
}
技术经验借鉴
- 计算公式字串规则正则处理与实现
- 通过数据库内ID字段区间来做多实例处理数据