阿里云TimeStream系列--TimeStream聚合查询优化

ES在支持时序的典型查询,会遇到性能差,内存占用高的问题,针对时序场景典型查询,TimeStream增加了一个聚合算子time_series_aggregation以支持时序场景的典型查询,该聚合算子也成为TimeStream支持promQL的基础。

下面先来看下这个典型查询。

TSDB场景的查询,一般是先对时间线在一个时间窗口进行Downsampling计算(avg/max/min/sum/count/last/first),再对这些时间线进行聚合操作(Aggregator),比如按一些维度聚合后,求sum、max、avg。

image.png

比如以上图一个框的点数据为例,正确的做法是,先对一条线上的点进行Downsampling,再对时间线之间进行Aggregator,是一个两阶段操作。而ES目前对bucket的处理,只能把一个框的所有点,当成一样的数据做一次agg处理。

ES之前没有时间线id字段,无法完成对时间线间聚合的需求。ES TSDB功能可以自动生成_tsid用作时间线id,可以支持该需求。但是需要借助pipeline aggration实现,性能非常差。

这里再以具体数据说明下这个场景,假设一个分桶内有如下数据:

image.png

正确的做法是相对时间线做一次Downsampling处理,将一个分桶内的时刻点处理成一个点:

  • avg:a' = (a1+a2+a3)/3
  • first: a' = a1
  • last: a' = a3

然后再对downsample值进行聚合计算:

  • sum = a' + b' + c'
  • max = max(a', b', c')

但是ES agg算子只能对分桶数据做一次计算,比如sum的计算方式是:a1+a2+a3+b1+b2+b3+c1+c2+c3。

如果ES要支持这种功能,必须结合pipeline agg功能,这个会导致严重的查询问题。

比如上述的典型查询,用ES DSL描述为:

{
  "size": 0,
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "from": xxx,
              "to": xxx
            }
          }
        },
        {
          // other query conditions
        }
      ]
    }
  },
  "aggregations": {
    "group": {
      "multi_terms": {
        "terms": [
          {
            "field": "dim1"
          },
          {
            "field": "dim2"
          },
          ......
        ]
      },
      "aggregations": {
        "time_bucket": {
          "date_histogram": {
            "field": "@timestamp",
            "interval": "1m"
          },
          "aggregations": {
            "tsid": {
              "time_series": {},
              "aggs": {
                "avg_value": {
                  "avg": {
                    "field": "metric"
                  }
                }
              }
            },
            "sum_value": {
              "sum_bucket": {
                "buckets_path": "tsid>avg_value"
              }
            }
          }
        }
      }
    }
  }
}

此DSL需要将时间线的全部数据,在协调节点进行聚合,才能进行下一步的multi_terms运算。

time_series_aggregation则是优化了这个场景,针对上面的DSL,使用time_series_aggregation的DSL语句如下:

"time_series_aggregation" : {
  "metric" : "xxx",
  "group" : [],
  "without" : [],
  "interval" : "10m",
  "offset" : "",
  "downsample" : {
    "range" : "10m",
    "function" : "sum"
  },
  "aggregator" : ""
}

time_series_aggregation主要思路来自于目前time_series collect数据时,为了保证按照indexing sort,在shard级别顺序遍历数据,ES实现了TimeSeriesIndexSearcher。

TimeSeriesIndexSearcher的思路类似归并排序,只是每个segment已经是排好序,省去了第一阶段。TimeSeriesIndexSearcher一次遍历全部的segment,然后比较每个segment的_tsid和@timestamp,按顺序取出最新的doc。

TimeSeriesIndexSearcher在遍历doc时已经从docvalue中获取了_tsid和@timestamp。所以在collect中,可以通过AggregationExecutionContext直接获取_tsid和@timestamp值,而不用再次从docvalue中获取。

然后_tsid是将全部维度的tagk,tagv序列化的值,所以反序列化_tsid可以获得时间线的全部维度。对time_series进行group by的时候,不用再去docvalue中获取其他列,可以直接从反序列化_tsid的结果中过滤。

然后由于是顺序遍历,所以在collect阶段遍历完一个_tsid,就可以直接进行downsample计算,然后将结果传递给aggregator算子,而不需要保留原始数据,这是不按_tsid做indexing sort做不到的。

这里要注意的地方就是虽然一个索引的_tsid只会在routing在一个shard中,但是多个索引是会出现重复_tsid,比如一次rollover后,_tsid就出现在前后2个索引中了。所以如果一个interval间隔跨了多个索引,这时候只能退化,datanode保留全部原始数据,在协调节点进行跟datanode一样的计算流程。这里time_series因为支持了start_time和end_time的时间分区,所以在进行分桶时,可以判断bucket是否会跨索引,如果bucket时间范围在start_time和end_time内,就能直接计算,如果跨了start_time和end_time,就保留tsid的原始数据,在协调节点计算。

性能优势

time_series_aggregation相比目前ES使用的aggs获取同样数据,性能优势主要来自于两点:

  • time_series_aggregation 直接从_tsid解析terms,而multi_terms需要从每个term列获取数据,节省了大量IO开销,同时获取docvalue的计算开销也少了。
  • time_series_aggregation可以顺序处理_tsid,所以除了跨索引的时间窗口,其他窗口的数据都能在datanode的collect阶段直接完成计算,而无需像pipeling agg那样,需要把全部的_tsid数据汇聚在协调节点计算。减少了大量内存,以及datanode build完整_tsid列表的过程。
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容