对于已经使用过 InfluxDB 的朋友来说,连续查询也不是什么新鲜概念了,但是对于刚开始使用它的朋友来说,连续查询还需要从头到尾撸一遍。
一、什么是连续查询
连续查询 Continuous queries(CQ) 是 InfluxQL 的一种查询类型。它会按照用户指定的查询规则,自动地、周期地查询实时数据并执行指定运算,然后将查询结果保存在一张指定的表中。
通过创建连续查询,用户可以指定InfluxDB执行连续查询的时间间隔、单次查询的时间范围以及查询规则。InfluxDB会根据用户指定的规则,定期地将过去一段时间内的原始时序数据以用户所期望的方式保存至新的结果表中,从而降低存储数据的时间精度,大大减少新表的数据量。同时,将查询结果保存在指定的数据表中,也便于用户直接查询所关心的内容,从而降低查询的运算复杂度,提升查询效率。
InfluxDB原理与实战
通过上面两段的描述,连续查询有点类似于关系数据库中的视图,连续查询和视图都可以简化查询、提高查询效率,但和视图又不完全一样。
关系数据库中的视图是一种虚拟存在的表:
数据库中只存放了视图的定义,而没有存放视图中的数据,这些数据存放在原来的表中;因此视图不占存储空间;
使用视图查询数据时,数据库系统会从原来的表中取出对应的数据;
视图中的数据依赖于原来表中的数据,一旦表中数据发生改变,显示在视图中的数据也会发生改变。
时序数据库中的连续查询:
连续查询会生成新的表;因此会占用存储空间;
使用连续查询查询数据时,会从连续查询生成的表查询数据;
由于不依赖原来表中的数据,一旦原来表中的数据发生改变(被删除),连续查询表中的数据不会跟着改变;
连续查询生成的表也可制定存储策略,不指定则使用默认的存储策略。
二、连续查询的使用场景
使用连续查询是最优的降低采样率的方式,连续查询和存储策略搭配使用将会大大降低InfluxDB的存储空间的占用量,使其维持在一个相对稳定的水平。而且使用连续查询后,数据会存放到指定的数据表中,这样就为以后统计不同精度的数据提供了便利。
比如说,要做一个曲线图,这个曲线图需要按照秒、分、小时对收集到的测量值求平均值/最大值/最小值,查询指定时间段内的,且一查就是几百条甚至上千条,把查询出来的数据在曲线图上展示。这样的场景就特别适合连续查询。
三、连续查询的语法
1、创建连续查询
1.1 基本语法
# 基本语法
CREATE CONTINUOUS QUERY <cq_name> ON <database_name>
BEGIN
<cq_query>
END
cq_name :该条连续查询的名字;database_name:连续查询所在数据库的名字;
cq_query:具体的连续查询语句,cq_query语法是InfluxQL协议格式的。
# cq_query 语法
SELECT <function[s]> INTO <destination_measurement>
FROM <measurement> [WHERE <stuff>]
GROUP BY time(<interval>)[,<tag_key[s]>]
function[s]:要查询的字段及数据处理的内置函数,可以是聚合函数,也可以是选择函数,看业务需要;
destination_measurement:保存查询结果的目标表;若目标表不存在,InfluxDB自动创建;目标表不可重复,否则会报错;
measurement:连续查询语句所查询的目标表;
stuff:具体的查询条件,可选参数;
interval:连续查询语句执行的时间间隔与查询的时间范围;
tag_key[s]:归类的标签字段,可选参数。
1.2 高级语法
# 高级语法
CREATE CONTINUOUS QUERY <cq_name> ON <database_name>
RESAMPLE EVERY <interval> FOR <interval>
BEGIN
<cq_query>
END
RESAMPLE EVERY :采样执行频次。如RESAMPLE EVERY 30m:表示30分钟执行一次。RESAMPLE FOR :采样时间范围。如RESAMPLE FOR 60m:时间范围 = now() - for间隔(60m)。RESAMPLE EVERY 30m FOR 60m:表示每30分钟执行一次60分钟内的数据计算。
如果 FOR
间隔小于执行的时间间隔,就会报错;所以必须大于等于时间间隔。
2、查询连续查询
# 查询连续查询
SHOW CONTINUOUS QUERIES
3、删除连续查询
# 删除连续查询
DROP CONTINUOUS QUERY <cq_name> ON <database_name>
注意:连续查询一旦创建就不能修改,需要删除之后重新创建。
四、简单使用
我们有一个现成的 InfluxQL 语句,它是这么写的:
select max(*::field) from mtest where companyId='002' and deviceId='002' group by time(1m) fill(0) order by time desc limit 10
从 mtest 中查询,查询条件 tag:companyId 和 deviceId 都为 002,找出1分钟为一组的最大值,最近的 10 条记录,没有数据的填充0。
1、先整理出 cq_query,并查询成功
# cq_query
select max(*::field) from mtest group by time(1m),* fill(0)
由于查询条件companyId和deviceId是可变的,所以先去掉,放到连续查询里加;
为了减少查询量,暂时加上 limit 10,查询成功后再去掉。
2、创建连续查询
CREATE CONTINUOUS QUERY "cq_name" ON "test"
RESAMPLE EVERY 10s FOR 2m
BEGIN
select max(*::field) into "cq_mtest" from mtest group by time(1m),* fill(0)
END
创建连续查询,每隔10s执行一次,统计2分钟之内的数据,这里是为了方便测试。
创建后查询一下,确认生成新的measurement:
如果没有生成一个新的measurement,这说明创建连续查询失败。
3、通过创建的连续查询查询数据
select * from cq_mtest where companyId='002' and deviceId='002' order by time desc limit 10;
连续查询结果图,没有数据的直接填充0。
4、查询已经存在的连续查询
# 查询连续查询
SHOW CONTINUOUS QUERIES
查看已经创建的连续查询:
连续查询也是有存储策略的,在创建时不标注时使用默认的存储策略。
5、删除连续查询
# 删除连续查询
DROP CONTINUOUS QUERY cq_name ON test
连续查询需要和存储策略结合使用,如果使用数据库默认的存储策略,上面示例的连续查询将会创建失败,这是需要注意的地方。
官方文档:https://docs.influxdata.com/influxdb/v1.8/query_language/continuous_queries