[IoTDB 学习笔记] [part 2] TsFile 的基本读写

说在前面: 这个list记录了博主在学习IoTDB[1]期间的总结和思考,欢迎一起讨论学习哈
相关介绍可以参考list的第一篇博客:[IoTDB 学习笔记] [part1] 介绍

TsFile 基本结构

TsFile 是 IoTDB 的底层数据文件,一种专门为时间序列数据设计的列式文件格式。TsFile的设计细节具体可以参考[2]

总的来说,TsFile文件主要包含数据(Chunk)和元数据(Metadata)两个部分。可以认为对于我们观测的设备网络中,每个设备包含若干个测点,每个测点包含一个时间序列Timeseries,时间序列记录了该测点各时间戳下的值,时间序列的值记录在Chunk当中;对应的,其中的元数据也分为三部分:ChunkMetadataTimeseriesMetadataTsFileMetadata。一个TsFile的示例结构如图 1 [2]所示,该文件包含两个设备d1与d2,每个设备包含三个测点s1,s2和s3,每个设备的每个测点对应一个时间序列,每个时间序列包含两个Chunk

图 1 . TsFile的示例结构

查询流程
以查 d1.s1 为例

  • 反序列化 TsFileMetadata,得到 d1.s1 的 TimeseriesMetadata 的位置
  • 反序列化得到 d1.s1 的 TimeseriesMetadata
  • 根据 d1.s1 的 TimeseriesMetadata,反序列化其所有 ChunkMetadata
  • 根据 d1.s1 的每一个 ChunkMetadata,读取其 Chunk 数据

每个设备中的数据存储在一个ChunkGroup中,每个ChunkGroup由若干Chunk组成,ChunkChunkHeader和若干Page一起构成。

所有元数据索引节点构成了一颗元数据索引树,包含传感器索引层以及可能的设备索引层,各层包含内部节点和叶节点两种类型的节点,分别为:INTERNAL_MEASUREMENTLEAF_MEASUREMENTINTERNAL_DEVICELEAF_DEVICE。其中LEAF_MEASUREMENT指向TimeseriesMetadata,如图 2 [2]所示。
max_degree_of_index_node为索引树节点的最大子节点数目。当总设备数目不超过max_degree_of_index_node时,不存在设备索引层,如图 3 [2]所示。

图 2 . max_degree_of_index_node为10 150个设备 每个设备中有150个传感器
图 3 . max_degree_of_index_node为10 5个设备 每个设备中有5个传感器

写流程

TsFile 的写入流程如图 4 [3]所示:

图 4 . 每个设备对应一个 ChunkGroupWriter 每个传感器对应一个 ChunkWriter

其中,文件的写入主要分为三种操作,在图上用 1、2、3 标注:
1、写内存缓冲区
2、持久化 ChunkGroup
3、关闭文件

写内存缓冲区

TsFile 文件层的写入接口有两种:

  • 写入一个设备一个时间戳的多个测点
  • 写入一个设备多个时间戳的多个测点

当调用写接口时,这个设备的数据会交给对应的 ChunkGroupWriter,其中的每个测点会交给对应的 ChunkWriter 进行写入。ChunkWriter 完成编码和打包(生成 Page

持久化 ChunkGroup

当内存中的数据达到一定阈值,会触发持久化操作。每次持久化会把当前内存中所有设备的数据全部持久化到磁盘的 TsFile 文件中。每个设备对应一个 ChunkGroup,每个测点对应一个 Chunk。持久化完成后会在内存中缓存对应的元数据信息,以供查询和生成文件尾部 metadata。

关闭文件

根据内存中缓存的元数据,生成 TsFileMetadata 追加到文件尾部,最后关闭文件。

正如 TsFile 基本结构 中所述,生成 TsFileMetadata 的过程中比较重要的一步是建立元数据索引 (MetadataIndex) 树,以使得检索时间序列数据时可以不用读取所有的TimeseriesMetadata以减少 I/O 操作。

建立元数据索引树的算法大致如下:

  1. 从索引树的底层开始构建。在传感器索引层,对于每个设备,我们首先初始化其叶节点(类型为LEAF_MEASUREMENT),然后对于每个TimeseriesMetadata,在序列化后,将其加入叶节点中,当一个节点中的TimeseriesMetadata达到MAX_DEGREE_OF_INDEX_NODE后,将这个节点加入设备queue中,并重新初始化一个叶节点用于继续存放接下来的TimeseriesMetadata,然后重复这个过程,直到所有TimeseriesMetadata全部添加完成。然后对于queue中的叶节点,逐层地生成上层节点(INTERNAL_MEASUREMENT),具体方法与上述方法类似,每个上层节点中最多包含MAX_DEGREE_OF_INDEX_NODE个子节点,不断重复地生成上级节点,直到仅剩一个节点时,即为需要的根节点。
  2. 然后,判断设备数目是否超过MAX_DEGREE_OF_INDEX_NODE,如若未超过,则直接生成元数据索引树的根节点(子节点集包含 1 中每个设备对应的根节点序列化后转化成的索引项);如若超过,则需要生成设备索引层级,具体方法与 1 类似,在初始化queue后,先生成类型为LEAF_DEVICE的节点,每个节点最多包含MAX_DEGREE_OF_INDEX_NODE个子级索引。然后按照和 1 类似的方法逐层生成上层节点,直到生成最终的根节点。

源码解析

此处引用的为v0.12.0[4]的代码

接口调用

利用TSRecord写

IoTDB 提供了一个TSRecord工具,TSRecord记录了一个设备在一个时间戳下的若干测点信息。

直接写入新文件时

// 生成 TsFile 存储文件,path为对应的disk存储路径
File f = FSFactoryProducer.getFSFactory().getFile(path);

// 生成 tsfile writer
try (TsFileWriter tsFileWriter = new TsFileWriter(f)) {

      // 在 tsfile writer 中注册测点信息 这里 包含了 0,1,2,3 四个设备,每个设备包含 1,2,3 3 个测点
      for (int i = 0; i < 4; i++) {
        tsFileWriter.registerTimeseries(
            new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_1),
            new MeasurementSchema(Constant.SENSOR_1, TSDataType.INT64, TSEncoding.RLE));
        tsFileWriter.registerTimeseries(
            new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_2),
            new MeasurementSchema(Constant.SENSOR_2, TSDataType.INT64, TSEncoding.RLE));
        tsFileWriter.registerTimeseries(
            new Path(Constant.DEVICE_PREFIX + i, Constant.SENSOR_3),
            new MeasurementSchema(Constant.SENSOR_3, TSDataType.INT64, TSEncoding.RLE));
      }
      // 生成时间序列信息,设备0在时间戳为0,4,8..时各测点有值,均与时间戳的值相同,设备1,2,3分别在时间戳为1,5,9..;2,6,10..;3,7,11时各测点有值,且均与时间戳的值相同
      for (int i = 0; i < 100; i++) {
        // 生成设备(i % 4)在时间戳为i时的记录
        TSRecord tsRecord = new TSRecord(i, Constant.DEVICE_PREFIX + (i % 4));
        // 生成时间戳为i时设备(i % 4)的各测点的信息
        DataPoint dPoint1 = new LongDataPoint(Constant.SENSOR_1, i);
        DataPoint dPoint2 = new LongDataPoint(Constant.SENSOR_2, i);
        DataPoint dPoint3 = new LongDataPoint(Constant.SENSOR_3, i);
        // 将生成的测点信息加入设备记录
        tsRecord.addTuple(dPoint1);
        tsRecord.addTuple(dPoint2);
        tsRecord.addTuple(dPoint3);

        // 将设备(i % 4)在时间戳为i时的记录加入tsFile中
        tsFileWriter.write(tsRecord);
      }
    } catch (Exception e) {
      logger.error("meet error in TsFileWrite ", e);
    }

