[IoTDB 学习笔记] [part 3] 元数据管理 MManager

说在前面: 这个list记录了博主在学习IoTDB[1]期间的总结和思考,欢迎一起讨论学习哈
相关介绍可以参考list的第一篇博客:[IoTDB 学习笔记] [part1] 介绍

MManager 基本功能及结构

MManager 提供了7种需要记录日志的针对时间序列节点和存储组节点的操作以及六种针对时间序列的标签 tag与属性attribute的操作,每个操作在操作前先获得整个元数据的写锁,操作完后释放:

  • 创建时间序列
  • 删除时间序列
  • 设置存储组
  • 删除存储组
  • 设置TTL
  • 改变时间序列标签信息offset
  • 改变时间序列的别名

tag & attribute operation:

  • 重命名标签或属性
  • 重新设置标签或属性的值
  • 删除已经存在的标签或属性
  • 添加新的标签
  • 添加新的属性
  • 更新插入标签和属性

MManager中的元数据主要以元数据树的形式存在,树中包含三种节点:StorageGroupMNodeInternalMNode(非叶子节点)、LeafMNode(叶子节点),他们都是MNode的子类。

每个InternalMNode中都有一个读写锁,查询元数据信息时,需要获得路径上每一个InternalMNode的读锁,修改元数据信息时,如果修改的是LeafMNode,需要获得其父节点的写锁,若修改的是InternalMNode,则只需获得本身的写锁。若该InternalMNode位于 Device 层,则还包含了一个Map<String, MNode> aliasChildren,用于存储别名信息;
StorageGroupMNode继承InternalMNode,作为存储组的节点;
LeafMNode中包含了对应时间序列的 Schema 信息,其alias以及该时间序列的标签/属性信息在 tlog 文件中的offset

一个示例如图 1 [2]所示,示例中的整个元数据树分为4层,root 层,storage group 层,device 层以及 measurement 层。

图 1

MTree中提供了用于创建或删除存储组与时间序列的操作。

在创建存储组的时候,首先创建所有中间节点,然后确保路径前缀中不包含其他的存储组(存储组间互不包含),然后再确定目标的存储组不存在,如若前述的条件均满足,则创建存储组节点,并添加到相应的路径位置下。

在创建时间序列时,首先确保其中间节点均存在,然后确定目标叶节点不存在,如若均满足则创建叶节点,如有alias则再创建一个目标节点的兄弟节点指向该叶节点。

在删除存储组或时间序列的时候,先将目标节点在其父节点中的记录删除,然后如若删除后父节点为空则递归向上删除。

同时,MManager还提供了元数据查询的功能:

  • 不带过滤条件的元数据查询
    根据是否需要根据热度排序,调用getAllMeasurementSchemaByHeatOrder (需要) 或者getAllMeasurementSchema (不需要)。

  • 带过滤条件的元数据查询
    其中过滤条件只能是tag属性。
    通过在MManager中维护的tag的倒排索引,获得所有满足索引条件的MeasurementMNode。如若需要根据热度排序则根据lastTimeStamp排序,反之根据序列名的字母序排序。

其中,如果元数据较多,一次输出的查询结果可能导致OOM,此时考虑使用fetch size参数 (服务器端一次最多只取 fetch size 个时间序列)。

源码分析

MManager 基本结构

MManager 包含如下内部属性

  public static final String TIME_SERIES_TREE_HEADER = "===  Timeseries Tree  ===\n\n";
  private static final String TAG_FORMAT = "tag key is %s, tag value is %s, tlog offset is %d";
  private static final String DEBUG_MSG = "%s : TimeSeries %s is removed from tag inverted index, ";
  private static final String DEBUG_MSG_1 =
      "%s: TimeSeries %s's tag info has been removed from tag inverted index ";
  private static final String PREVIOUS_CONDITION =
      "before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b";

  private static final int UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD = 5000;

  private static final Logger logger = LoggerFactory.getLogger(MManager.class);

  // 用于生成自动快照的判断 MTree 是否修改的时间间隔阈值
  private static final long MTREE_SNAPSHOT_THREAD_CHECK_TIME = 600L;

  private final int mtreeSnapshotInterval;
  private final long mtreeSnapshotThresholdTime;

  private String logFilePath;
  private String mtreeSnapshotPath;
  private String mtreeSnapshotTmpPath;
  // 元数据信息存储在MTree当中
  private MTree mtree;
  private MLogWriter logWriter;
  private TagLogFile tagLogFile;
  private boolean isRecovering;
  // device -> DeviceMNode
  private RandomDeleteCache<PartialPath, Pair<MNode, Template>> mNodeCache;
  // tag key -> tag value -> LeafMNode
  private Map<String, Map<String, Set<MeasurementMNode>>> tagIndex = new ConcurrentHashMap<>();

  // data type -> number
  private Map<TSDataType, Integer> schemaDataTypeNumMap = new ConcurrentHashMap<>();

  private long reportedDataTypeTotalNum;
  private AtomicLong totalSeriesNumber = new AtomicLong();
  private boolean initialized;
  protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();

  private File logFile;
  private ScheduledExecutorService timedCreateMTreeSnapshotThread;
  private ScheduledExecutorService timedForceMLogThread;

  // MTree 的大小阈值
  private static final long MTREE_SIZE_THRESHOLD = config.getAllocateMemoryForSchema();

  private boolean allowToCreateNewSeries = true;

  private static final int ESTIMATED_SERIES_SIZE = config.getEstimatedSeriesSize();

  // template name -> template
  private Map<String, Template> templateMap = new ConcurrentHashMap<>();

