ES在支持时序的典型查询,会遇到性能差,内存占用高的问题,针对时序场景典型查询,TimeStream增加了一个聚合算子time_series_aggregation以支持时序场景的典型查询,该聚合算子也成为TimeStream支持promQL的基础。
下面先来看下这个典型查询。
TSDB场景的查询,一般是先对时间线在一个时间窗口进行Downsampling计算(avg/max/min/sum/count/last/first),再对这些时间线进行聚合操作(Aggregator),比如按一些维度聚合后,求sum、max、avg。
比如以上图一个框的点数据为例,正确的做法是,先对一条线上的点进行Downsampling,再对时间线之间进行Aggregator,是一个两阶段操作。而ES目前对bucket的处理,只能把一个框的所有点,当成一样的数据做一次agg处理。
ES之前没有时间线id字段,无法完成对时间线间聚合的需求。ES TSDB功能可以自动生成_tsid用作时间线id,可以支持该需求。但是需要借助pipeline aggration实现,性能非常差。
这里再以具体数据说明下这个场景,假设一个分桶内有如下数据:
正确的做法是相对时间线做一次Downsampling处理,将一个分桶内的时刻点处理成一个点:
- avg:a' = (a1+a2+a3)/3
- first: a' = a1
- last: a' = a3
然后再对downsample值进行聚合计算:
- sum = a' + b' + c'
- max = max(a', b', c')
但是ES agg算子只能对分桶数据做一次计算,比如sum的计算方式是:a1+a2+a3+b1+b2+b3+c1+c2+c3。
如果ES要支持这种功能,必须结合pipeline agg功能,这个会导致严重的查询问题。
比如上述的典型查询,用ES DSL描述为:
{
"size": 0,
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"from": xxx,
"to": xxx
}
}
},
{
// other query conditions
}
]
}
},
"aggregations": {
"group": {
"multi_terms": {
"terms": [
{
"field": "dim1"
},
{
"field": "dim2"
},
......
]
},
"aggregations": {
"time_bucket": {
"date_histogram": {
"field": "@timestamp",
"interval": "1m"
},
"aggregations": {
"tsid": {
"time_series": {},
"aggs": {
"avg_value": {
"avg": {
"field": "metric"
}
}
}
},
"sum_value": {
"sum_bucket": {
"buckets_path": "tsid>avg_value"
}
}
}
}
}
}
}
}
此DSL需要将时间线的全部数据,在协调节点进行聚合,才能进行下一步的multi_terms运算。
time_series_aggregation则是优化了这个场景,针对上面的DSL,使用time_series_aggregation的DSL语句如下:
"time_series_aggregation" : {
"metric" : "xxx",
"group" : [],
"without" : [],
"interval" : "10m",
"offset" : "",
"downsample" : {
"range" : "10m",
"function" : "sum"
},
"aggregator" : ""
}
time_series_aggregation主要思路来自于目前time_series collect数据时,为了保证按照indexing sort,在shard级别顺序遍历数据,ES实现了TimeSeriesIndexSearcher。
TimeSeriesIndexSearcher的思路类似归并排序,只是每个segment已经是排好序,省去了第一阶段。TimeSeriesIndexSearcher一次遍历全部的segment,然后比较每个segment的_tsid和@timestamp,按顺序取出最新的doc。
TimeSeriesIndexSearcher在遍历doc时已经从docvalue中获取了_tsid和@timestamp。所以在collect中,可以通过AggregationExecutionContext直接获取_tsid和@timestamp值,而不用再次从docvalue中获取。
然后_tsid是将全部维度的tagk,tagv序列化的值,所以反序列化_tsid可以获得时间线的全部维度。对time_series进行group by的时候,不用再去docvalue中获取其他列,可以直接从反序列化_tsid的结果中过滤。
然后由于是顺序遍历,所以在collect阶段遍历完一个_tsid,就可以直接进行downsample计算,然后将结果传递给aggregator算子,而不需要保留原始数据,这是不按_tsid做indexing sort做不到的。
这里要注意的地方就是虽然一个索引的_tsid只会在routing在一个shard中,但是多个索引是会出现重复_tsid,比如一次rollover后,_tsid就出现在前后2个索引中了。所以如果一个interval间隔跨了多个索引,这时候只能退化,datanode保留全部原始数据,在协调节点进行跟datanode一样的计算流程。这里time_series因为支持了start_time和end_time的时间分区,所以在进行分桶时,可以判断bucket是否会跨索引,如果bucket时间范围在start_time和end_time内,就能直接计算,如果跨了start_time和end_time,就保留tsid的原始数据,在协调节点计算。
性能优势
time_series_aggregation相比目前ES使用的aggs获取同样数据,性能优势主要来自于两点:
- time_series_aggregation 直接从_tsid解析terms,而multi_terms需要从每个term列获取数据,节省了大量IO开销,同时获取docvalue的计算开销也少了。
- time_series_aggregation可以顺序处理_tsid,所以除了跨索引的时间窗口,其他窗口的数据都能在datanode的collect阶段直接完成计算,而无需像pipeling agg那样,需要把全部的_tsid数据汇聚在协调节点计算。减少了大量内存,以及datanode build完整_tsid列表的过程。