续写TsFile时

// 基本流程与直接写入新文件时一致,不同的是在生成writer的时候使用:

// f 为读入的TsFile文件:File f = FSFactoryProducer.getFSFactory().getFile(path);
// 为了续写f中存储的TsFile,这里使用了ForceAppendTsFileWriter,初始化的时候读取了f中存储的TsFile相关信息
ForceAppendTsFileWriter fwriter = new ForceAppendTsFileWriter(f);
// 截取f中truncatePosition及之前的内容(截去尾部元数据),以便添加新的内容
fwriter.doTruncate();

利用tablet写

IoTDB 提供了一个tablet工具,tablet记录了一个设备的多个测点的信息,按照一种表格的形式表示,这些测点具有相同的时间戳序列,因此可以应用在测点具有相同时间戳序列(每个时间戳下各个测点都具有值)的设备中。

// 生成 TsFile 存储文件,path为对应的disk存储路径
File f = FSFactoryProducer.getFSFactory().getFile(path);
if (f.exists() && !f.delete()) {
        throw new RuntimeException("can not delete " + f.getAbsolutePath());
      }

// 初始化模式
Schema schema = new Schema();

// 此处展示了10个测点(一个设备),每个测点1000000个时间序列值的例子
String device = Constant.DEVICE_PREFIX + 1;
String sensorPrefix = "sensor_";
int rowNum = 1000000;
int sensorNum = 10;

// 用于初始化Tablet
List<IMeasurementSchema> measurementSchemas = new ArrayList<>();
// 将测点加入模式
for (int i = 0; i < sensorNum; i++) {
IMeasurementSchema measurementSchema =
            new MeasurementSchema(sensorPrefix + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF);
measurementSchemas.add(measurementSchema);
// 用于初始化TsFilewriter时注册各测点
schema.registerTimeseries(
            new Path(device, sensorPrefix + (i + 1)),
            new MeasurementSchema(sensorPrefix + (i + 1), TSDataType.INT64, TSEncoding.TS_2DIFF));
      }

// 生成TsFileWriter,并注册各测点
try (TsFileWriter tsFileWriter = new TsFileWriter(f, schema)) {

        // 初始化Tablet
        Tablet tablet = new Tablet(device, measurementSchemas);

        long[] timestamps = tablet.timestamps;
        Object[] values = tablet.values;

        // 生成的时间序列初始时间戳以及初始值(各时间戳下的测点的值随着时间戳的增长而增长)
        long timestamp = 1;
        long value = 1000000L;
        
        // 为Tablet赋值
        for (int r = 0; r < rowNum; r++, value++) {
          // 记录时间戳的序列,每次增加1(其值为row),value也每次加1
          int row = tablet.rowSize++;
          timestamps[row] = timestamp++;
          // 将value赋给每个sensor的当前时间戳(row)的位置
          for (int i = 0; i < sensorNum; i++) {
            long[] sensor = (long[]) values[i];
            sensor[row] = value;
          }
          // 当时间戳数目达到阈值(tablet.getMaxRowNumber())时,将当前的tablet写入TsFile,并清空tablet以继续重复该流程来将剩余时间序列信息写入tsfile
          if (tablet.rowSize == tablet.getMaxRowNumber()) {
            tsFileWriter.write(tablet);
            tablet.reset();
          }
        }
        // 如果最后还有没有写入TsFile的内容,将其写入
        if (tablet.rowSize != 0) {
          tsFileWriter.write(tablet);
          tablet.reset();
        }
      }