基本操作

节点的基本操作通过MManager的内部函数实现,其中多数操作主要通过调用MTree对应方法实现。

在这些基本操作之中这里主要分析setStorageGroupcreateTimeseriesdeleteStorageGroupsshowTimeseriesWithoutIndex

  • setStorageGroup

设置存储组主要通过MTree下的setStorageGroup接口来实现

void setStorageGroup(PartialPath path) throws MetadataException {
    String[] nodeNames = path.getNodes();
    // 检查路径是否满足格式
    checkStorageGroup(path.getFullPath());
    // 将当前节点设置为根节点
    MNode cur = root;
    if (nodeNames.length <= 1 || !nodeNames[0].equals(root.getName())) {
      throw new IllegalPathException(path.getFullPath());
    }
    int i = 1;
    // e.g., path = root.a.b.sg, 生成中间节点 (类型为 MNode)
    while (i < nodeNames.length - 1) {
      MNode temp = cur.getChild(nodeNames[i]);
      // 查看中间节点是否存在,不存在则添加
      if (temp == null) {
        cur.addChild(nodeNames[i], new MNode(cur, nodeNames[i]));
      } else if (temp instanceof StorageGroupMNode) {
        // 如果中间节点是 StorageGroupMNode,抛出错误
        throw new StorageGroupAlreadySetException(temp.getFullPath());
      }
      // 切换当前节点
      cur = cur.getChild(nodeNames[i]);
      i++;
    }
    if (cur.hasChild(nodeNames[i])) {
      // 如果目标存储组节点存在,抛出错误
      if (cur.getChild(nodeNames[i]) instanceof StorageGroupMNode) {
        throw new StorageGroupAlreadySetException(path.getFullPath());
      } else {
        throw new StorageGroupAlreadySetException(path.getFullPath(), true);
      }
    } else {
      // 如若不存在则生成存储组节点并添加到对应位置 (当前节点的子节点)
      StorageGroupMNode storageGroupMNode =
          new StorageGroupMNode(
              cur, nodeNames[i], IoTDBDescriptor.getInstance().getConfig().getDefaultTTL());
      cur.addChild(nodeNames[i], storageGroupMNode);
    }
  }
  • createTimeseries

设置存储组主要通过MTree下的createTimeseries接口来实现

// MManager 中的 createTimeseries
public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
    // 检查还能否写入新的时间序列
    if (!allowToCreateNewSeries) {
      throw new MetadataException(
          "IoTDB system load is too large to create timeseries, "
              + "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
    }
    try {
      PartialPath path = plan.getPath();
      SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
      
      // 确保获取存储组路径
      ensureStorageGroup(path);

      TSDataType type = plan.getDataType();
      // 利用 MTree 生成 timeseries
      MeasurementMNode leafMNode =
          mtree.createTimeseries(
              path,
              type,
              plan.getEncoding(),
              plan.getCompressor(),
              plan.getProps(),
              plan.getAlias());

      // 更新 tag
      if (plan.getTags() != null) {
        // tag key, tag value
        for (Entry<String, String> entry : plan.getTags().entrySet()) {
          if (entry.getKey() == null || entry.getValue() == null) {
            continue;
          }
          tagIndex
              .computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
              .computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>())
              .add(leafMNode);
        }
      }

      // 更新 statistics 和 schemaDataTypeNumMap
      totalSeriesNumber.addAndGet(1);
      if (totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE >= MTREE_SIZE_THRESHOLD) {
        logger.warn("Current series number {} is too large...", totalSeriesNumber);
        allowToCreateNewSeries = false;
      }
      updateSchemaDataTypeNumMap(type, 1);

      // 写 log
      if (!isRecovering) {
        // either tags or attributes is not empty
        if ((plan.getTags() != null && !plan.getTags().isEmpty())
            || (plan.getAttributes() != null && !plan.getAttributes().isEmpty())) {
          offset = tagLogFile.write(plan.getTags(), plan.getAttributes());
        }
        plan.setTagOffset(offset);
        logWriter.createTimeseries(plan);
      }
      leafMNode.setOffset(offset);

    } catch (IOException e) {
      throw new MetadataException(e);
    }
  }

