说在前面: 这个list记录了博主在学习IoTDB[1]期间的总结和思考,欢迎一起讨论学习哈
相关介绍可以参考list的第一篇博客:[IoTDB 学习笔记] [part1] 介绍
TsFile 基本结构
TsFile 是 IoTDB 的底层数据文件,一种专门为时间序列数据设计的列式文件格式。TsFile的设计细节具体可以参考[2]。
总的来说,TsFile文件主要包含数据(Chunk
)和元数据(Metadata
)两个部分。可以认为对于我们观测的设备网络中,每个设备包含若干个测点,每个测点包含一个时间序列Timeseries
,时间序列记录了该测点各时间戳下的值,时间序列的值记录在Chunk
当中;对应的,其中的元数据也分为三部分:ChunkMetadata
,TimeseriesMetadata
和TsFileMetadata
。一个TsFile
的示例结构如图 1 [2]所示,该文件包含两个设备d1与d2,每个设备包含三个测点s1,s2和s3,每个设备的每个测点对应一个时间序列,每个时间序列包含两个Chunk
。
查询流程
以查 d1.s1 为例
- 反序列化
TsFileMetadata
,得到 d1.s1 的TimeseriesMetadata
的位置- 反序列化得到 d1.s1 的
TimeseriesMetadata
- 根据 d1.s1 的
TimeseriesMetadata
,反序列化其所有ChunkMetadata
- 根据 d1.s1 的每一个
ChunkMetadata
,读取其Chunk
数据
每个设备中的数据存储在一个ChunkGroup
中,每个ChunkGroup
由若干Chunk
组成,Chunk
由ChunkHeader
和若干Page
一起构成。
所有元数据索引节点构成了一颗元数据索引树,包含传感器索引层以及可能的设备索引层,各层包含内部节点和叶节点两种类型的节点,分别为:INTERNAL_MEASUREMENT
,LEAF_MEASUREMENT
,INTERNAL_DEVICE
,LEAF_DEVICE
。其中LEAF_MEASUREMENT
指向TimeseriesMetadata
,如图 2 [2]所示。
max_degree_of_index_node
为索引树节点的最大子节点数目。当总设备数目不超过max_degree_of_index_node
时,不存在设备索引层,如图 3 [2]所示。
写流程
TsFile 的写入流程如图 4 [3]所示:
其中,文件的写入主要分为三种操作,在图上用 1、2、3 标注:
1、写内存缓冲区
2、持久化ChunkGroup
3、关闭文件
写内存缓冲区
TsFile 文件层的写入接口有两种:
- 写入一个设备一个时间戳的多个测点
- 写入一个设备多个时间戳的多个测点
当调用写接口时,这个设备的数据会交给对应的 ChunkGroupWriter
,其中的每个测点会交给对应的 ChunkWriter
进行写入。ChunkWriter
完成编码和打包(生成 Page
)
持久化 ChunkGroup
当内存中的数据达到一定阈值,会触发持久化操作。每次持久化会把当前内存中所有设备的数据全部持久化到磁盘的 TsFile 文件中。每个设备对应一个 ChunkGroup
,每个测点对应一个 Chunk
。持久化完成后会在内存中缓存对应的元数据信息,以供查询和生成文件尾部 metadata。
关闭文件
根据内存中缓存的元数据,生成 TsFileMetadata
追加到文件尾部,最后关闭文件。
正如 TsFile 基本结构 中所述,生成 TsFileMetadata
的过程中比较重要的一步是建立元数据索引 (MetadataIndex
) 树,以使得检索时间序列数据时可以不用读取所有的TimeseriesMetadata
以减少 I/O 操作。
建立元数据索引树的算法大致如下:
- 从索引树的底层开始构建。在传感器索引层,对于每个设备,我们首先初始化其叶节点(类型为
LEAF_MEASUREMENT
),然后对于每个TimeseriesMetadata
,在序列化后,将其加入叶节点中,当一个节点中的TimeseriesMetadata
达到MAX_DEGREE_OF_INDEX_NODE
后,将这个节点加入设备queue
中,并重新初始化一个叶节点用于继续存放接下来的TimeseriesMetadata
,然后重复这个过程,直到所有TimeseriesMetadata
全部添加完成。然后对于queue
中的叶节点,逐层地生成上层节点(INTERNAL_MEASUREMENT
),具体方法与上述方法类似,每个上层节点中最多包含MAX_DEGREE_OF_INDEX_NODE
个子节点,不断重复地生成上级节点,直到仅剩一个节点时,即为需要的根节点。 - 然后,判断设备数目是否超过
MAX_DEGREE_OF_INDEX_NODE
,如若未超过,则直接生成元数据索引树的根节点(子节点集包含 1 中每个设备对应的根节点序列化后转化成的索引项);如若超过,则需要生成设备索引层级,具体方法与 1 类似,在初始化queue
后,先生成类型为LEAF_DEVICE
的节点,每个节点最多包含MAX_DEGREE_OF_INDEX_NODE
个子级索引。然后按照和 1 类似的方法逐层生成上层节点,直到生成最终的根节点。
源码解析
此处引用的为v0.12.0[4]的代码
接口调用
利用TSRecord写
IoTDB 提供了一个TSRecord工具,TSRecord记录了一个设备在一个时间戳下的若干测点信息。
直接写入新文件时
// 生成 TsFile 存储文件,path为对应的disk存储路径
File f = FSFactoryProducer.getFSFactory().getFile(path);
// 生成 tsfile writer
try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {
// 在 tsfile writer 中注册测点信息 这里 包含了 0,1,2,3 四个设备,每个设备包含 1,2,3 3 个测点
for (int i = 0; i < 4; i++) {
tsFileWriter.registerTimeseries(
new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_1),
new MeasurementSchema(Constant.SENSOR_1, TSDataType.INT64, TSEncoding.RLE));
tsFileWriter.registerTimeseries(
new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_2),
new MeasurementSchema(Constant.SENSOR_2, TSDataType.INT64, TSEncoding.RLE));
tsFileWriter.registerTimeseries(
new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_3),
new MeasurementSchema(Constant.SENSOR_3, TSDataType.INT64, TSEncoding.RLE));
}
// 生成时间序列信息,设备0在时间戳为0,4,8..时各测点有值,均与时间戳的值相同,设备1,2,3分别在时间戳为1,5,9..;2,6,10..;3,7,11时各测点有值,且均与时间戳的值相同
for (int i = 0; i < 100; i++) {
// 生成设备(i % 4)在时间戳为i时的记录
TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX + (i % 4));
// 生成时间戳为i时设备(i % 4)的各测点的信息
DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i);
DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i);
DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i);
// 将生成的测点信息加入设备记录
tsRecord.addTuple(dPoint1);
tsRecord.addTuple(dPoint2);
tsRecord.addTuple(dPoint3);
// 将设备(i % 4)在时间戳为i时的记录加入tsFile中
tsFileWriter.write(tsRecord);
}
} catch (Exception e) {
logger.error("meet error in TsFileWrite ", e);
}
续写TsFile时
// 基本流程与直接写入新文件时一致,不同的是在生成writer的时候使用:
// f 为读入的TsFile文件:File f = FSFactoryProducer.getFSFactory().getFile(path);
// 为了续写f中存储的TsFile,这里使用了ForceAppendTsFileWriter,初始化的时候读取了f中存储的TsFile相关信息
ForceAppendTsFileWriter fwriter = new ForceAppendTsFileWriter(f);
// 截取f中truncatePosition及之前的内容(截去尾部元数据),以便添加新的内容
fwriter.doTruncate();
利用tablet写
IoTDB 提供了一个tablet工具,tablet记录了一个设备的多个测点的信息,按照一种表格的形式表示,这些测点具有相同的时间戳序列,因此可以应用在测点具有相同时间戳序列(每个时间戳下各个测点都具有值)的设备中。
// 生成 TsFile 存储文件,path为对应的disk存储路径
File f = FSFactoryProducer.getFSFactory().getFile(path);
if (f.exists() && !f.delete()) {
throw new RuntimeException("can not delete " + f.getAbsolutePath());
}
// 初始化模式
Schema schema = new Schema();
// 此处展示了10个测点(一个设备),每个测点1000000个时间序列值的例子
String device = Constant.DEVICE_PREFIX + 1;
String sensorPrefix = "sensor_";
int rowNum = 1000000;
int sensorNum = 10;
// 用于初始化Tablet
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
// 将测点加入模式
for (int i = 0; i < sensorNum; i++) {
IMeasurementSchema measurementSchema =
new MeasurementSchema(sensorPrefix + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF);
measurementSchemas.add(measurementSchema);
// 用于初始化TsFilewriter时注册各测点
schema.registerTimeseries(
new Path(device, sensorPrefix + (i + 1)),
new MeasurementSchema(sensorPrefix + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
}
// 生成TsFileWriter,并注册各测点
try (TsFileWriter tsFileWriter = new TsFileWriter(f, schema)) {
// 初始化Tablet
Tablet tablet = new Tablet(device, measurementSchemas);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
// 生成的时间序列初始时间戳以及初始值(各时间戳下的测点的值随着时间戳的增长而增长)
long timestamp = 1;
long value = 1000000L;
// 为Tablet赋值
for (int r = 0; r < rowNum; r++, value++) {
// 记录时间戳的序列,每次增加1(其值为row),value也每次加1
int row = tablet.rowSize++;
timestamps[row] = timestamp++;
// 将value赋给每个sensor的当前时间戳(row)的位置
for (int i = 0; i < sensorNum; i++) {
long[] sensor = (long[]) values[i];
sensor[row] = value;
}
// 当时间戳数目达到阈值(tablet.getMaxRowNumber())时,将当前的tablet写入TsFile,并清空tablet以继续重复该流程来将剩余时间序列信息写入tsfile
if (tablet.rowSize == tablet.getMaxRowNumber()) {
tsFileWriter.write(tablet);
tablet.reset();
}
}
// 如果最后还有没有写入TsFile的内容,将其写入
if (tablet.rowSize != 0) {
tsFileWriter.write(tablet);
tablet.reset();
}
}
流程解析
在写内存缓冲区部分中,我们了解到在写入TsFile时具有两种接口:
- 写入一个设备一个时间戳多个测点
- 写入一个设备多个时间戳多个测点
分别对应上述展示的接口调用部分中的基于TSRecord和Tablet的写入方法。
我们了解到,当调用 write 接口时,这个设备的数据会交给对应的 ChunkGroupWriter,其中的每个测点会交给对应的 ChunkWriter 进行写入。ChunkWriter 完成编码和打包(生成 Page)。
查看TsFile writer 中写入方法write,我们发现:
写内存缓冲区
// 用于基于TSRecord的写入方法
public boolean write(TSRecord record) throws IOException, WriteProcessException {
// 确保对应的groupwriter和chunkwriter存在,不存在则生成
checkIsTimeSeriesExist(record);
// 获取所要写入的设备的groupwriter,并调用其来写入所要写入的datapoint
groupWriters.get(record.deviceId).write(record.time, record.dataPointList);
++recordCount;
return checkMemorySizeAndMayFlushChunks();
}
// 用于基于Tablet的写入方法
public boolean write(Tablet tablet) throws IOException, WriteProcessException {
// 确保对应的groupwriter和chunkwriter存在,不存在则生成
checkIsTimeSeriesExist(tablet);
// 获取所要写入的设备的groupwriter,并调用其来写入所要导入的Tablet
groupWriters.get(tablet.deviceId).write(tablet);
recordCount += tablet.rowSize;
return checkMemorySizeAndMayFlushChunks();
}
持久化 ChunkGroup
// 检查占用的内存是否超过阈值(chunkGroupSizeThreshold),如若超过,则调用flushAllChunkGroups()将其存至OutputStream中,并清空各group writer及其下chunk writer中数据,以进行剩余数据的输入
// 返回false表示所占存储空间未达阈值,true则表示达到
private boolean checkMemorySizeAndMayFlushChunks() throws IOException {
// recordCountForNextMemCheck为data point数目阈值的最小值,此处初步放缩估计内存,以进行加速
if (recordCount >= recordCountForNextMemCheck) {
//
long memSize = calculateMemSizeForAllGroup();
assert memSize > 0;
if (memSize > chunkGroupSizeThreshold) {
LOG.debug("start to flush chunk groups, memory space occupy:{}", memSize);
// 更新recordCountForNextMemCheck
recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
// 存至OutputStream,并清空writer数据
return flushAllChunkGroups();
} else {
// 更新recordCountForNextMemCheck
recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
return false;
}
}
return false;
}
// 存至OutputStream,并清空各group writer及其下chunk writer中数据,以进行剩余数据的输入
public boolean flushAllChunkGroups() throws IOException {
if (recordCount > 0) {
for (Map.Entry<String, IChunkGroupWriter> entry : groupWriters.entrySet()) {
String deviceId = entry.getKey();
IChunkGroupWriter groupWriter = entry.getValue();
// 载入当前设备
fileWriter.startChunkGroup(deviceId);
long pos = fileWriter.getPos();
// 转移该设备下的pages
long dataSize = groupWriter.flushToFileWriter(fileWriter);
if (fileWriter.getPos() - pos != dataSize) {
throw new IOException(
String.format(
"Flushed data size is inconsistent with computation! Estimated: %d, Actual: %d",
dataSize, fileWriter.getPos() - pos));
}
fileWriter.endChunkGroup();
}
// 清空其余相关记录
reset();
}
return false;
}
关闭文件
fileWriter.endFile() 中,生成了path (测点) 到chunkMetadata 的映射 chunkMetadataListMap,flushMetadataIndex() 中基于 chunkMetadataListMap 生成了设备到TimeseriesMetadata 的映射 deviceTimeseriesMetadataMap。
public void close() throws IOException {
LOG.info("start close file");
// 写入剩余的ChunkGroup
flushAllChunkGroups();
// 根据内存中缓存的元数据,生成 TsFileMetadata 追加到文件尾部
fileWriter.endFile();
}
private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chunkMetadataListMap)
throws IOException {
deviceTimeseriesMetadataMap = new LinkedHashMap<>();
// device -> TimeseriesMetaDataList 的映射
for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
Path path = entry.getKey();
String device = path.getDevice();
// 生成 TimeseriesMetaData
PublicBAOS publicBAOS = new PublicBAOS();
TSDataType dataType = entry.getValue().get(entry.getValue().size() - 1).getDataType();
Statistics seriesStatistics = Statistics.getStatsByType(dataType);
int chunkMetadataListLength = 0;
boolean serializeStatistic = (entry.getValue().size() > 1);
// flush chunkMetadataList
for (IChunkMetadata chunkMetadata : entry.getValue()) {
if (!chunkMetadata.getDataType().equals(dataType)) {
continue;
}
chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
}
TimeseriesMetadata timeseriesMetadata =
new TimeseriesMetadata(
(byte)
((serializeStatistic ? (byte) 1 : (byte) 0) | entry.getValue().get(0).getMask()),
chunkMetadataListLength,
path.getMeasurement(),
dataType,
seriesStatistics,
publicBAOS);
deviceTimeseriesMetadataMap
.computeIfAbsent(device, k -> new ArrayList<>())
.add(timeseriesMetadata);
}
return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out);
}
然后通过MetadataIndexConstructor.constructMetadataIndex() 结合存储在输出流中的信息生成元数据树。
public static MetadataIndexNode constructMetadataIndex(
Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap, TsFileOutput out)
throws IOException {
Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();
// 对于每个设备,有:
for (Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap.entrySet()) {
if (entry.getValue().isEmpty()) {
continue;
}
// 传感器索引层,初始化此设备的索引节点的队列
Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
TimeseriesMetadata timeseriesMetadata;
// 传感器索引层,初始化传感器索引层级的叶子节点
MetadataIndexNode currentIndexNode =
new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
// 对于每个 TimeseriesMetadata,有:
for (int i = 0; i < entry.getValue().size(); i++) {
timeseriesMetadata = entry.getValue().get(i);
// 每隔 MAX_DEGREE_OF_INDEX_NODE 个,加一条 entry 到 currentIndexNode 中
if (i % config.getMaxDegreeOfIndexNode() == 0) {
// 每当 currentIndexNode 中攒够 MAX_DEGREE_OF_INDEX_NODE 个 entry 后
if (currentIndexNode.isFull()) {
// 将 currentIndexNode 加入 queue 中
addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
// 并将 currentIndexNode 指向一个新的 MetadataIndexNode
currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
}
currentIndexNode.addEntry(
new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition()));
}
// 序列化 timeseriesMetadata
timeseriesMetadata.serializeTo(out.wrapAsStream());
}
// 将 currentIndexNode 加入 queue 中
addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
// 根据 queue 中已经存储的叶子节点,逐层生成上层节点,直至最终的根节点,并将"设备-根节点"对应的映射加入 deviceMetadataIndexMap 中
deviceMetadataIndexMap.put(
entry.getKey(),
generateRootNode(
measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
}
// 判断设备数是否超过 MAX_DEGREE_OF_INDEX_NODE,如果未超过则可以直接形成元数据索引树的根节点并返回
if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) {
MetadataIndexNode metadataIndexNode =
new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
// 对于 deviceMetadataIndexMap 中的每一个 entry,有:
for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
// 将其转化成一个索引项,加入到 metadataIndexNode 中
metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
// 序列化 entry
entry.getValue().serializeTo(out.wrapAsStream());
}
// 设置根节点的 endOffset 并返回
metadataIndexNode.setEndOffset(out.getPosition());
return metadataIndexNode;
}
// 如若不然,则生成设备索引层
// 初始化存放设备索引层级节点的队列 queue
Queue<MetadataIndexNode> deviceMetadaIndexQueue = new ArrayDeque<>();
// 初始化设备索引层级的叶子节点 currentIndexNode
MetadataIndexNode currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
// 对于 deviceMetadataIndexMap 中的每一个 entry,有:
for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
// 每当 currentIndexNode 中攒够 MAX_DEGREE_OF_INDEX_NODE 个 entry 后,
if (currentIndexNode.isFull()) {
addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadaIndexQueue, out);
// 将 currentIndexNode 指向一个新的 MetadataIndexNode
currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
}
// 将其转化成一个索引项,加入到 currentIndexNode 中
currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
// 序列化 entry
entry.getValue().serializeTo(out.wrapAsStream());
}
// 将 currentIndexNode 加入 queue 中
addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadaIndexQueue, out);
// 根据 queue 中已经存储的叶子节点,逐层生成上层节点,直至最终的根节点
MetadataIndexNode deviceMetadataIndexNode =
generateRootNode(deviceMetadaIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE);
// 设置根节点的 endOffset 并返回
deviceMetadataIndexNode.setEndOffset(out.getPosition());
return deviceMetadataIndexNode;
}
对于其中的generateRootNode()
,有:
// 该方法需要将队列中的 MetadataIndexNode 形成树级结构,并返回根节点
private static MetadataIndexNode generateRootNode(
Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type)
throws IOException {
int queueSize = metadataIndexNodeQueue.size();
MetadataIndexNode metadataIndexNode;
// 根据需要的类型 type 初始化 currentIndexNode
MetadataIndexNode currentIndexNode = new MetadataIndexNode(type);
// 当队列中有多余一个节点时 (不是仅剩根节点时),循环处理队列,对于队列中存在的每个节点,有:
while (queueSize != 1) {
for (int i = 0; i < queueSize; i++) {
metadataIndexNode = metadataIndexNodeQueue.poll();
// 每当 currentIndexNode 中攒够 MAX_DEGREE_OF_INDEX_NODE 个 entry 后
if (currentIndexNode.isFull()) {
// 将 currentIndexNode 加入 queue 中
addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out);
// 并将 currentIndexNode 指向一个新的 MetadataIndexNode
currentIndexNode = new MetadataIndexNode(type);
}
// 将其转化成一个索引项,加入到 currentIndexNode 中
currentIndexNode.addEntry(
new MetadataIndexEntry(metadataIndexNode.peek().getName(), out.getPosition()));
// 序列化 metadataIndexNode
metadataIndexNode.serializeTo(out.wrapAsStream());
}
// 将 currentIndexNode 加入 queue 中
addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out);
// 并将 currentIndexNode 指向一个新的 MetadataIndexNode
currentIndexNode = new MetadataIndexNode(type);
queueSize = metadataIndexNodeQueue.size();
}
// 队列中只剩下一个节点时,返回队列中最终剩余的节点 (即为根节点)
return metadataIndexNodeQueue.poll();
}
读流程
过滤条件和查询表达式
TsFile 查询时的过滤器Filter定义了12种基本类型如下,详细定义可以参考[5]:
等于 | 大于 | 大于等于 | 小于 | 小于等于 | 不等于 |
---|---|---|---|---|---|
TimeEq | TimeGt | TimeGtEq | TimeLt | TimeLtEq | TimeNotEq |
ValueEq | ValueGt | ValueGtEq | ValueLt | ValueLtEq | ValueNotEq |
Filter
整体上分为一元条件UnaryFilter
以及二元条件BinaryFilter
,二元条件存在逻辑与关系AndFilter
以及逻辑或关系OrFilter
两种条件间关系。
IoTDB中提供了两种查询表达式:SingleSeriesExpression
以及GlobalTimeExpression
。SingleSeriesExpression
针对一个时间序列进行过滤,而GlobalTimeExpression
的过滤条件则应用于全部时间序列。因此,SingleSeriesExpression
包含path
和filter
两个参数,path
用于确定针对的时间序列,filter
则表示查询的过滤条件。而GlobalTimeExpression
则只包含filter
。
查询过滤条件(IExpression)为执行查询时的表达式,有如下的定义:
IExpression := SingleSeriesExpression | GlobalTimeExpression | AndExpression | OrExpression
AndExpression := IExpression && IExpression
OrExpression := IExpression || IExpression
然而这些IExpression并不全部可以直接应用于查询,需要利用特殊的优化算法转化为可执行的表达式,满足以下任一条件的查询表达式为可执行表达式:
IExpression
为单一的GlobalTimeExpression
IExpression
为单一的SingleSeriesExpression
IExpression
为AndExpression
,且叶子节点均为 SingleSeriesExpressionIExpression
为OrExpression
,且叶子节点均为SingleSeriesExpression
上述优化算法大致如下:
首先,根据上述的定义,一个不是可执行表达式的查询表达式应是AndExpression
或者OrExpression
,且同时包含GlobalTimeExpression
与 SingleSeriesExpression
。
因此,我们主要的主要问题是处理GlobalTimeExpression
与SingleSeriesExpression
的查询统一问题,选择的解决办法是把GlobalTimeExpression
的查询条件投影到全部待查询的path
(测点)中,然后就可以化归成 SingleSeriesExpression
间的合并,这里我们的 IExpression 代指由若干条SingleSeriesExpression
合并而成的条件,因此有:对于IExpression
间和合并直接根据两者的关系按照生成AndExpression
或者 ORExpression
的方法处理,生成一个新的 IExpression (SingleSeriesExpression
直接作为IExpression
处理);
而对于GlobalTimeExpression
间的合并,则直接根据两者的关系按照生成AndExpression
或者 ORExpression
的方法处理,生成一个新的 GlobaLTimeFilter
;
对于GlobalTimeExpression
和IExpression
的合并,我们按照上述的映射方法化归问题,生成新的 IExpression
,值得注意的是,如若两者间的关系是AND,则映射的path
即为被合并的IExpression
中包含的有效 path,而如若关系式OR,则需首先查询 GlobalTimeExpression
的投影集合。
IoTDB对于上述的算法提供了optimize()
方法来进行实现(包含combineTwoGlobalTimeExpression()
,handleOneGlobalTimeExpressionr()
和MergeIExpression()
的实现方法,详细定义可以参照[5])
TsFile 查询执行过程
TsFile 文件层查询接口只包含原始数据查询,根据是否包含值过滤条件,可以将查询分为两类“无过滤条件或仅包含时间过滤条件查询”和“包含值过滤条件的查询”。这里重点介绍其中“无过滤条件或仅包含时间过滤条件查询”,“包含值过滤条件的查询”的详细介绍可以参照[5]。
为了执行以上两类查询,有两套查询流程:
- 归并查询
生成多个 reader,按照 time 对齐,返回结果集。
- 连接查询
根据查询条件生成满足过滤条件的时间戳,通过满足条件的时间戳查询投影列的值,返回结果集。
归并查询
归并查询对于每一个时间序列都构建了一个FileSeriesReader
,基于最小堆来实现数据的合并。
查询过程中,系统根据所有的FileSeriesReader
生成一个DataSetWithoutTimeGenerator
,由于每个FileSeriesReader
会按照时间戳从小到大的顺序迭代地返回数据点,所以可以采用“多路归并”对所有FileSeriesReader
的结果进行按时间戳对齐。
详细的算法描述如下[5]:
(1) 创建一个最小堆,堆里面存储“时间戳”,该堆将按照每个时间戳的大小进行组织。
(2) 初始化堆,依次访问每一个 FileSeriesReader,如果该 FileSeriesReader 中还有数据点,则获取数据点的时间戳并放入堆中。此时每个时间序列最多有1个时间戳被放入到堆中,即该序列最小的时间戳。
(3) 如果堆的 size > 0,获取堆顶的时间戳,记为t,并将其在堆中删除,进入步骤(4);如果堆的 size 等于0,则跳到步骤(5),结束数据合并过程。
(4) 创建新的 RowRecord。依次遍历每一条时间序列。在处理其中一条时间序列时,如果该序列没有更多的数据点,则将该列标记为 null 并添加在 RowRecord 中;否则,判断最小的时间戳是否与 t 相同,若不相同,则将该列标记为 null 并添加在 RowRecord 中。若相同,将该数据点添加在 RowRecord 中,同时判断该时间序列是否有新的数据点,若存在,则将下一个时间戳 t' 添加在堆中,并将 t' 设为当前时间序列的最小时间戳。最后,返回步骤(3)。
(5) 结束数据合并过程。
连接查询
连接查询生成满足“选择条件”的时间戳、查询被投影列在对应时间戳下的数据点、合成RowRecord
,即针对满足条件的时间戳,将其依次投影至各时间序列上,记录满足条件的数据。
主要流程如下[5]:
(1) 根据 QueryExpression,初始化时间戳计算模块 TimeGeneratorImpl
(2) 为每个被投影的时间序列创建 FileSeriesReaderByTimestamp
(3) 如果“时间戳计算模块”中还有下一个时间戳,则计算出下一个时间戳 t ,进入步骤(4);否则,结束查询。
(4) 根据 t,在每个时间序列上使用 FileSeriesReaderByTimestamp 组件获取在时间戳 t 下的数据点;如果在该时间戳下没有对应的数据点,则用 null 表示。
(5) 将步骤(4)中得到的所有数据点合并成一个 RowRecord,此时得到一条查询结果,返回步骤(3)计算下一个查询结果。
查询流程
TsFileExecutor
接收一个QueryExpression
,执行该查询并返回相应的 QueryDataSet
。
其基本流程如下:
(1)接收一个 QueryExpression
(2)如果无过滤条件,执行归并查询。如果该 QueryExpression 包含 Filter(过滤条件),则通过 ExpressionOptimizer 对该 QueryExpression 的 Filter 进行优化。如果是 GlobalTimeExpression,执行归并查询。如果包含值过滤,交给 ExecutorWithTimeGenerator 执行连接查询。
(3) 生成对应的 QueryDataSet,迭代地生成 RowRecord,将查询结果返回。
源码分析
此处引用的为v0.12.0[4]的代码
这里未介绍 value filter 相关的查询,详细的介绍可以参考[5],代码可以参考[4]。
接口调用
// 这里展示了一个具有 0,1,2,3 4个设备,每个设备具有1,2,3 3个测点的TsFile的例子
// 初始化 TsFile reader
try (TsFileSequenceReader reader = new TsFileSequenceReader(path);
ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) {
无 filter
// 将查询映射到设备0的各个 path (测点)上
ArrayList<Path> paths = new ArrayList<>();
paths.add(new Path("device_0", "sensor_1"));
paths.add(new Path("device_0", "sensor_2"));
paths.add(new Path("device_0", "sensor_3"));
// 不使用 filter的情况,应该查询出所有设备0某一测点具有有效值的数据点对应的时间戳下的设备0的测点1,2,3的情况
queryAndPrint(paths, readTsFile, null);
}
存在 time filter
// 使用 time filter (4 <= time <= 10) 的情况,应该选出时间戳在[4,10]间的有效记录
IExpression timeFilter =
BinaryExpression.and(
new GlobalTimeExpression(TimeFilter.gtEq(4L)),
new GlobalTimeExpression(TimeFilter.ltEq(10L)));
queryAndPrint(paths, readTsFile, timeFilter);
其中,查询执行部分接口调用如下:
// 其中,paths 为查询的全体测点集,statement 为查询条件
private static void queryAndPrint(
ArrayList<Path> paths, ReadOnlyTsFile readTsFile, IExpression statement) throws IOException {
// 根据查询条件生成queryExpression
QueryExpression queryExpression = QueryExpression.create(paths, statement);
// 执行查询,如若存在过滤条件 (filter) 则先进行优化
QueryDataSet queryDataSet = readTsFile.query(queryExpression);
// 完成查询,并输出全部查询结果
while (queryDataSet.hasNext()) {
System.out.println(queryDataSet.next());
}
System.out.println("------------");
}
另外值得一提的是IoTDB还提供了可以根据 offset 逐 byte 读取 TsFile 的接口:
marker = reader.readMarker()
流程解析
如上文所述,查询过程的主要流程包括生成QueryExpression
,(对于存在filter
时) 优化, 查询 (归并查询 / 连接查询)。我们发现,在生成QueryExpression
后,由readTsFile.query()
生成queryDataSet
,再迭代地生成RowRecord
。
查看readTsFile.query()
模块有:
public QueryDataSet query(QueryExpression queryExpression) throws IOException {
return tsFileExecutor.execute(queryExpression);
}
public QueryDataSet execute(QueryExpression queryExpression) throws IOException {
// 使用 bloom filter 判断检索对象 path 是否存在
BloomFilter bloomFilter = metadataQuerier.getWholeFileMetadata().getBloomFilter();
List<Path> filteredSeriesPath = new ArrayList<>();
if (bloomFilter != null) {
for (Path path : queryExpression.getSelectedSeries()) {
if (bloomFilter.contains(path.getFullPath())) {
filteredSeriesPath.add(path);
}
}
queryExpression.setSelectSeries(filteredSeriesPath);
}
// 加载metadata
metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries());
// 存在 filter 时
if (queryExpression.hasQueryFilter()) {
try {
// 获取查询表达式
IExpression expression = queryExpression.getExpression();
// 获取可执行查询表达式
IExpression regularIExpression =
ExpressionOptimizer.getInstance()
.optimize(expression, queryExpression.getSelectedSeries());
// 更新为对应的可执行表达式
queryExpression.setExpression(regularIExpression);
// 当可执行表达式为 GlobalTimeExpression 时,及全由 GlobalTimeExpression 组成时,执行归并查询
if (regularIExpression instanceof GlobalTimeExpression) {
return execute(
queryExpression.getSelectedSeries(), (GlobalTimeExpression) regularIExpression);
} else {
// 反之,执行连接查询
return new ExecutorWithTimeGenerator(metadataQuerier, chunkLoader)
.execute(queryExpression);
}
} catch (QueryFilterOptimizationException | NoMeasurementException e) {
throw new IOException(e);
}
} else {
try {
// 没有 filter 时,执行归并查询
return execute(queryExpression.getSelectedSeries());
} catch (NoMeasurementException e) {
throw new IOException(e);
}
}
}
// 其中对于存在 GlobalTimeExpression 的 filter 时,以及无 filter 时,有:
private QueryDataSet executeMayAttachTimeFiler(
List<Path> selectedPathList, GlobalTimeExpression timeExpression)
throws IOException, NoMeasurementException {
List<AbstractFileSeriesReader> readersOfSelectedSeries = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
for (Path path : selectedPathList) {
List<IChunkMetadata> chunkMetadataList = metadataQuerier.getChunkMetaDataList(path);
AbstractFileSeriesReader seriesReader;
if (chunkMetadataList.isEmpty()) {
seriesReader = new EmptyFileSeriesReader();
dataTypes.add(metadataQuerier.getDataType(path));
} else {
// 没有filter 时
if (timeExpression == null) {
seriesReader = new FileSeriesReader(chunkLoader, chunkMetadataList, null);
} else {
// 存在 GlobalTimeExpression 的 filter 时,将 GlobalTimeExpression 的条件加入每一个时间序列的 reader 中
seriesReader =
new FileSeriesReader(chunkLoader, chunkMetadataList, timeExpression.getFilter());
}
dataTypes.add(chunkMetadataList.get(0).getDataType());
}
readersOfSelectedSeries.add(seriesReader);
}
// 开始归并查询
return new DataSetWithoutTimeGenerator(selectedPathList, dataTypes, readersOfSelectedSeries);
}
归并查询
// 初始化堆
private void initHeap() throws IOException {
hasDataRemaining = new ArrayList<>();
batchDataList = new ArrayList<>();
timeHeap = new PriorityQueue<>();
timeSet = new HashSet<>();
// 初始化堆,依次访问每个 series reader
for (int i = 0; i < paths.size(); i++) {
AbstractFileSeriesReader reader = readers.get(i);
// 如果没有数据点时
if (!reader.hasNextBatch()) {
batchDataList.add(new BatchData());
hasDataRemaining.add(false);
} else {
// 如果还有数据点,将其加入list
batchDataList.add(reader.nextBatch());
hasDataRemaining.add(true);
}
}
// 获取 list 中有效数据点中时间戳放入堆中
for (BatchData data : batchDataList) {
if (data.hasCurrent()) {
timeHeapPut(data.currentTime());
}
}
}
在queryDataSet.hasNext()
中判断堆的size是否大于0,
如若大于0,有:
public RowRecord nextWithoutConstraint() throws IOException {
// 获取堆顶记录的最小时间
long minTime = timeHeapGet();
// 新建 RowRecord
RowRecord record = new RowRecord(minTime);
// 访问每一个选中的时间序列
for (int i = 0; i < paths.size(); i++) {
Field field = new Field(dataTypes.get(i));
// 没有更多的数据点时,标记为null
if (!hasDataRemaining.get(i)) {
record.addField(null);
continue;
}
// 获取数据点
BatchData data = batchDataList.get(i);
// 如果当前的最小时间戳等于该测点当前时间戳
if (data.hasCurrent() && data.currentTime() == minTime) {
// 添加该数据点
putValueToField(data, field);
// 移动到下一个时间戳
data.next();
// 没有下一个时间戳时
if (!data.hasCurrent()) {
AbstractFileSeriesReader reader = readers.get(i);
// 判断有无nextBatch,如有,则判断其中有无新的时间戳,如有则将其加入堆中
if (reader.hasNextBatch()) {
data = reader.nextBatch();
if (data.hasCurrent()) {
batchDataList.set(i, data);
timeHeapPut(data.currentTime());
} else {
hasDataRemaining.set(i, false);
}
} else {
hasDataRemaining.set(i, false);
}
} else {
// 如果有下一个时间戳,将其加入堆中
timeHeapPut(data.currentTime());
}
record.addField(field);
} else {
// 如果两个时间不相同,则将其标志为 null
record.addField(null);
}
}
return record;
}
连接查询
public DataSetWithTimeGenerator execute(QueryExpression queryExpression) throws IOException {
// 获取查询表达式
IExpression expression = queryExpression.getExpression();
List<Path> selectedPathList = queryExpression.getSelectedSeries();
// 根据查询表达式获取时间戳计算单元
TimeGenerator timeGenerator = new TsFileTimeGenerator(expression, chunkLoader, metadataQuerier);
List<Boolean> cached =
markFilterdPaths(expression, selectedPathList, timeGenerator.hasOrNode());
// 为每个被投影的时间序列创建 FileSeriesReaderByTimestamp
List<FileSeriesReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
List<TSDataType> dataTypes = new ArrayList<>();
Iterator<Boolean> cachedIterator = cached.iterator();
Iterator<Path> selectedPathIterator = selectedPathList.iterator();
// 如果“时间戳计算模块”中还有下一个时间戳,则计算出下一个时间戳
while (cachedIterator.hasNext()) {
boolean cachedValue = cachedIterator.next();
Path selectedPath = selectedPathIterator.next();
List<IChunkMetadata> chunkMetadataList = metadataQuerier.getChunkMetaDataList(selectedPath);
if (chunkMetadataList.size() != 0) {
dataTypes.add(chunkMetadataList.get(0).getDataType());
// 如果该时间戳下没有满足 filter 的数据点,则将各对应的测点下该时间戳处设为null
if (cachedValue) {
readersOfSelectedSeries.add(null);
continue;
}
FileSeriesReaderByTimestamp seriesReader =
new FileSeriesReaderByTimestamp(chunkLoader, chunkMetadataList);
// 将该 FileSeriesReaderByTimestamp 组件加入 readersOfSelectedSeries 以便获取在该时间戳下的数据点
readersOfSelectedSeries.add(seriesReader);
} else {
// 如果 selectedPath 为空
selectedPathIterator.remove();
cachedIterator.remove();
}
}
// 返回 DataSetWithTimeGenerator
return new DataSetWithTimeGenerator(
selectedPathList, cached, dataTypes, timeGenerator, readersOfSelectedSeries);
}
// 根据上述“时间戳计算模块”中算出的时间戳,对于每一个时间戳,分别在每个时间序列上使用之前记录的 FileSeriesReaderByTimestamp 组件获取在该时间戳下的数据点,并合并成一个 RowRecord,得到一个查询结果
public RowRecord nextWithoutConstraint() throws IOException {
// 获取下一个“时间戳计算模块”中算出的时间戳
long timestamp = timeGenerator.next();
RowRecord rowRecord = new RowRecord(timestamp);
// 对于每个时间序列,获取在该时间戳下的数据点
for (int i = 0; i < paths.size(); i++) {
// 如果是存在 filter 作用在该时间序列的情况
if (cached.get(i)) {
// 利用 time generator 中的 reader 获取对应数据点
Object value = timeGenerator.getValue(paths.get(i));
// 加入 RowRecord
rowRecord.addField(value, dataTypes.get(i));
continue;
}
// 如果是不存在 filter 作用在该时间序列的情况
// 利用 time generator 中的 reader 获取对应数据点
FileSeriesReaderByTimestamp fileSeriesReaderByTimestamp = readers.get(i);
Object value = fileSeriesReaderByTimestamp.getValueInTimestamp(timestamp);
// 加入 RowRecord
rowRecord.addField(value, dataTypes.get(i));
}
return rowRecord;
}