流程解析

写内存缓冲区部分中,我们了解到在写入TsFile时具有两种接口:

  • 写入一个设备一个时间戳多个测点
  • 写入一个设备多个时间戳多个测点
    分别对应上述展示的接口调用部分中的基于TSRecord和Tablet的写入方法。

我们了解到,当调用 write 接口时,这个设备的数据会交给对应的 ChunkGroupWriter,其中的每个测点会交给对应的 ChunkWriter 进行写入。ChunkWriter 完成编码和打包(生成 Page)。

查看TsFile writer 中写入方法write,我们发现:

写内存缓冲区

// 用于基于TSRecord的写入方法
public boolean write(TSRecord record) throws IOException, WriteProcessException {
    // 确保对应的groupwriter和chunkwriter存在,不存在则生成
    checkIsTimeSeriesExist(record);
    // 获取所要写入的设备的groupwriter,并调用其来写入所要写入的datapoint
    groupWriters.get(record.deviceId).write(record.time, record.dataPointList);
    ++recordCount;
    return checkMemorySizeAndMayFlushChunks();
  }

// 用于基于Tablet的写入方法
public boolean write(Tablet tablet) throws IOException, WriteProcessException {
    // 确保对应的groupwriter和chunkwriter存在,不存在则生成
    checkIsTimeSeriesExist(tablet);
    // 获取所要写入的设备的groupwriter,并调用其来写入所要导入的Tablet
    groupWriters.get(tablet.deviceId).write(tablet);
    recordCount += tablet.rowSize;
    return checkMemorySizeAndMayFlushChunks();
  }

持久化 ChunkGroup

// 检查占用的内存是否超过阈值(chunkGroupSizeThreshold),如若超过,则调用flushAllChunkGroups()将其存至OutputStream中,并清空各group writer及其下chunk writer中数据,以进行剩余数据的输入
// 返回false表示所占存储空间未达阈值,true则表示达到
private boolean checkMemorySizeAndMayFlushChunks() throws IOException {
    // recordCountForNextMemCheck为data point数目阈值的最小值,此处初步放缩估计内存,以进行加速
    if (recordCount >= recordCountForNextMemCheck) {
      // 
      long memSize = calculateMemSizeForAllGroup();
      assert memSize > 0;
      if (memSize > chunkGroupSizeThreshold) {
        LOG.debug("start to flush chunk groups, memory space occupy:{}", memSize);
        // 更新recordCountForNextMemCheck
        recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
        // 存至OutputStream,并清空writer数据
        return flushAllChunkGroups();
      } else {
        // 更新recordCountForNextMemCheck
        recordCountForNextMemCheck = recordCount * chunkGroupSizeThreshold / memSize;
        return false;
      }
    }
    return false;
  }

// 存至OutputStream,并清空各group writer及其下chunk writer中数据,以进行剩余数据的输入
public boolean flushAllChunkGroups() throws IOException {
    if (recordCount > 0) {
      for (Map.Entry<String, IChunkGroupWriter> entry : groupWriters.entrySet()) {
        String deviceId = entry.getKey();
        IChunkGroupWriter groupWriter = entry.getValue();
        // 载入当前设备
        fileWriter.startChunkGroup(deviceId);
        long pos = fileWriter.getPos();
        // 转移该设备下的pages
        long dataSize = groupWriter.flushToFileWriter(fileWriter);
        if (fileWriter.getPos() - pos != dataSize) {
          throw new IOException(
              String.format(
                  "Flushed data size is inconsistent with computation! Estimated: %d, Actual: %d",
                  dataSize, fileWriter.getPos() - pos));
        }
        fileWriter.endChunkGroup();
      }
      // 清空其余相关记录
      reset();
    }
    return false;
  }

关闭文件

fileWriter.endFile() 中,生成了path (测点) 到chunkMetadata 的映射 chunkMetadataListMap,flushMetadataIndex() 中基于 chunkMetadataListMap 生成了设备到TimeseriesMetadata 的映射 deviceTimeseriesMetadataMap。

public void close() throws IOException {
    LOG.info("start close file");
    // 写入剩余的ChunkGroup
    flushAllChunkGroups();
    // 根据内存中缓存的元数据,生成 TsFileMetadata 追加到文件尾部
    fileWriter.endFile();
  }

private MetadataIndexNode flushMetadataIndex(Map<Path, List<IChunkMetadata>> chunkMetadataListMap)
      throws IOException {

    deviceTimeseriesMetadataMap = new LinkedHashMap<>();
    // device -> TimeseriesMetaDataList 的映射
    for (Map.Entry<Path, List<IChunkMetadata>> entry : chunkMetadataListMap.entrySet()) {
      Path path = entry.getKey();
      String device = path.getDevice();

      // 生成 TimeseriesMetaData
      PublicBAOS publicBAOS = new PublicBAOS();
      TSDataType dataType = entry.getValue().get(entry.getValue().size() - 1).getDataType();
      Statistics seriesStatistics = Statistics.getStatsByType(dataType);

      int chunkMetadataListLength = 0;
      boolean serializeStatistic = (entry.getValue().size() > 1);
      // flush chunkMetadataList 
      for (IChunkMetadata chunkMetadata : entry.getValue()) {
        if (!chunkMetadata.getDataType().equals(dataType)) {
          continue;
        }
        chunkMetadataListLength += chunkMetadata.serializeTo(publicBAOS, serializeStatistic);
        seriesStatistics.mergeStatistics(chunkMetadata.getStatistics());
      }
      TimeseriesMetadata timeseriesMetadata =
          new TimeseriesMetadata(
              (byte)
                  ((serializeStatistic ? (byte) 1 : (byte) 0) | entry.getValue().get(0).getMask()),
              chunkMetadataListLength,
              path.getMeasurement(),
              dataType,
              seriesStatistics,
              publicBAOS);
      deviceTimeseriesMetadataMap
          .computeIfAbsent(device, k -> new ArrayList<>())
          .add(timeseriesMetadata);
    }

    return MetadataIndexConstructor.constructMetadataIndex(deviceTimeseriesMetadataMap, out);
  }