// MTree 下的 createTimeseries
MeasurementMNode createTimeseries(
      PartialPath path,
      TSDataType dataType,
      TSEncoding encoding,
      CompressionType compressor,
      Map<String, String> props,
      String alias)
      throws MetadataException {
    // 获取分级 path
    String[] nodeNames = path.getNodes();
    // 判断是否是合法的 Timeseries 路径 (path长度不小于3 (root -> [storage group] -> device -> Timeseries))
    if (nodeNames.length <= 2 || !nodeNames[0].equals(root.getName())) {
      throw new IllegalPathException(path.getFullPath());
    }
    // 判断是否是合法的 Timeseries 路径 (使用正则式判断)
    checkTimeseries(path.getFullPath());
    MNode cur = root;
    boolean hasSetStorageGroup = false;
    Template upperTemplate = cur.getDeviceTemplate();
    // e.g, path = root.sg.d1.s1,构建中间节点,并让 cur 设置为 d1
    // 从 root 后第一层节点开始构建路径 (measurement 之前)
    for (int i = 1; i < nodeNames.length - 1; i++) {
      String nodeName = nodeNames[i];
      // cur 指向 storage group node 时
      if (cur instanceof StorageGroupMNode) {
        hasSetStorageGroup = true;
      }
      if (!cur.hasChild(nodeName)) {
        if (!hasSetStorageGroup) {
          throw new StorageGroupNotSetException("Storage group should be created first");
        }
        cur.addChild(nodeName, new MNode(cur, nodeName));
      }
      cur = cur.getChild(nodeName);

      if (cur.getDeviceTemplate() != null) {
        upperTemplate = cur.getDeviceTemplate();
      }
    }

    if (upperTemplate != null && !upperTemplate.isCompatible(path)) {
      throw new PathAlreadyExistException(
          path.getFullPath() + " ( which is incompatible with template )");
    }

    if (props != null && props.containsKey(LOSS) && props.get(LOSS).equals(SDT)) {
      checkSDTFormat(path.getFullPath(), props);
    }

    // 获取叶节点 (measurement 节点) 名称
    String leafName = nodeNames[nodeNames.length - 1];

    // 保持添加叶节点以及 alias 过程的原子性
    // 将写部分设置为 synchronized (上锁)
    synchronized (this) {
      MNode child = cur.getChild(leafName);
      if (child instanceof MeasurementMNode || child instanceof StorageGroupMNode) {
        throw new PathAlreadyExistException(path.getFullPath());
      }

      if (alias != null) {
        MNode childByAlias = cur.getChild(alias);
        // 判断该设备节点下是否已经存在同名 alias
        if (childByAlias instanceof MeasurementMNode) {
          throw new AliasAlreadyExistException(path.getFullPath(), alias);
        }
      }

      // 生成叶节点
      MeasurementMNode measurementMNode =
          new MeasurementMNode(cur, leafName, alias, dataType, encoding, compressor, props);
      if (child != null) {
        // 如若存在重名的叶节点,则覆盖掉
        cur.replaceChild(measurementMNode.getName(), measurementMNode);
      } else {
        // 如若不存在,则直接添加该叶节点
        cur.addChild(leafName, measurementMNode);
      }

      // 添加 alias
      if (alias != null) {
        cur.addAlias(alias, measurementMNode);
      }

      return measurementMNode;
    }
    // 解锁 
  }
  • deleteStorageGroups

设置存储组主要通过MTree下的deleteStorageGroups接口来实现

