一、问题背景
当写入大量的原始数据后,你可能经常希望查询降精度采样后的数据以进行查看或者分析。在某些场景下,这一降精度采样的数据在未来可能需要多次使用到,而且重复计算相同的汇总是比较浪费的。InfluxDB提供了一个高级特性连续查询Continuous Query,它允许你预先计算好这些代价昂贵的查询到另外一个时间序列数据库中。
InfluxDB提供了数据保留策略RP(Retention Policy),以保证数据库中存储的数据量维持在一定数量级。对于超过过期规则规定的数据会被定时删除。 如果我们不想完全将这些数据删除掉,而是通过某种方式将其长时间保留下来,就需要使用连续查询CQ(Continuous Queries),将数据保存到另外一个拥有更长保留策略的表中存储。因此,连续查询CQ通常配合数据保留策略RP一起使用。
连续查询经典的使用场景是,将原始数据表中的数据,通过将精度采样后存储到一个保存时间更长的表中,在保证存储占用率低的同时,也保证了老数据的有迹可循,尽管这些老数据的精度不是很高,但是也可以根据系统实际需求扩展出按小时采样后的表、按天采样后的表以及按月采样后的表,已达到不同业务对于不同精度数据长时间保留的需求。
下面举一些简单的案例,都是将A表中的数据查询出来之后存储到B表中:
select percentile(value, 95) from response_times group by time(5m)
into response_times.percentiles.5m.95
select count(type) from events group by time(10m), type
into events.count_per_type.10m
二、基本概念
2.1 定义
InfluxDB的连续查询是在数据库中自动定时启动的一组语句,语句中必须包含 SELECT
关键词和 GROUP BY time()
关键词。 InfluxDB会将查询结果放在指定的数据表中。
2.2 目的
使用连续查询是最优的降低采样率的方式,连续查询和存储策略搭配使用将会大大降低InfluxDB的存储空间的占用量,使其维持在一个相对稳定的水平。 而且使用连续查询后,数据会存放到指定的数据表中,这样就为以后统计不同精度的数据提供了便利。
三、命令说明
3.1 常用指令
- 查看所有已经存在的所有连续查询CQ:
SHOW CONTINUOUS QUERIES
说明:InfluxDB的java SDK中暂未提供判断某个CQ是否存在的方法,可以先通过上述执行进行查询,再在结果集中进行比较判断。下面提供一个具体实现的Demo:
public boolean continuousQueryExists(String database, String cqName) {
String sql = "SHOW CONTINUOUS QUERIES";
QueryResult result = query(sql);
List<QueryResult.Series> seriesList = result.getResults().get(0).getSeries();
if (seriesList != null) {
for (QueryResult.Series series : seriesList) {
if (database.equals(series.getName())) {
List<List<Object>> continuousQueryList = series.getValues();
if (continuousQueryList == null) {
return false;
} else {
for (List<Object> queryResult : continuousQueryList) {
if (cqName.equals(queryResult.get(0))) {
return true;
}
}
}
}
}
}
return false;
}
- 创建一个新的CQ
CREATE CONTINUOUS QUERY <cq_name> ON <database_name>
[RESAMPLE [EVERY <interval>] [FOR <interval>]]
BEGIN SELECT <function>(<stuff>)[,<function>(<stuff>)] INTO <different_measurement>
FROM <current_measurement> [WHERE <stuff>] GROUP BY time(<interval>)[,<stuff>]
END
- 删除一个指定的CQ:
DROP CONTINUOUS QUERY <cq_name> ON <database_name>
四、项目实战
下面,我们以一个真实的项目场景来介绍CQ具体如何使用,相关内容来自于本人实际工作中相关开发经历的抽象总结。
首先,以命令行的形式在数据库testDB
上新建一个名为rp_5_year的RP,,保存时间为5年:
CREATE RETENTION POLICY "rp_5_year" ON "testDB" DURATION 1825d REPLICATION 3 SHARD DURATION 7d;
接下来创建连续查询CQ,命令行语句如下:
CREATE CONTINUOUS QUERY cq_{measurement} ON testDB RESAMPLE EVERY 1h FOR 2h
BEGIN
SELECT mean(*) INTO testDB.rp_5_year.{measurement}_hour FROM testDB.default_policy.{measurement} GROUP BY time(1h),* FILL(none)
END
说明:
cq执行时间和你group by(interval)中的interval有关,执行时间点是interval的倍数时间点,例:现在13:02,interval是1h, 第一次执行就是时间就是14:00, 如果interval是1d,第一次执行时间点就是6/1 00:00 。
这里for 参数设置为2h是为了解决延迟到达的数据在计算点存在偏差,第二次计算写入时通过覆盖的方式进行数据订正。
查询备份数据的时候,因为备份数据使用了非默认的RP,因此在查询的时候一定要指定RP,否则查不出数据,例如:
select * from ncm.rp_5_year..{measurement}
实际使用中,我们将一些常用的操作封装成service层的方法进行调用,下面给出封装后的service部分的代码:
接口定义:TSDBservice.java
/**
* 创建连续查询CQ
*
* @param measurement 备份表名
*/
void createContinuousQuery(String measurement);
/**
* 封装默认数据库中指定表的默认连续查询是否存在
*
* @param measurement 待查询的数据表
* @return
*/
boolean continuousQueryExists(String measurement);
/**
* 封装指定数据库上的连续查询是否存在
*
* @param database 待查询的数据库
* @param cqName 连续查询CQ名称
* @return
*/
boolean continuousQueryExists(String database, String cqName);
/**
* 删除连续查询
*
* @param databaseName 待查询的数据库
* @param cqName 连续查询CQ名称
*/
void dropContinuousQuery(String databaseName, String cqName);
方法实现:TSDBserviceImpl.java
@Override
public void createContinuousQuery(String measurement) {
String cqName = String.format("%s_%s", CONTINUOUS_QUERY_NAME_PREFIX, measurement);
String originMeasurement = String.format("\"%s\".\"%s\".\"%s\"", database, defaultRetentionPolicy, measurement);
String cqMeasurement = String.format("\"%s\".\"%s\".\"%s_hour\"", database, extensionRetentionPolicy, measurement);
String sql = String.format("CREATE CONTINUOUS QUERY \"%s\" ON \"%s\" RESAMPLE EVERY 1h FOR 2h BEGIN SELECT MEAN(*) INTO %s FROM %s GROUP BY time(1h),* FILL(none) END",
cqName, database, cqMeasurement, originMeasurement);
this.query(sql);
}
@Override
public boolean continuousQueryExists(String measurement) {
String cqName = String.format("cq_%s", measurement);
return continuousQueryExists(database, cqName);
}
@Override
public boolean continuousQueryExists(String database, String cqName) {
String sql = "SHOW CONTINUOUS QUERIES";
QueryResult result = query(sql);
List<QueryResult.Series> seriesList = result.getResults().get(0).getSeries();
if (seriesList != null) {
for (QueryResult.Series series : seriesList) {
if (database.equals(series.getName())) {
List<List<Object>> continuousQueryList = series.getValues();
if (continuousQueryList == null) {
return false;
} else {
for (List<Object> queryResult : continuousQueryList) {
if (cqName.equals(queryResult.get(0))) {
return true;
}
}
}
}
}
}
return false;
}
@Override
public void dropContinuousQuery(String databaseName, String cqName) {
String sql = String.format("DROP CONTINUOUS QUERY \"%s\" ON \"%s\"", cqName, databaseName);
this.query(sql);
}
最佳实践
在InfluxDB中,通常将数据保留策略RP和连续查询CQ结合起来使用会达到更好的效果。例如,将写入的原始数据表A的保留时间定为三个月,设定一个CQ,按照一个小时聚合后转存到另外一张表B中,该表的数据保留时间为一年。在上层查询的时候做一个根据查询起始时间为主要判断依据的查询路由,如果查询三个月内的数据就去查询表A,否则就去查询表B。这样实现高低搭配,以满足不同场景下的查询需求。