然后通过MetadataIndexConstructor.constructMetadataIndex() 结合存储在输出流中的信息生成元数据树。

public static MetadataIndexNode constructMetadataIndex(
      Map<String, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap, TsFileOutput out)
      throws IOException {
    Map<String, MetadataIndexNode> deviceMetadataIndexMap = new TreeMap<>();

    // 对于每个设备,有:
    for (Entry<String, List<TimeseriesMetadata>> entry : deviceTimeseriesMetadataMap.entrySet()) {
      if (entry.getValue().isEmpty()) {
        continue;
      }

      // 传感器索引层,初始化此设备的索引节点的队列
      Queue<MetadataIndexNode> measurementMetadataIndexQueue = new ArrayDeque<>();
      TimeseriesMetadata timeseriesMetadata;
      
      // 传感器索引层,初始化传感器索引层级的叶子节点
      MetadataIndexNode currentIndexNode =
          new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
       
      // 对于每个 TimeseriesMetadata,有:
      for (int i = 0; i < entry.getValue().size(); i++) {
        timeseriesMetadata = entry.getValue().get(i);
        // 每隔 MAX_DEGREE_OF_INDEX_NODE 个,加一条 entry 到 currentIndexNode 中
        if (i % config.getMaxDegreeOfIndexNode() == 0) {
          // 每当 currentIndexNode 中攒够 MAX_DEGREE_OF_INDEX_NODE 个 entry 后
          if (currentIndexNode.isFull()) {
            // 将 currentIndexNode 加入 queue 中
            addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);
            // 并将 currentIndexNode 指向一个新的 MetadataIndexNode
            currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_MEASUREMENT);
          }
          currentIndexNode.addEntry(
              new MetadataIndexEntry(timeseriesMetadata.getMeasurementId(), out.getPosition()));
        }
      
        // 序列化 timeseriesMetadata
        timeseriesMetadata.serializeTo(out.wrapAsStream());
      }
      // 将 currentIndexNode 加入 queue 中
      addCurrentIndexNodeToQueue(currentIndexNode, measurementMetadataIndexQueue, out);

      // 根据 queue 中已经存储的叶子节点,逐层生成上层节点,直至最终的根节点,并将"设备-根节点"对应的映射加入 deviceMetadataIndexMap 中
      deviceMetadataIndexMap.put(
          entry.getKey(),
          generateRootNode(
              measurementMetadataIndexQueue, out, MetadataIndexNodeType.INTERNAL_MEASUREMENT));
    }

    // 判断设备数是否超过 MAX_DEGREE_OF_INDEX_NODE,如果未超过则可以直接形成元数据索引树的根节点并返回
    if (deviceMetadataIndexMap.size() <= config.getMaxDegreeOfIndexNode()) {  
      MetadataIndexNode metadataIndexNode =
          new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
      
      // 对于 deviceMetadataIndexMap 中的每一个 entry,有:
      for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
        
        // 将其转化成一个索引项,加入到 metadataIndexNode 中
        metadataIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));

        // 序列化 entry
        entry.getValue().serializeTo(out.wrapAsStream());
      }
      
      // 设置根节点的 endOffset 并返回
      metadataIndexNode.setEndOffset(out.getPosition());
      return metadataIndexNode;
    }

    // 如若不然,则生成设备索引层
    // 初始化存放设备索引层级节点的队列 queue
    Queue<MetadataIndexNode> deviceMetadaIndexQueue = new ArrayDeque<>();
    // 初始化设备索引层级的叶子节点 currentIndexNode
    MetadataIndexNode currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);

    // 对于 deviceMetadataIndexMap 中的每一个 entry,有:
    for (Map.Entry<String, MetadataIndexNode> entry : deviceMetadataIndexMap.entrySet()) {
      // 每当 currentIndexNode 中攒够 MAX_DEGREE_OF_INDEX_NODE 个 entry 后,  
      if (currentIndexNode.isFull()) {
        addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadaIndexQueue, out);
        // 将 currentIndexNode 指向一个新的 MetadataIndexNode
        currentIndexNode = new MetadataIndexNode(MetadataIndexNodeType.LEAF_DEVICE);
      }
      // 将其转化成一个索引项,加入到 currentIndexNode 中
      currentIndexNode.addEntry(new MetadataIndexEntry(entry.getKey(), out.getPosition()));
      // 序列化 entry
      entry.getValue().serializeTo(out.wrapAsStream());
    }
    // 将 currentIndexNode 加入 queue 中
    addCurrentIndexNodeToQueue(currentIndexNode, deviceMetadaIndexQueue, out);
    // 根据 queue 中已经存储的叶子节点,逐层生成上层节点,直至最终的根节点
    MetadataIndexNode deviceMetadataIndexNode =
        generateRootNode(deviceMetadaIndexQueue, out, MetadataIndexNodeType.INTERNAL_DEVICE);
    // 设置根节点的 endOffset 并返回
    deviceMetadataIndexNode.setEndOffset(out.getPosition());
    return deviceMetadataIndexNode;
  }