// MManager 中的 deleteStorageGroups
public void deleteStorageGroups(List<PartialPath> storageGroups) throws MetadataException {
    try {
      // 遍历要删除的存储组
      for (PartialPath storageGroup : storageGroups) {
        totalSeriesNumber.addAndGet(mtree.getAllTimeseriesCount(storageGroup));
        // 清除 MNode Cache
        if (!allowToCreateNewSeries
            && totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE < MTREE_SIZE_THRESHOLD) {
          logger.info("Current series number {} come back to normal level", totalSeriesNumber);
          allowToCreateNewSeries = true;
        }
        mNodeCache.clear();

        // 删除存储组
        List<MeasurementMNode> leafMNodes = mtree.deleteStorageGroup(storageGroup);
        for (MeasurementMNode leafMNode : leafMNodes) {
          removeFromTagInvertedIndex(leafMNode);
          // 更新 statistics (in schemaDataTypeNumMap)
          updateSchemaDataTypeNumMap(leafMNode.getSchema().getType(), -1);
        }

        // 删除 triggers
        TriggerEngine.drop(leafMNodes);

        if (!config.isEnableMemControl()) {
          MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
        }

        // 正常进行完成时,写 log
        if (!isRecovering) {
          logWriter.deleteStorageGroup(storageGroup);
        }
      }
    } catch (IOException e) {
      throw new MetadataException(e.getMessage());
    }
  }

// MTree 中的 deleteStorageGroups
List<MeasurementMNode> deleteStorageGroup(PartialPath path) throws MetadataException {
    // 设置当前节点为目标节点
    MNode cur = getNodeByPath(path);
    // 当前节点并非存储组节点时,抛出错误
    if (!(cur instanceof StorageGroupMNode)) {
      throw new StorageGroupNotSetException(path.getFullPath());
    }
    // 假设正在一个包含 root.a.b.sg1, root.a.sg2 路径的元数据树下删除 root.a.b.sg1
    // 删除 sg1
    cur.getParent().deleteChild(cur.getName());

    // 获取该存储组下所有叶节点
    List<MeasurementMNode> leafMNodes = new LinkedList<>();
    Queue<MNode> queue = new LinkedList<>();
    queue.add(cur);
    // 递归加载子节点,获取其下所有叶节点
    while (!queue.isEmpty()) {
      MNode node = queue.poll();
      for (MNode child : node.getChildren().values()) {
        if (child instanceof MeasurementMNode) {
          leafMNodes.add((MeasurementMNode) child);
        } else {
          queue.add(child);
        }
      }
    }
    
    // 设置当前节点为目标节点父节点
    cur = cur.getParent();
    // 如果删除目标节点后,父节点为空,则删除父节点,并递归向上重复该过程
    while (!IoTDBConstant.PATH_ROOT.equals(cur.getName()) && cur.getChildren().size() == 0) {
      cur.getParent().deleteChild(cur.getName());
      cur = cur.getParent();
    }
    return leafMNodes;
  }
  • showTimeseriesWithoutIndex

主要通过调用MTree的对应接口getAllMeasurementSchemaByHeatOrder或者getAllMeasurementSchema实现,这里选取getAllMeasurementSchema进行分析

// MManager 中的 showTimeseriesWithoutIndex
private List<ShowTimeSeriesResult> showTimeseriesWithoutIndex(
      ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
    List<Pair<PartialPath, String[]>> ans;
    // 对于热度顺序排序的情况
    if (plan.isOrderByHeat()) {
      ans = mtree.getAllMeasurementSchemaByHeatOrder(plan, context);
    } else {
      // 对于未使用热度顺序排序的情况
      ans = mtree.getAllMeasurementSchema(plan);
    }
    // 初始化
    List<ShowTimeSeriesResult> res = new LinkedList<>();
    for (Pair<PartialPath, String[]> ansString : ans) {
      long tagFileOffset = Long.parseLong(ansString.right[5]);
      try {
        Pair<Map<String, String>, Map<String, String>> tagAndAttributePair =
            new Pair<>(Collections.emptyMap(), Collections.emptyMap());
        if (tagFileOffset >= 0) {
          tagAndAttributePair = tagLogFile.read(config.getTagAttributeTotalSize(), tagFileOffset);
        }
        // 添加查找结果
        res.add(
            new ShowTimeSeriesResult(
                ansString.left.getFullPath(),
                ansString.right[0],
                ansString.right[1],
                TSDataType.valueOf(ansString.right[2]),
                TSEncoding.valueOf(ansString.right[3]),
                CompressionType.valueOf(ansString.right[4]),
                tagAndAttributePair.left,
                tagAndAttributePair.right));
      } catch (IOException e) {
        throw new MetadataException(
            "Something went wrong while deserialize tag info of " + ansString.left.getFullPath(),
            e);
      }
    }
    return res;
  }

// MTree 中的 getAllMeasurementSchema
List<Pair<PartialPath, String[]>> getAllMeasurementSchema(
      ShowTimeSeriesPlan plan, boolean removeCurrentOffset) throws MetadataException {
    List<Pair<PartialPath, String[]>> res = new LinkedList<>();
    // 获取分层节点名称
    String[] nodes = plan.getPath().getNodes(); 
    if (nodes.length == 0 || !nodes[0].equals(root.getName())) {
      throw new IllegalPathException(plan.getPath().getFullPath());
    }
    // 设置 limit
    limit.set(plan.getLimit()); 
    //设置 offset
    offset.set(plan.getOffset()); 
    curOffset.set(-1);
    count.set(0);
    // 查询目标时间序列
    findPath(root, nodes, 1, res, offset.get() != 0 || limit.get() != 0, false, null, null);
    // 避免内存泄漏
    limit.remove();
    offset.remove();
    if (removeCurrentOffset) {
      curOffset.remove();
    }
    count.remove();
    return res;
  }

// 其中的 findPath 部分
private void findPath(
      MNode node,
      String[] nodes,
      int idx,
      List<Pair<PartialPath, String[]>> timeseriesSchemaList,
      boolean hasLimit,
      boolean needLast,
      QueryContext queryContext,
      Template upperTemplate)
      throws MetadataException {
    if (node instanceof MeasurementMNode && nodes.length <= idx) {
      if (hasLimit) {
        curOffset.set(curOffset.get() + 1);
        if (curOffset.get() < offset.get() || count.get().intValue() == limit.get().intValue()) {
          return;
        }
      }
      IMeasurementSchema measurementSchema = ((MeasurementMNode) node).getSchema();
      if (measurementSchema instanceof MeasurementSchema) {
        // 查询完叶节点时
        addMeasurementSchema(
            node, timeseriesSchemaList, needLast, queryContext, measurementSchema, "*");
      } else if (measurementSchema instanceof VectorMeasurementSchema) {
        String lastWord = nodes[nodes.length - 1];
        addVectorMeasurementSchema(
            node,
            timeseriesSchemaList,
            needLast,
            queryContext,
            measurementSchema,
            nodes.length == idx ? lastWord : "*");
      }
      if (hasLimit) {
        count.set(count.get() + 1);
      }
    }

    // 获取下一层节点名称
    String nodeReg = MetaUtils.getNodeRegByIdx(idx, nodes); 
    if (node.getDeviceTemplate() != null) {
      upperTemplate = node.getDeviceTemplate();
    }

    // 判断是否使用通配符*;或者查询完叶节点
    if (!nodeReg.contains(PATH_WILDCARD)) {
      MNode next = null;
      // 判断是否是并列表示的节点
      if (nodeReg.contains("(") && nodeReg.contains(",")) { 
        next = node.getChildOfAlignedTimeseries(nodeReg);
      } else {
        // 获取下一层节点
        next = node.getChild(nodeReg);
      }
      if (next != null) {
        // 下一层节点非空时递归查询
        findPath(
            next,
            nodes,
            idx + 1,
            timeseriesSchemaList,
            hasLimit,
            needLast,
            queryContext,
            upperTemplate);
      }
    } else {
      // 对于存在通配符时遍历该节点所有子节点
      for (MNode child : node.getDistinctMNodes()) {
        boolean continueSearch = false;
        if (child instanceof MeasurementMNode
            && ((MeasurementMNode) child).getSchema() instanceof VectorMeasurementSchema) {
          // 对于 VectorMeasurementSchema 的 MeasurementMNode 的情况
          List<String> measurementsList =
              ((MeasurementMNode) child).getSchema().getValueMeasurementIdList();
          for (String measurement : measurementsList) {
            // 判断是否查到尽头
            if (Pattern.matches(nodeReg.replace("*", ".*"), measurement)) {
              continueSearch = true;
            }
          }
        } else {
          // 判断是否查到尽头
          if (Pattern.matches(nodeReg.replace("*", ".*"), child.getName())) {
            continueSearch = true;
          }
        }
        if (!continueSearch) {
          continue;
        }
      
        // 递归查询
        findPath(
            child,
            nodes,
            idx + 1,
            timeseriesSchemaList,
            hasLimit,
            needLast,
            queryContext,
            upperTemplate);
        if (hasLimit && count.get().intValue() == limit.get().intValue()) {
          break;
        }
      }
    }

    // 使用 template 时的部分
    if (!(node instanceof MeasurementMNode) && node.isUseTemplate()) {
      if (upperTemplate != null) {
        HashSet<IMeasurementSchema> set = new HashSet<>();
        for (IMeasurementSchema schema : upperTemplate.getSchemaMap().values()) {
          if (set.add(schema)) {
            if (schema instanceof MeasurementSchema) {
              addMeasurementSchema(
                  new MeasurementMNode(node, schema.getMeasurementId(), schema, null),
                  timeseriesSchemaList,
                  needLast,
                  queryContext,
                  schema,
                  nodeReg);
            } else if (schema instanceof VectorMeasurementSchema) {
              String firstNode = schema.getValueMeasurementIdList().get(0);
              addVectorMeasurementSchema(
                  new MeasurementMNode(node, firstNode, schema, null),
                  timeseriesSchemaList,
                  needLast,
                  queryContext,
                  schema,
                  nodeReg);
            }
          }
        }
      }
    }
  }