对于其中的generateRootNode(),有:

// 该方法需要将队列中的 MetadataIndexNode 形成树级结构,并返回根节点
private static MetadataIndexNode generateRootNode(
      Queue<MetadataIndexNode> metadataIndexNodeQueue, TsFileOutput out, MetadataIndexNodeType type)
      throws IOException {
    int queueSize = metadataIndexNodeQueue.size();
    MetadataIndexNode metadataIndexNode;
    
    // 根据需要的类型 type 初始化 currentIndexNode
    MetadataIndexNode currentIndexNode = new MetadataIndexNode(type);
    
    // 当队列中有多余一个节点时 (不是仅剩根节点时),循环处理队列,对于队列中存在的每个节点,有:
    while (queueSize != 1) {
      for (int i = 0; i < queueSize; i++) {
        metadataIndexNode = metadataIndexNodeQueue.poll();
        // 每当 currentIndexNode 中攒够 MAX_DEGREE_OF_INDEX_NODE 个 entry 后
        if (currentIndexNode.isFull()) {

          // 将 currentIndexNode 加入 queue 中
          addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out);

          // 并将 currentIndexNode 指向一个新的 MetadataIndexNode
          currentIndexNode = new MetadataIndexNode(type);
        }

        // 将其转化成一个索引项,加入到 currentIndexNode 中
        currentIndexNode.addEntry(
            new MetadataIndexEntry(metadataIndexNode.peek().getName(), out.getPosition()));
        
        // 序列化 metadataIndexNode
        metadataIndexNode.serializeTo(out.wrapAsStream());
      }

      // 将 currentIndexNode 加入 queue 中
      addCurrentIndexNodeToQueue(currentIndexNode, metadataIndexNodeQueue, out);
      
      // 并将 currentIndexNode 指向一个新的 MetadataIndexNode
      currentIndexNode = new MetadataIndexNode(type);
      queueSize = metadataIndexNodeQueue.size();
    }

    // 队列中只剩下一个节点时,返回队列中最终剩余的节点 (即为根节点)
    return metadataIndexNodeQueue.poll();
  }

读流程

过滤条件和查询表达式

TsFile 查询时的过滤器Filter定义了12种基本类型如下,详细定义可以参考[5]

等于 大于 大于等于 小于 小于等于 不等于
TimeEq TimeGt TimeGtEq TimeLt TimeLtEq TimeNotEq
ValueEq ValueGt ValueGtEq ValueLt ValueLtEq ValueNotEq

Filter整体上分为一元条件UnaryFilter以及二元条件BinaryFilter,二元条件存在逻辑与关系AndFilter以及逻辑或关系OrFilter两种条件间关系。

IoTDB中提供了两种查询表达式:SingleSeriesExpression以及GlobalTimeExpressionSingleSeriesExpression针对一个时间序列进行过滤,而GlobalTimeExpression的过滤条件则应用于全部时间序列。因此,SingleSeriesExpression包含pathfilter两个参数,path用于确定针对的时间序列,filter则表示查询的过滤条件。而GlobalTimeExpression则只包含filter

查询过滤条件(IExpression)为执行查询时的表达式,有如下的定义:

IExpression := SingleSeriesExpression | GlobalTimeExpression | AndExpression | OrExpression
AndExpression := IExpression && IExpression
OrExpression := IExpression || IExpression

然而这些IExpression并不全部可以直接应用于查询,需要利用特殊的优化算法转化为可执行的表达式,满足以下任一条件的查询表达式为可执行表达式:

  1. IExpression 为单一的 GlobalTimeExpression
  2. IExpression 为单一的 SingleSeriesExpression
  3. IExpressionAndExpression,且叶子节点均为 SingleSeriesExpression
  4. IExpressionOrExpression,且叶子节点均为 SingleSeriesExpression

上述优化算法大致如下:

首先,根据上述的定义,一个不是可执行表达式的查询表达式应是AndExpression或者OrExpression,且同时包含GlobalTimeExpressionSingleSeriesExpression

因此,我们主要的主要问题是处理GlobalTimeExpressionSingleSeriesExpression的查询统一问题,选择的解决办法是把GlobalTimeExpression的查询条件投影到全部待查询的path(测点)中,然后就可以化归成 SingleSeriesExpression间的合并,这里我们的 IExpression 代指由若干条SingleSeriesExpression合并而成的条件,因此有:对于IExpression间和合并直接根据两者的关系按照生成AndExpression或者 ORExpression的方法处理,生成一个新的 IExpression (SingleSeriesExpression直接作为IExpression处理);
而对于GlobalTimeExpression间的合并,则直接根据两者的关系按照生成AndExpression或者 ORExpression的方法处理,生成一个新的 GlobaLTimeFilter
对于GlobalTimeExpressionIExpression的合并,我们按照上述的映射方法化归问题,生成新的 IExpression,值得注意的是,如若两者间的关系是AND,则映射的path即为被合并的IExpression中包含的有效 path,而如若关系式OR,则需首先查询 GlobalTimeExpression的投影集合。
IoTDB对于上述的算法提供了optimize()方法来进行实现(包含combineTwoGlobalTimeExpression()handleOneGlobalTimeExpressionr()MergeIExpression()的实现方法,详细定义可以参照[5])

TsFile 查询执行过程