Log 文件

元数据日志管理

所有元数据的操作均会记录到元数据日志文件中,此文件默认为 data/system/schema/mlog.bin。

系统重启时会重做 mlog 中的日志,重做之前需要标记不需要记录日志。当重启结束后,标记需要记录日志。

元数据日志的类型由 MetadataOperationType 类记录。mlog 直接存储字符串编码。

源码分析

  • 一些 sql 对应的 mlog 记录
set storage group to root.turbine --> 2,root.turbine
delete storage group root.turbine --> 1,root.turbine
create timeseries root.turbine.d1.s1(temprature) with datatype=FLOAT, encoding=RLE, compression=SNAPPY tags(tag1=v1, tag2=v2) attributes(attr1=v1, attr2=v2) --> 0,root.turbine.d1.s1,3,2,1,,温度,offset

标签文件

所有时间序列的标签/属性信息都会保存在标签文件中,此文件默认为 data/system/schema/tlog.txt。

  • 每条时间序列的tagsattributes持久化后总字节数为 L,在 iotdb-engine.properties 中配置。

  • 持久化内容:Map<String,String> tagsMap<String,String> attributes,如果内容不足 L,则需补空。

源码分析

六种针对时间序列的标签与属性的操作则直接作为MManager的内置函数出现:renameTagOrAttributeKeysetTagsOrAttributesValuedropTagsOrAttributesaddTagsaddAttributesupsertTagsAndAttributes

在这些基本操作之中这里主要分析addAttributes

public void addAttributes(Map<String, String> attributesMap, PartialPath fullPath)
      throws MetadataException, IOException {
    // 获取目标节点
    MNode mNode = mtree.getNodeByPath(fullPath);
    // 如若目标节点非是 MeasurementMNode,抛出异常
    if (!(mNode instanceof MeasurementMNode)) {
      throw new PathNotExistException(fullPath.getFullPath());
    }
    MeasurementMNode leafMNode = (MeasurementMNode) mNode;
    // 没有 tag 或者attribute 的情况,需要在 tlog 中添加记录
    if (leafMNode.getOffset() < 0) {
      long offset = tagLogFile.write(Collections.emptyMap(), attributesMap);
      logWriter.changeOffset(fullPath, offset);
      leafMNode.setOffset(offset);
      return;
    }

    Pair<Map<String, String>, Map<String, String>> pair =
        tagLogFile.read(config.getTagAttributeTotalSize(), leafMNode.getOffset());

    // 遍历需要添加的 attribute
    for (Entry<String, String> entry : attributesMap.entrySet()) {
      // 获取键
      String key = entry.getKey();
      // 获取值
      String value = entry.getValue();
      // 目标 attribute 存在时,抛出异常
      if (pair.right.containsKey(key)) {
        throw new MetadataException(
            String.format("TimeSeries [%s] already has the attribute [%s].", fullPath, key));
      }
      // 不存在时,进行添加
      pair.right.put(key, value);
    }

    // 持久化
    tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
  }

MTree 检查点

为了加快 IoTDB 重启速度,IoTDB为MTree设置了检查点,这样避免了在重启时按行读取并复现 mlog.bin 中的信息。创建MTree的快照有两种方式:

  • 自动创建。每隔10分钟,后台线程检查MTree的最后修改时间,需要同时满足:用户超过1小时(可配置)没修改MTree且 mlog.bin 中积累了100000行日志(可配置)两个条件;

  • 手动创建。使用create snapshot for schema命令手动触发创建 MTree 快照。