TsFile 文件层查询接口只包含原始数据查询,根据是否包含值过滤条件,可以将查询分为两类“无过滤条件或仅包含时间过滤条件查询”和“包含值过滤条件的查询”。这里重点介绍其中“无过滤条件或仅包含时间过滤条件查询”,“包含值过滤条件的查询”的详细介绍可以参照[5]
为了执行以上两类查询,有两套查询流程:

  • 归并查询

生成多个 reader,按照 time 对齐,返回结果集。

  • 连接查询

根据查询条件生成满足过滤条件的时间戳,通过满足条件的时间戳查询投影列的值,返回结果集。

归并查询

归并查询对于每一个时间序列都构建了一个FileSeriesReader,基于最小堆来实现数据的合并。
查询过程中,系统根据所有的FileSeriesReader生成一个DataSetWithoutTimeGenerator,由于每个FileSeriesReader会按照时间戳从小到大的顺序迭代地返回数据点,所以可以采用“多路归并”对所有FileSeriesReader的结果进行按时间戳对齐。

详细的算法描述如下[5]

(1) 创建一个最小堆,堆里面存储“时间戳”,该堆将按照每个时间戳的大小进行组织。
(2) 初始化堆,依次访问每一个 FileSeriesReader,如果该 FileSeriesReader 中还有数据点,则获取数据点的时间戳并放入堆中。此时每个时间序列最多有1个时间戳被放入到堆中,即该序列最小的时间戳。
(3) 如果堆的 size > 0,获取堆顶的时间戳,记为t,并将其在堆中删除,进入步骤(4);如果堆的 size 等于0,则跳到步骤(5),结束数据合并过程。
(4) 创建新的 RowRecord。依次遍历每一条时间序列。在处理其中一条时间序列时,如果该序列没有更多的数据点,则将该列标记为 null 并添加在 RowRecord 中;否则,判断最小的时间戳是否与 t 相同,若不相同,则将该列标记为 null 并添加在 RowRecord 中。若相同,将该数据点添加在 RowRecord 中,同时判断该时间序列是否有新的数据点,若存在,则将下一个时间戳 t' 添加在堆中,并将 t' 设为当前时间序列的最小时间戳。最后,返回步骤(3)。
(5) 结束数据合并过程。

连接查询

连接查询生成满足“选择条件”的时间戳、查询被投影列在对应时间戳下的数据点、合成RowRecord,即针对满足条件的时间戳,将其依次投影至各时间序列上,记录满足条件的数据。

主要流程如下[5]

(1) 根据 QueryExpression,初始化时间戳计算模块 TimeGeneratorImpl
(2) 为每个被投影的时间序列创建 FileSeriesReaderByTimestamp
(3) 如果“时间戳计算模块”中还有下一个时间戳,则计算出下一个时间戳 t ,进入步骤(4);否则,结束查询。
(4) 根据 t,在每个时间序列上使用 FileSeriesReaderByTimestamp 组件获取在时间戳 t 下的数据点;如果在该时间戳下没有对应的数据点,则用 null 表示。
(5) 将步骤(4)中得到的所有数据点合并成一个 RowRecord,此时得到一条查询结果,返回步骤(3)计算下一个查询结果。

查询流程

TsFileExecutor接收一个QueryExpression,执行该查询并返回相应的 QueryDataSet

其基本流程如下:

(1)接收一个 QueryExpression
(2)如果无过滤条件,执行归并查询。如果该 QueryExpression 包含 Filter(过滤条件),则通过 ExpressionOptimizer 对该 QueryExpression 的 Filter 进行优化。如果是 GlobalTimeExpression,执行归并查询。如果包含值过滤,交给 ExecutorWithTimeGenerator 执行连接查询。
(3) 生成对应的 QueryDataSet,迭代地生成 RowRecord,将查询结果返回。

源码分析

此处引用的为v0.12.0[4]的代码
这里未介绍 value filter 相关的查询,详细的介绍可以参考[5],代码可以参考[4]

接口调用

// 这里展示了一个具有 0,1,2,3 4个设备,每个设备具有1,2,3 3个测点的TsFile的例子 
// 初始化 TsFile reader
try (TsFileSequenceReader reader = new TsFileSequenceReader(path);
        ReadOnlyTsFile readTsFile = new ReadOnlyTsFile(reader)) {

无 filter

      // 将查询映射到设备0的各个 path (测点)上
      ArrayList<Path> paths = new ArrayList<>();
      paths.add(new Path("device_0", "sensor_1"));
      paths.add(new Path("device_0", "sensor_2"));
      paths.add(new Path("device_0", "sensor_3"));


      // 不使用 filter的情况,应该查询出所有设备0某一测点具有有效值的数据点对应的时间戳下的设备0的测点1,2,3的情况
      queryAndPrint(paths, readTsFile, null);
}

存在 time filter

      // 使用 time filter (4 <= time <= 10) 的情况,应该选出时间戳在[4,10]间的有效记录
      IExpression timeFilter =
          BinaryExpression.and(
              new GlobalTimeExpression(TimeFilter.gtEq(4L)),
              new GlobalTimeExpression(TimeFilter.ltEq(10L)));
      queryAndPrint(paths, readTsFile, timeFilter);

其中,查询执行部分接口调用如下:

// 其中,paths 为查询的全体测点集,statement 为查询条件
private static void queryAndPrint(
      ArrayList<Path> paths, ReadOnlyTsFile readTsFile, IExpression statement) throws IOException {
    // 根据查询条件生成queryExpression
    QueryExpression queryExpression = QueryExpression.create(paths, statement);
    // 执行查询,如若存在过滤条件 (filter) 则先进行优化
    QueryDataSet queryDataSet = readTsFile.query(queryExpression);
    
    // 完成查询,并输出全部查询结果
    while (queryDataSet.hasNext()) {
      System.out.println(queryDataSet.next());
    }
    System.out.println("------------");
  }