创建过程

  1. 首先给MTree加读锁,防止创建快照过程中对其进行修改
  2. MTree序列化进临时 snapshot 文件(mtree.snapshot.tmp)。MTree的序列化采用“先子节点、后父节点”的深度优先序列化方式,将节点的信息按照类型转化成对应格式的字符串,便于反序列化时读取和组装MTree
    其中,字符串转化格式如下:
  • 普通节点:0,名字,子节点个数
  • 存储组节点:1,名字,TTL,子节点个数
  • 传感器节点:2,名字,别名,数据类型,编码,压缩方式,属性,偏移量,子节点个数
  1. 序列化结束后,将临时文件重命名为正式文件(mtree.snapshot),防止在序列化过程中出现服务器人为或意外关闭,导致序列化失败的情况。
  2. 调用MLogWriter.clear()方法,清空 mlog.bin
  3. 释放MTree读锁

源码分析

public void createMTreeSnapshot() {
    long time = System.currentTimeMillis();
    logger.info("Start creating MTree snapshot to {}", mtreeSnapshotPath);
    try {
      // 将 MTree 序列化进临时 snapshot 文件 mtreeSnapshotTmpPath
      mtree.serializeTo(mtreeSnapshotTmpPath);
      // 获取对应临时快照与快照文件
      File tmpFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath);
      File snapshotFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
      // 如果路径下存在快照文件则先删除
      if (snapshotFile.exists()) {
        Files.delete(snapshotFile.toPath());
      }
      // 将临时文件重命名为正式文件
      if (tmpFile.renameTo(snapshotFile)) {
        logger.info(
            "Finish creating MTree snapshot to {}, spend {} ms.",
            mtreeSnapshotPath,
            System.currentTimeMillis() - time);
      }
      // 调用 clear() 方法,清空 mlog.bin
      logWriter.clear();
    } catch (IOException e) {
      logger.warn("Failed to create MTree snapshot to {}", mtreeSnapshotPath, e);
      if (SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath).exists()) {
        try {
          Files.delete(SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath).toPath());
        } catch (IOException e1) {
          logger.warn("delete file {} failed: {}", mtreeSnapshotTmpPath, e1.getMessage());
        }
      }
    }
  }

// 其中序列化的时候从子节点由下往上序列化,按照深度优先顺序
public void serializeTo(MLogWriter logWriter) throws IOException {
    serializeChildren(logWriter);

    logWriter.serializeStorageGroupMNode(this);
  }

public void serializeTo(MLogWriter logWriter) throws IOException {
    serializeChildren(logWriter);

    logWriter.serializeMNode(this);
  }

void serializeChildren(MLogWriter logWriter) throws IOException {
    if (children == null) {
      return;
    }
    for (Entry<String, MNode> entry : children.entrySet()) {
      entry.getValue().serializeTo(logWriter);
    }
  }

恢复过程

  1. 检查临时文件 mtree.snapshot.tmp 是否存在,如果存在证明在创建快照的序列化过程中出现服务器人为或意外关闭,导致序列化失败,删除临时文件;
  2. 检查快照文件 mtree.snapshot 是否存在。如果不存在,则使用新的MTree;否则启动反序列化过程,得到MTree
  3. 对于 mlog.bin 中的内容,逐行读取并操作,完成MTree的恢复。读取过程中更新logNumber,并返回,用于后面mlog.bin行数的记录。

源码分析

加载快照并通过反序列化得到MTree后读取 mlog.bin 时,通过 log 中的记录生成 redo 的plan,然后通过调用operation来进行操作的执行:

public void operation(PhysicalPlan plan) throws IOException, MetadataException {
    switch (plan.getOperatorType()) {
      case CREATE_TIMESERIES:
        CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
        createTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
        break;
      case CREATE_ALIGNED_TIMESERIES:
        CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
            (CreateAlignedTimeSeriesPlan) plan;
        createAlignedTimeSeries(createAlignedTimeSeriesPlan);
        break;
      case DELETE_TIMESERIES:
        DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) plan;
        // cause we only has one path for one DeleteTimeSeriesPlan
        deleteTimeseries(deleteTimeSeriesPlan.getPaths().get(0));
        break;
      case SET_STORAGE_GROUP:
        SetStorageGroupPlan setStorageGroupPlan = (SetStorageGroupPlan) plan;
        setStorageGroup(setStorageGroupPlan.getPath());
        break;
      case DELETE_STORAGE_GROUP:
        DeleteStorageGroupPlan deleteStorageGroupPlan = (DeleteStorageGroupPlan) plan;
        deleteStorageGroups(deleteStorageGroupPlan.getPaths());
        break;
      case TTL:
        SetTTLPlan setTTLPlan = (SetTTLPlan) plan;
        setTTL(setTTLPlan.getStorageGroup(), setTTLPlan.getDataTTL());
        break;
      case CHANGE_ALIAS:
        ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
        changeAlias(changeAliasPlan.getPath(), changeAliasPlan.getAlias());
        break;
      case CHANGE_TAG_OFFSET:
        ChangeTagOffsetPlan changeTagOffsetPlan = (ChangeTagOffsetPlan) plan;
        changeOffset(changeTagOffsetPlan.getPath(), changeTagOffsetPlan.getOffset());
        break;
      case CREATE_TEMPLATE:
        CreateTemplatePlan createTemplatePlan = (CreateTemplatePlan) plan;
        createDeviceTemplate(createTemplatePlan);
        break;
      case SET_DEVICE_TEMPLATE:
        SetDeviceTemplatePlan setDeviceTemplatePlan = (SetDeviceTemplatePlan) plan;
        setDeviceTemplate(setDeviceTemplatePlan);
        break;
      case SET_USING_DEVICE_TEMPLATE:
        SetUsingDeviceTemplatePlan setUsingDeviceTemplatePlan = (SetUsingDeviceTemplatePlan) plan;
        setUsingDeviceTemplate(setUsingDeviceTemplatePlan);
        break;
      case AUTO_CREATE_DEVICE_MNODE:
        AutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan = (AutoCreateDeviceMNodePlan) plan;
        autoCreateDeviceMNode(autoCreateDeviceMNodePlan);
        break;
      default:
        logger.error("Unrecognizable command {}", plan.getOperatorType());
    }
  }

具体恢复过程如下:

private int initFromLog(File logFile) throws IOException {
    File tmpFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath);
    // 检查临时文件 mtree.snapshot.tmp 是否存在
    if (tmpFile.exists()) {
      logger.warn("Creating MTree snapshot not successful before crashing...");
      // 如果存在证明在创建快照的序列化过程中出现服务器人为或意外关闭,导致序列化失败,删除临时文件
      Files.delete(tmpFile.toPath());
    }

    // 检查快照文件 mtree.snapshot 是否存在
    File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
    long time = System.currentTimeMillis();
    // 判断快照是否存在   
    if (!mtreeSnapshot.exists()) {
      // 快照不存在的时候使用新的 MTree
      mtree = new MTree();
    } else {
      // 快照存在的时候,反序列话快照,然后利用其生成 MTree
      mtree = MTree.deserializeFrom(mtreeSnapshot);
      logger.debug(
          "spend {} ms to deserialize mtree from snapshot", System.currentTimeMillis() - time);
    }

    time = System.currentTimeMillis();
    // 判断 log 是否存在,存在时通过 log 恢复元数据
    if (logFile.exists()) {
      int idx = 0;
      try (MLogReader mLogReader =
          new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG); ) {
        idx = applyMlog(mLogReader);
        logger.debug(
            "spend {} ms to deserialize mtree from mlog.bin", System.currentTimeMillis() - time);
        return idx;
      } catch (Exception e) {
        throw new IOException("Failed to parser mlog.bin for err:" + e);
      }
    } else {
      return 0;
    }
  }

  private int applyMlog(MLogReader mLogReader) {
    int idx = 0;
    // 不断按顺序读取 log 中的记录
    while (mLogReader.hasNext()) {
      PhysicalPlan plan = null;
      try {
        // 根据 log 中的记录生成plan
        plan = mLogReader.next();
        if (plan == null) {
          continue;
        }
        // 执行操作
        operation(plan);
        idx++;
      } catch (Exception e) {
        logger.error(
            "Can not operate cmd {} for err:", plan == null ? "" : plan.getOperatorType(), e);
      }
    }
    return idx;
  }

  1. 物联网时序数据库 Apache IoTDB,详细信息可以在https://iotdb.apache.org/中找到。

  2. https://iotdb.apache.org/zh/SystemDesign/SchemaManager/SchemaManager.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,711评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,079评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,194评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,089评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,197评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,306评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,338评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,119评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,541评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,846评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,014评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,694评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,322评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,026评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,257评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,863评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,895评论 2 351

推荐阅读更多精彩内容