另外值得一提的是IoTDB还提供了可以根据 offset 逐 byte 读取 TsFile 的接口:

marker = reader.readMarker()

流程解析

如上文所述,查询过程的主要流程包括生成QueryExpression,(对于存在filter时) 优化, 查询 (归并查询 / 连接查询)。我们发现,在生成QueryExpression后,由readTsFile.query()生成queryDataSet,再迭代地生成RowRecord

查看readTsFile.query()模块有:

public QueryDataSet query(QueryExpression queryExpression) throws IOException {
    return tsFileExecutor.execute(queryExpression);
  }

public QueryDataSet execute(QueryExpression queryExpression) throws IOException {
    // 使用 bloom filter 判断检索对象 path 是否存在
    BloomFilter bloomFilter = metadataQuerier.getWholeFileMetadata().getBloomFilter();
    List<Path> filteredSeriesPath = new ArrayList<>();
    if (bloomFilter != null) {
      for (Path path : queryExpression.getSelectedSeries()) {
        if (bloomFilter.contains(path.getFullPath())) {
          filteredSeriesPath.add(path);
        }
      }
      queryExpression.setSelectSeries(filteredSeriesPath);
    }

    // 加载metadata
    metadataQuerier.loadChunkMetaDatas(queryExpression.getSelectedSeries());
   
    // 存在 filter 时
    if (queryExpression.hasQueryFilter()) {
      try {
        // 获取查询表达式
        IExpression expression = queryExpression.getExpression();
        // 获取可执行查询表达式
        IExpression regularIExpression =
            ExpressionOptimizer.getInstance()
                .optimize(expression, queryExpression.getSelectedSeries());
        // 更新为对应的可执行表达式
        queryExpression.setExpression(regularIExpression);
        
        // 当可执行表达式为 GlobalTimeExpression 时,及全由 GlobalTimeExpression 组成时,执行归并查询
        if (regularIExpression instanceof GlobalTimeExpression) {
          
          return execute(
              queryExpression.getSelectedSeries(), (GlobalTimeExpression) regularIExpression);
        } else {
          // 反之,执行连接查询
          return new ExecutorWithTimeGenerator(metadataQuerier, chunkLoader)
              .execute(queryExpression);
        }
      } catch (QueryFilterOptimizationException | NoMeasurementException e) {
        throw new IOException(e);
      }
    } else {
      try {
        // 没有 filter 时,执行归并查询
        return execute(queryExpression.getSelectedSeries());
      } catch (NoMeasurementException e) {
        throw new IOException(e);
      }
    }
  }

// 其中对于存在 GlobalTimeExpression 的 filter 时,以及无 filter 时,有:
private QueryDataSet executeMayAttachTimeFiler(
      List<Path> selectedPathList, GlobalTimeExpression timeExpression)
      throws IOException, NoMeasurementException {
    List<AbstractFileSeriesReader> readersOfSelectedSeries = new ArrayList<>();
    List<TSDataType> dataTypes = new ArrayList<>();

    for (Path path : selectedPathList) {
      List<IChunkMetadata> chunkMetadataList = metadataQuerier.getChunkMetaDataList(path);
      AbstractFileSeriesReader seriesReader;
      if (chunkMetadataList.isEmpty()) {
        seriesReader = new EmptyFileSeriesReader();
        dataTypes.add(metadataQuerier.getDataType(path));
      } else {
        // 没有filter 时
        if (timeExpression == null) {
          seriesReader = new FileSeriesReader(chunkLoader, chunkMetadataList, null);
        } else {
          // 存在 GlobalTimeExpression 的 filter 时,将 GlobalTimeExpression 的条件加入每一个时间序列的 reader 中
          seriesReader =
              new FileSeriesReader(chunkLoader, chunkMetadataList, timeExpression.getFilter());
        }
        dataTypes.add(chunkMetadataList.get(0).getDataType());
      }
      readersOfSelectedSeries.add(seriesReader);
    }
    // 开始归并查询
    return new DataSetWithoutTimeGenerator(selectedPathList, dataTypes, readersOfSelectedSeries);
  }

归并查询

// 初始化堆
private void initHeap() throws IOException {
    hasDataRemaining = new ArrayList<>();
    batchDataList = new ArrayList<>();
    timeHeap = new PriorityQueue<>();
    timeSet = new HashSet<>();

    // 初始化堆,依次访问每个 series reader    
    for (int i = 0; i < paths.size(); i++) {
      AbstractFileSeriesReader reader = readers.get(i);
      
      // 如果没有数据点时
      if (!reader.hasNextBatch()) {
        batchDataList.add(new BatchData());
        hasDataRemaining.add(false);
      } else {
        
        // 如果还有数据点,将其加入list
        batchDataList.add(reader.nextBatch());
        hasDataRemaining.add(true);
      }
    }

    // 获取 list 中有效数据点中时间戳放入堆中
    for (BatchData data : batchDataList) {
      if (data.hasCurrent()) {
        timeHeapPut(data.currentTime());
      }
    }
  }

queryDataSet.hasNext()中判断堆的size是否大于0,

如若大于0,有:

public RowRecord nextWithoutConstraint() throws IOException {
    // 获取堆顶记录的最小时间
    long minTime = timeHeapGet();

    // 新建 RowRecord
    RowRecord record = new RowRecord(minTime);

    // 访问每一个选中的时间序列
    for (int i = 0; i < paths.size(); i++) {

      Field field = new Field(dataTypes.get(i));

      // 没有更多的数据点时,标记为null
      if (!hasDataRemaining.get(i)) {
        record.addField(null);
        continue;
      }
      
      // 获取数据点
      BatchData data = batchDataList.get(i);
      
      // 如果当前的最小时间戳等于该测点当前时间戳
      if (data.hasCurrent() && data.currentTime() == minTime) {
        // 添加该数据点
        putValueToField(data, field);
        // 移动到下一个时间戳
        data.next();
        
        // 没有下一个时间戳时
        if (!data.hasCurrent()) {
          AbstractFileSeriesReader reader = readers.get(i);
          // 判断有无nextBatch,如有,则判断其中有无新的时间戳,如有则将其加入堆中
          if (reader.hasNextBatch()) {
            data = reader.nextBatch();
            if (data.hasCurrent()) {
              batchDataList.set(i, data);
              timeHeapPut(data.currentTime());
            } else {
              hasDataRemaining.set(i, false);
            }
          } else {
            hasDataRemaining.set(i, false);
          }
        } else {
          // 如果有下一个时间戳,将其加入堆中
          timeHeapPut(data.currentTime());
        }
        record.addField(field);
      } else {
        // 如果两个时间不相同,则将其标志为 null
        record.addField(null);
      }
    }
    return record;
  }

连接查询

public DataSetWithTimeGenerator execute(QueryExpression queryExpression) throws IOException {
    
    // 获取查询表达式
    IExpression expression = queryExpression.getExpression();
    List<Path> selectedPathList = queryExpression.getSelectedSeries();

    // 根据查询表达式获取时间戳计算单元
    TimeGenerator timeGenerator = new TsFileTimeGenerator(expression, chunkLoader, metadataQuerier);
    
    List<Boolean> cached =
        markFilterdPaths(expression, selectedPathList, timeGenerator.hasOrNode());
    // 为每个被投影的时间序列创建 FileSeriesReaderByTimestamp
    List<FileSeriesReaderByTimestamp> readersOfSelectedSeries = new ArrayList<>();
    List<TSDataType> dataTypes = new ArrayList<>();

    Iterator<Boolean> cachedIterator = cached.iterator();
    Iterator<Path> selectedPathIterator = selectedPathList.iterator();
    // 如果“时间戳计算模块”中还有下一个时间戳,则计算出下一个时间戳
    while (cachedIterator.hasNext()) {
      boolean cachedValue = cachedIterator.next();
      Path selectedPath = selectedPathIterator.next();

      List<IChunkMetadata> chunkMetadataList = metadataQuerier.getChunkMetaDataList(selectedPath);
      if (chunkMetadataList.size() != 0) {
        dataTypes.add(chunkMetadataList.get(0).getDataType());
        
        // 如果该时间戳下没有满足 filter 的数据点,则将各对应的测点下该时间戳处设为null
        if (cachedValue) {
          readersOfSelectedSeries.add(null);
          continue;
        }
        FileSeriesReaderByTimestamp seriesReader =
            new FileSeriesReaderByTimestamp(chunkLoader, chunkMetadataList);
        
        // 将该 FileSeriesReaderByTimestamp 组件加入 readersOfSelectedSeries 以便获取在该时间戳下的数据点
        readersOfSelectedSeries.add(seriesReader);
      } else {
        // 如果 selectedPath 为空
        selectedPathIterator.remove();
        cachedIterator.remove();
      }
    }

    // 返回 DataSetWithTimeGenerator
    return new DataSetWithTimeGenerator(
        selectedPathList, cached, dataTypes, timeGenerator, readersOfSelectedSeries);
  }

// 根据上述“时间戳计算模块”中算出的时间戳,对于每一个时间戳,分别在每个时间序列上使用之前记录的 FileSeriesReaderByTimestamp 组件获取在该时间戳下的数据点,并合并成一个 RowRecord,得到一个查询结果
public RowRecord nextWithoutConstraint() throws IOException {
    // 获取下一个“时间戳计算模块”中算出的时间戳
    long timestamp = timeGenerator.next();
    RowRecord rowRecord = new RowRecord(timestamp);
    
    // 对于每个时间序列,获取在该时间戳下的数据点
    for (int i = 0; i < paths.size(); i++) {

      // 如果是存在 filter 作用在该时间序列的情况
      if (cached.get(i)) {
        // 利用 time generator 中的 reader 获取对应数据点
        Object value = timeGenerator.getValue(paths.get(i));
        
        // 加入 RowRecord
        rowRecord.addField(value, dataTypes.get(i));
        continue;
      }

      // 如果是不存在 filter 作用在该时间序列的情况
      // 利用 time generator 中的 reader 获取对应数据点
      FileSeriesReaderByTimestamp fileSeriesReaderByTimestamp = readers.get(i);
      Object value = fileSeriesReaderByTimestamp.getValueInTimestamp(timestamp);
      // 加入 RowRecord
      rowRecord.addField(value, dataTypes.get(i));
    }

    return rowRecord;
  }

  1. 物联网时序数据库 Apache IoTDB,详细信息可以在https://iotdb.apache.org/中找到。

  2. https://iotdb.apache.org/zh/SystemDesign/TsFile/Format.html

  3. https://iotdb.apache.org/zh/SystemDesign/TsFile/Write.html

  4. https://github.com/apache/iotdb

  5. https://iotdb.apache.org/zh/SystemDesign/TsFile/Read.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351

推荐阅读更多精彩内容