说在前面: 这个list记录了博主在学习IoTDB[1]期间的总结和思考,欢迎一起讨论学习哈
相关介绍可以参考list的第一篇博客:[IoTDB 学习笔记] [part1] 介绍
MManager 基本功能及结构
MManager 提供了7种需要记录日志的针对时间序列节点和存储组节点的操作以及六种针对时间序列的标签
tag
与属性attribute
的操作,每个操作在操作前先获得整个元数据的写锁,操作完后释放:
- 创建时间序列
- 删除时间序列
- 设置存储组
- 删除存储组
- 设置TTL
- 改变时间序列标签信息offset
- 改变时间序列的别名
tag
&attribute
operation:
- 重命名标签或属性
- 重新设置标签或属性的值
- 删除已经存在的标签或属性
- 添加新的标签
- 添加新的属性
- 更新插入标签和属性
MManager
中的元数据主要以元数据树的形式存在,树中包含三种节点:StorageGroupMNode
、InternalMNode
(非叶子节点)、LeafMNode
(叶子节点),他们都是MNode
的子类。
每个InternalMNode
中都有一个读写锁,查询元数据信息时,需要获得路径上每一个InternalMNode
的读锁,修改元数据信息时,如果修改的是LeafMNode
,需要获得其父节点的写锁,若修改的是InternalMNode
,则只需获得本身的写锁。若该InternalMNode
位于 Device 层,则还包含了一个Map<String, MNode> aliasChildren
,用于存储别名信息;
StorageGroupMNode
继承InternalMNode
,作为存储组的节点;
LeafMNode
中包含了对应时间序列的 Schema 信息,其alias
以及该时间序列的标签/属性信息在 tlog 文件中的offset
。
一个示例如图 1 [2]所示,示例中的整个元数据树分为4层,root 层,storage group 层,device 层以及 measurement 层。
MTree
中提供了用于创建或删除存储组与时间序列的操作。
在创建存储组的时候,首先创建所有中间节点,然后确保路径前缀中不包含其他的存储组(存储组间互不包含),然后再确定目标的存储组不存在,如若前述的条件均满足,则创建存储组节点,并添加到相应的路径位置下。
在创建时间序列时,首先确保其中间节点均存在,然后确定目标叶节点不存在,如若均满足则创建叶节点,如有alias
则再创建一个目标节点的兄弟节点指向该叶节点。
在删除存储组或时间序列的时候,先将目标节点在其父节点中的记录删除,然后如若删除后父节点为空则递归向上删除。
同时,MManager
还提供了元数据查询的功能:
不带过滤条件的元数据查询
根据是否需要根据热度排序,调用getAllMeasurementSchemaByHeatOrder
(需要) 或者getAllMeasurementSchema
(不需要)。带过滤条件的元数据查询
其中过滤条件只能是tag属性。
通过在MManager中维护的tag的倒排索引,获得所有满足索引条件的MeasurementMNode
。如若需要根据热度排序则根据lastTimeStamp
排序,反之根据序列名的字母序排序。
其中,如果元数据较多,一次输出的查询结果可能导致OOM,此时考虑使用fetch size参数 (服务器端一次最多只取 fetch size 个时间序列)。
源码分析
MManager 基本结构
MManager 包含如下内部属性
public static final String TIME_SERIES_TREE_HEADER = "=== Timeseries Tree ===\n\n";
private static final String TAG_FORMAT = "tag key is %s, tag value is %s, tlog offset is %d";
private static final String DEBUG_MSG = "%s : TimeSeries %s is removed from tag inverted index, ";
private static final String DEBUG_MSG_1 =
"%s: TimeSeries %s's tag info has been removed from tag inverted index ";
private static final String PREVIOUS_CONDITION =
"before deleting it, tag key is %s, tag value is %s, tlog offset is %d, contains key %b";
private static final int UPDATE_SCHEMA_MAP_IN_ARRAYPOOL_THRESHOLD = 5000;
private static final Logger logger = LoggerFactory.getLogger(MManager.class);
// 用于生成自动快照的判断 MTree 是否修改的时间间隔阈值
private static final long MTREE_SNAPSHOT_THREAD_CHECK_TIME = 600L;
private final int mtreeSnapshotInterval;
private final long mtreeSnapshotThresholdTime;
private String logFilePath;
private String mtreeSnapshotPath;
private String mtreeSnapshotTmpPath;
// 元数据信息存储在MTree当中
private MTree mtree;
private MLogWriter logWriter;
private TagLogFile tagLogFile;
private boolean isRecovering;
// device -> DeviceMNode
private RandomDeleteCache<PartialPath, Pair<MNode, Template>> mNodeCache;
// tag key -> tag value -> LeafMNode
private Map<String, Map<String, Set<MeasurementMNode>>> tagIndex = new ConcurrentHashMap<>();
// data type -> number
private Map<TSDataType, Integer> schemaDataTypeNumMap = new ConcurrentHashMap<>();
private long reportedDataTypeTotalNum;
private AtomicLong totalSeriesNumber = new AtomicLong();
private boolean initialized;
protected static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private File logFile;
private ScheduledExecutorService timedCreateMTreeSnapshotThread;
private ScheduledExecutorService timedForceMLogThread;
// MTree 的大小阈值
private static final long MTREE_SIZE_THRESHOLD = config.getAllocateMemoryForSchema();
private boolean allowToCreateNewSeries = true;
private static final int ESTIMATED_SERIES_SIZE = config.getEstimatedSeriesSize();
// template name -> template
private Map<String, Template> templateMap = new ConcurrentHashMap<>();
基本操作
节点的基本操作通过MManager
的内部函数实现,其中多数操作主要通过调用MTree
对应方法实现。
在这些基本操作之中这里主要分析setStorageGroup
,createTimeseries
,deleteStorageGroups
,showTimeseriesWithoutIndex
:
- setStorageGroup
设置存储组主要通过MTree
下的setStorageGroup
接口来实现
void setStorageGroup(PartialPath path) throws MetadataException {
String[] nodeNames = path.getNodes();
// 检查路径是否满足格式
checkStorageGroup(path.getFullPath());
// 将当前节点设置为根节点
MNode cur = root;
if (nodeNames.length <= 1 || !nodeNames[0].equals(root.getName())) {
throw new IllegalPathException(path.getFullPath());
}
int i = 1;
// e.g., path = root.a.b.sg, 生成中间节点 (类型为 MNode)
while (i < nodeNames.length - 1) {
MNode temp = cur.getChild(nodeNames[i]);
// 查看中间节点是否存在,不存在则添加
if (temp == null) {
cur.addChild(nodeNames[i], new MNode(cur, nodeNames[i]));
} else if (temp instanceof StorageGroupMNode) {
// 如果中间节点是 StorageGroupMNode,抛出错误
throw new StorageGroupAlreadySetException(temp.getFullPath());
}
// 切换当前节点
cur = cur.getChild(nodeNames[i]);
i++;
}
if (cur.hasChild(nodeNames[i])) {
// 如果目标存储组节点存在,抛出错误
if (cur.getChild(nodeNames[i]) instanceof StorageGroupMNode) {
throw new StorageGroupAlreadySetException(path.getFullPath());
} else {
throw new StorageGroupAlreadySetException(path.getFullPath(), true);
}
} else {
// 如若不存在则生成存储组节点并添加到对应位置 (当前节点的子节点)
StorageGroupMNode storageGroupMNode =
new StorageGroupMNode(
cur, nodeNames[i], IoTDBDescriptor.getInstance().getConfig().getDefaultTTL());
cur.addChild(nodeNames[i], storageGroupMNode);
}
}
- createTimeseries
设置存储组主要通过MTree
下的createTimeseries
接口来实现
// MManager 中的 createTimeseries
public void createTimeseries(CreateTimeSeriesPlan plan, long offset) throws MetadataException {
// 检查还能否写入新的时间序列
if (!allowToCreateNewSeries) {
throw new MetadataException(
"IoTDB system load is too large to create timeseries, "
+ "please increase MAX_HEAP_SIZE in iotdb-env.sh/bat and restart");
}
try {
PartialPath path = plan.getPath();
SchemaUtils.checkDataTypeWithEncoding(plan.getDataType(), plan.getEncoding());
// 确保获取存储组路径
ensureStorageGroup(path);
TSDataType type = plan.getDataType();
// 利用 MTree 生成 timeseries
MeasurementMNode leafMNode =
mtree.createTimeseries(
path,
type,
plan.getEncoding(),
plan.getCompressor(),
plan.getProps(),
plan.getAlias());
// 更新 tag
if (plan.getTags() != null) {
// tag key, tag value
for (Entry<String, String> entry : plan.getTags().entrySet()) {
if (entry.getKey() == null || entry.getValue() == null) {
continue;
}
tagIndex
.computeIfAbsent(entry.getKey(), k -> new ConcurrentHashMap<>())
.computeIfAbsent(entry.getValue(), v -> new CopyOnWriteArraySet<>())
.add(leafMNode);
}
}
// 更新 statistics 和 schemaDataTypeNumMap
totalSeriesNumber.addAndGet(1);
if (totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE >= MTREE_SIZE_THRESHOLD) {
logger.warn("Current series number {} is too large...", totalSeriesNumber);
allowToCreateNewSeries = false;
}
updateSchemaDataTypeNumMap(type, 1);
// 写 log
if (!isRecovering) {
// either tags or attributes is not empty
if ((plan.getTags() != null && !plan.getTags().isEmpty())
|| (plan.getAttributes() != null && !plan.getAttributes().isEmpty())) {
offset = tagLogFile.write(plan.getTags(), plan.getAttributes());
}
plan.setTagOffset(offset);
logWriter.createTimeseries(plan);
}
leafMNode.setOffset(offset);
} catch (IOException e) {
throw new MetadataException(e);
}
}
// MTree 下的 createTimeseries
MeasurementMNode createTimeseries(
PartialPath path,
TSDataType dataType,
TSEncoding encoding,
CompressionType compressor,
Map<String, String> props,
String alias)
throws MetadataException {
// 获取分级 path
String[] nodeNames = path.getNodes();
// 判断是否是合法的 Timeseries 路径 (path长度不小于3 (root -> [storage group] -> device -> Timeseries))
if (nodeNames.length <= 2 || !nodeNames[0].equals(root.getName())) {
throw new IllegalPathException(path.getFullPath());
}
// 判断是否是合法的 Timeseries 路径 (使用正则式判断)
checkTimeseries(path.getFullPath());
MNode cur = root;
boolean hasSetStorageGroup = false;
Template upperTemplate = cur.getDeviceTemplate();
// e.g, path = root.sg.d1.s1,构建中间节点,并让 cur 设置为 d1
// 从 root 后第一层节点开始构建路径 (measurement 之前)
for (int i = 1; i < nodeNames.length - 1; i++) {
String nodeName = nodeNames[i];
// cur 指向 storage group node 时
if (cur instanceof StorageGroupMNode) {
hasSetStorageGroup = true;
}
if (!cur.hasChild(nodeName)) {
if (!hasSetStorageGroup) {
throw new StorageGroupNotSetException("Storage group should be created first");
}
cur.addChild(nodeName, new MNode(cur, nodeName));
}
cur = cur.getChild(nodeName);
if (cur.getDeviceTemplate() != null) {
upperTemplate = cur.getDeviceTemplate();
}
}
if (upperTemplate != null && !upperTemplate.isCompatible(path)) {
throw new PathAlreadyExistException(
path.getFullPath() + " ( which is incompatible with template )");
}
if (props != null && props.containsKey(LOSS) && props.get(LOSS).equals(SDT)) {
checkSDTFormat(path.getFullPath(), props);
}
// 获取叶节点 (measurement 节点) 名称
String leafName = nodeNames[nodeNames.length - 1];
// 保持添加叶节点以及 alias 过程的原子性
// 将写部分设置为 synchronized (上锁)
synchronized (this) {
MNode child = cur.getChild(leafName);
if (child instanceof MeasurementMNode || child instanceof StorageGroupMNode) {
throw new PathAlreadyExistException(path.getFullPath());
}
if (alias != null) {
MNode childByAlias = cur.getChild(alias);
// 判断该设备节点下是否已经存在同名 alias
if (childByAlias instanceof MeasurementMNode) {
throw new AliasAlreadyExistException(path.getFullPath(), alias);
}
}
// 生成叶节点
MeasurementMNode measurementMNode =
new MeasurementMNode(cur, leafName, alias, dataType, encoding, compressor, props);
if (child != null) {
// 如若存在重名的叶节点,则覆盖掉
cur.replaceChild(measurementMNode.getName(), measurementMNode);
} else {
// 如若不存在,则直接添加该叶节点
cur.addChild(leafName, measurementMNode);
}
// 添加 alias
if (alias != null) {
cur.addAlias(alias, measurementMNode);
}
return measurementMNode;
}
// 解锁
}
- deleteStorageGroups
设置存储组主要通过MTree
下的deleteStorageGroups
接口来实现
// MManager 中的 deleteStorageGroups
public void deleteStorageGroups(List<PartialPath> storageGroups) throws MetadataException {
try {
// 遍历要删除的存储组
for (PartialPath storageGroup : storageGroups) {
totalSeriesNumber.addAndGet(mtree.getAllTimeseriesCount(storageGroup));
// 清除 MNode Cache
if (!allowToCreateNewSeries
&& totalSeriesNumber.get() * ESTIMATED_SERIES_SIZE < MTREE_SIZE_THRESHOLD) {
logger.info("Current series number {} come back to normal level", totalSeriesNumber);
allowToCreateNewSeries = true;
}
mNodeCache.clear();
// 删除存储组
List<MeasurementMNode> leafMNodes = mtree.deleteStorageGroup(storageGroup);
for (MeasurementMNode leafMNode : leafMNodes) {
removeFromTagInvertedIndex(leafMNode);
// 更新 statistics (in schemaDataTypeNumMap)
updateSchemaDataTypeNumMap(leafMNode.getSchema().getType(), -1);
}
// 删除 triggers
TriggerEngine.drop(leafMNodes);
if (!config.isEnableMemControl()) {
MemTableManager.getInstance().addOrDeleteStorageGroup(-1);
}
// 正常进行完成时,写 log
if (!isRecovering) {
logWriter.deleteStorageGroup(storageGroup);
}
}
} catch (IOException e) {
throw new MetadataException(e.getMessage());
}
}
// MTree 中的 deleteStorageGroups
List<MeasurementMNode> deleteStorageGroup(PartialPath path) throws MetadataException {
// 设置当前节点为目标节点
MNode cur = getNodeByPath(path);
// 当前节点并非存储组节点时,抛出错误
if (!(cur instanceof StorageGroupMNode)) {
throw new StorageGroupNotSetException(path.getFullPath());
}
// 假设正在一个包含 root.a.b.sg1, root.a.sg2 路径的元数据树下删除 root.a.b.sg1
// 删除 sg1
cur.getParent().deleteChild(cur.getName());
// 获取该存储组下所有叶节点
List<MeasurementMNode> leafMNodes = new LinkedList<>();
Queue<MNode> queue = new LinkedList<>();
queue.add(cur);
// 递归加载子节点,获取其下所有叶节点
while (!queue.isEmpty()) {
MNode node = queue.poll();
for (MNode child : node.getChildren().values()) {
if (child instanceof MeasurementMNode) {
leafMNodes.add((MeasurementMNode) child);
} else {
queue.add(child);
}
}
}
// 设置当前节点为目标节点父节点
cur = cur.getParent();
// 如果删除目标节点后,父节点为空,则删除父节点,并递归向上重复该过程
while (!IoTDBConstant.PATH_ROOT.equals(cur.getName()) && cur.getChildren().size() == 0) {
cur.getParent().deleteChild(cur.getName());
cur = cur.getParent();
}
return leafMNodes;
}
- showTimeseriesWithoutIndex
主要通过调用MTree
的对应接口getAllMeasurementSchemaByHeatOrder
或者getAllMeasurementSchema
实现,这里选取getAllMeasurementSchema
进行分析
// MManager 中的 showTimeseriesWithoutIndex
private List<ShowTimeSeriesResult> showTimeseriesWithoutIndex(
ShowTimeSeriesPlan plan, QueryContext context) throws MetadataException {
List<Pair<PartialPath, String[]>> ans;
// 对于热度顺序排序的情况
if (plan.isOrderByHeat()) {
ans = mtree.getAllMeasurementSchemaByHeatOrder(plan, context);
} else {
// 对于未使用热度顺序排序的情况
ans = mtree.getAllMeasurementSchema(plan);
}
// 初始化
List<ShowTimeSeriesResult> res = new LinkedList<>();
for (Pair<PartialPath, String[]> ansString : ans) {
long tagFileOffset = Long.parseLong(ansString.right[5]);
try {
Pair<Map<String, String>, Map<String, String>> tagAndAttributePair =
new Pair<>(Collections.emptyMap(), Collections.emptyMap());
if (tagFileOffset >= 0) {
tagAndAttributePair = tagLogFile.read(config.getTagAttributeTotalSize(), tagFileOffset);
}
// 添加查找结果
res.add(
new ShowTimeSeriesResult(
ansString.left.getFullPath(),
ansString.right[0],
ansString.right[1],
TSDataType.valueOf(ansString.right[2]),
TSEncoding.valueOf(ansString.right[3]),
CompressionType.valueOf(ansString.right[4]),
tagAndAttributePair.left,
tagAndAttributePair.right));
} catch (IOException e) {
throw new MetadataException(
"Something went wrong while deserialize tag info of " + ansString.left.getFullPath(),
e);
}
}
return res;
}
// MTree 中的 getAllMeasurementSchema
List<Pair<PartialPath, String[]>> getAllMeasurementSchema(
ShowTimeSeriesPlan plan, boolean removeCurrentOffset) throws MetadataException {
List<Pair<PartialPath, String[]>> res = new LinkedList<>();
// 获取分层节点名称
String[] nodes = plan.getPath().getNodes();
if (nodes.length == 0 || !nodes[0].equals(root.getName())) {
throw new IllegalPathException(plan.getPath().getFullPath());
}
// 设置 limit
limit.set(plan.getLimit());
//设置 offset
offset.set(plan.getOffset());
curOffset.set(-1);
count.set(0);
// 查询目标时间序列
findPath(root, nodes, 1, res, offset.get() != 0 || limit.get() != 0, false, null, null);
// 避免内存泄漏
limit.remove();
offset.remove();
if (removeCurrentOffset) {
curOffset.remove();
}
count.remove();
return res;
}
// 其中的 findPath 部分
private void findPath(
MNode node,
String[] nodes,
int idx,
List<Pair<PartialPath, String[]>> timeseriesSchemaList,
boolean hasLimit,
boolean needLast,
QueryContext queryContext,
Template upperTemplate)
throws MetadataException {
if (node instanceof MeasurementMNode && nodes.length <= idx) {
if (hasLimit) {
curOffset.set(curOffset.get() + 1);
if (curOffset.get() < offset.get() || count.get().intValue() == limit.get().intValue()) {
return;
}
}
IMeasurementSchema measurementSchema = ((MeasurementMNode) node).getSchema();
if (measurementSchema instanceof MeasurementSchema) {
// 查询完叶节点时
addMeasurementSchema(
node, timeseriesSchemaList, needLast, queryContext, measurementSchema, "*");
} else if (measurementSchema instanceof VectorMeasurementSchema) {
String lastWord = nodes[nodes.length - 1];
addVectorMeasurementSchema(
node,
timeseriesSchemaList,
needLast,
queryContext,
measurementSchema,
nodes.length == idx ? lastWord : "*");
}
if (hasLimit) {
count.set(count.get() + 1);
}
}
// 获取下一层节点名称
String nodeReg = MetaUtils.getNodeRegByIdx(idx, nodes);
if (node.getDeviceTemplate() != null) {
upperTemplate = node.getDeviceTemplate();
}
// 判断是否使用通配符*;或者查询完叶节点
if (!nodeReg.contains(PATH_WILDCARD)) {
MNode next = null;
// 判断是否是并列表示的节点
if (nodeReg.contains("(") && nodeReg.contains(",")) {
next = node.getChildOfAlignedTimeseries(nodeReg);
} else {
// 获取下一层节点
next = node.getChild(nodeReg);
}
if (next != null) {
// 下一层节点非空时递归查询
findPath(
next,
nodes,
idx + 1,
timeseriesSchemaList,
hasLimit,
needLast,
queryContext,
upperTemplate);
}
} else {
// 对于存在通配符时遍历该节点所有子节点
for (MNode child : node.getDistinctMNodes()) {
boolean continueSearch = false;
if (child instanceof MeasurementMNode
&& ((MeasurementMNode) child).getSchema() instanceof VectorMeasurementSchema) {
// 对于 VectorMeasurementSchema 的 MeasurementMNode 的情况
List<String> measurementsList =
((MeasurementMNode) child).getSchema().getValueMeasurementIdList();
for (String measurement : measurementsList) {
// 判断是否查到尽头
if (Pattern.matches(nodeReg.replace("*", ".*"), measurement)) {
continueSearch = true;
}
}
} else {
// 判断是否查到尽头
if (Pattern.matches(nodeReg.replace("*", ".*"), child.getName())) {
continueSearch = true;
}
}
if (!continueSearch) {
continue;
}
// 递归查询
findPath(
child,
nodes,
idx + 1,
timeseriesSchemaList,
hasLimit,
needLast,
queryContext,
upperTemplate);
if (hasLimit && count.get().intValue() == limit.get().intValue()) {
break;
}
}
}
// 使用 template 时的部分
if (!(node instanceof MeasurementMNode) && node.isUseTemplate()) {
if (upperTemplate != null) {
HashSet<IMeasurementSchema> set = new HashSet<>();
for (IMeasurementSchema schema : upperTemplate.getSchemaMap().values()) {
if (set.add(schema)) {
if (schema instanceof MeasurementSchema) {
addMeasurementSchema(
new MeasurementMNode(node, schema.getMeasurementId(), schema, null),
timeseriesSchemaList,
needLast,
queryContext,
schema,
nodeReg);
} else if (schema instanceof VectorMeasurementSchema) {
String firstNode = schema.getValueMeasurementIdList().get(0);
addVectorMeasurementSchema(
new MeasurementMNode(node, firstNode, schema, null),
timeseriesSchemaList,
needLast,
queryContext,
schema,
nodeReg);
}
}
}
}
}
}
Log 文件
元数据日志管理
所有元数据的操作均会记录到元数据日志文件中,此文件默认为 data/system/schema/mlog.bin。
系统重启时会重做 mlog 中的日志,重做之前需要标记不需要记录日志。当重启结束后,标记需要记录日志。
元数据日志的类型由 MetadataOperationType 类记录。mlog 直接存储字符串编码。
源码分析
- 一些 sql 对应的 mlog 记录
set storage group to root.turbine --> 2,root.turbine
delete storage group root.turbine --> 1,root.turbine
create timeseries root.turbine.d1.s1(temprature) with datatype=FLOAT, encoding=RLE, compression=SNAPPY tags(tag1=v1, tag2=v2) attributes(attr1=v1, attr2=v2) --> 0,root.turbine.d1.s1,3,2,1,,温度,offset
标签文件
所有时间序列的标签/属性信息都会保存在标签文件中,此文件默认为 data/system/schema/tlog.txt。
每条时间序列的
tags
和attributes
持久化后总字节数为 L,在 iotdb-engine.properties 中配置。持久化内容:
Map<String,String> tags
,Map<String,String> attributes
,如果内容不足 L,则需补空。
源码分析
六种针对时间序列的标签与属性的操作则直接作为MManager
的内置函数出现:renameTagOrAttributeKey
,setTagsOrAttributesValue
,dropTagsOrAttributes
,addTags
,addAttributes
,upsertTagsAndAttributes
。
在这些基本操作之中这里主要分析addAttributes
public void addAttributes(Map<String, String> attributesMap, PartialPath fullPath)
throws MetadataException, IOException {
// 获取目标节点
MNode mNode = mtree.getNodeByPath(fullPath);
// 如若目标节点非是 MeasurementMNode,抛出异常
if (!(mNode instanceof MeasurementMNode)) {
throw new PathNotExistException(fullPath.getFullPath());
}
MeasurementMNode leafMNode = (MeasurementMNode) mNode;
// 没有 tag 或者attribute 的情况,需要在 tlog 中添加记录
if (leafMNode.getOffset() < 0) {
long offset = tagLogFile.write(Collections.emptyMap(), attributesMap);
logWriter.changeOffset(fullPath, offset);
leafMNode.setOffset(offset);
return;
}
Pair<Map<String, String>, Map<String, String>> pair =
tagLogFile.read(config.getTagAttributeTotalSize(), leafMNode.getOffset());
// 遍历需要添加的 attribute
for (Entry<String, String> entry : attributesMap.entrySet()) {
// 获取键
String key = entry.getKey();
// 获取值
String value = entry.getValue();
// 目标 attribute 存在时,抛出异常
if (pair.right.containsKey(key)) {
throw new MetadataException(
String.format("TimeSeries [%s] already has the attribute [%s].", fullPath, key));
}
// 不存在时,进行添加
pair.right.put(key, value);
}
// 持久化
tagLogFile.write(pair.left, pair.right, leafMNode.getOffset());
}
MTree 检查点
为了加快 IoTDB 重启速度,IoTDB为MTree
设置了检查点,这样避免了在重启时按行读取并复现 mlog.bin 中的信息。创建MTree
的快照有两种方式:
自动创建。每隔10分钟,后台线程检查
MTree
的最后修改时间,需要同时满足:用户超过1小时(可配置)没修改MTree
且 mlog.bin 中积累了100000行日志(可配置)两个条件;手动创建。使用
create snapshot for schema
命令手动触发创建 MTree 快照。
创建过程
- 首先给
MTree
加读锁,防止创建快照过程中对其进行修改- 将
MTree
序列化进临时 snapshot 文件(mtree.snapshot.tmp)。MTree
的序列化采用“先子节点、后父节点”的深度优先序列化方式,将节点的信息按照类型转化成对应格式的字符串,便于反序列化时读取和组装MTree
。
其中,字符串转化格式如下:
- 普通节点:0,名字,子节点个数
- 存储组节点:1,名字,TTL,子节点个数
- 传感器节点:2,名字,别名,数据类型,编码,压缩方式,属性,偏移量,子节点个数
- 序列化结束后,将临时文件重命名为正式文件(mtree.snapshot),防止在序列化过程中出现服务器人为或意外关闭,导致序列化失败的情况。
- 调用
MLogWriter.clear()
方法,清空 mlog.bin- 释放
MTree
读锁
源码分析
public void createMTreeSnapshot() {
long time = System.currentTimeMillis();
logger.info("Start creating MTree snapshot to {}", mtreeSnapshotPath);
try {
// 将 MTree 序列化进临时 snapshot 文件 mtreeSnapshotTmpPath
mtree.serializeTo(mtreeSnapshotTmpPath);
// 获取对应临时快照与快照文件
File tmpFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath);
File snapshotFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
// 如果路径下存在快照文件则先删除
if (snapshotFile.exists()) {
Files.delete(snapshotFile.toPath());
}
// 将临时文件重命名为正式文件
if (tmpFile.renameTo(snapshotFile)) {
logger.info(
"Finish creating MTree snapshot to {}, spend {} ms.",
mtreeSnapshotPath,
System.currentTimeMillis() - time);
}
// 调用 clear() 方法,清空 mlog.bin
logWriter.clear();
} catch (IOException e) {
logger.warn("Failed to create MTree snapshot to {}", mtreeSnapshotPath, e);
if (SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath).exists()) {
try {
Files.delete(SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath).toPath());
} catch (IOException e1) {
logger.warn("delete file {} failed: {}", mtreeSnapshotTmpPath, e1.getMessage());
}
}
}
}
// 其中序列化的时候从子节点由下往上序列化,按照深度优先顺序
public void serializeTo(MLogWriter logWriter) throws IOException {
serializeChildren(logWriter);
logWriter.serializeStorageGroupMNode(this);
}
public void serializeTo(MLogWriter logWriter) throws IOException {
serializeChildren(logWriter);
logWriter.serializeMNode(this);
}
void serializeChildren(MLogWriter logWriter) throws IOException {
if (children == null) {
return;
}
for (Entry<String, MNode> entry : children.entrySet()) {
entry.getValue().serializeTo(logWriter);
}
}
恢复过程
- 检查临时文件 mtree.snapshot.tmp 是否存在,如果存在证明在创建快照的序列化过程中出现服务器人为或意外关闭,导致序列化失败,删除临时文件;
- 检查快照文件 mtree.snapshot 是否存在。如果不存在,则使用新的
MTree
;否则启动反序列化过程,得到MTree
- 对于 mlog.bin 中的内容,逐行读取并操作,完成
MTree
的恢复。读取过程中更新logNumber
,并返回,用于后面mlog.bin行数的记录。
源码分析
加载快照并通过反序列化得到MTree
后读取 mlog.bin 时,通过 log 中的记录生成 redo 的plan
,然后通过调用operation
来进行操作的执行:
public void operation(PhysicalPlan plan) throws IOException, MetadataException {
switch (plan.getOperatorType()) {
case CREATE_TIMESERIES:
CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) plan;
createTimeseries(createTimeSeriesPlan, createTimeSeriesPlan.getTagOffset());
break;
case CREATE_ALIGNED_TIMESERIES:
CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
(CreateAlignedTimeSeriesPlan) plan;
createAlignedTimeSeries(createAlignedTimeSeriesPlan);
break;
case DELETE_TIMESERIES:
DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) plan;
// cause we only has one path for one DeleteTimeSeriesPlan
deleteTimeseries(deleteTimeSeriesPlan.getPaths().get(0));
break;
case SET_STORAGE_GROUP:
SetStorageGroupPlan setStorageGroupPlan = (SetStorageGroupPlan) plan;
setStorageGroup(setStorageGroupPlan.getPath());
break;
case DELETE_STORAGE_GROUP:
DeleteStorageGroupPlan deleteStorageGroupPlan = (DeleteStorageGroupPlan) plan;
deleteStorageGroups(deleteStorageGroupPlan.getPaths());
break;
case TTL:
SetTTLPlan setTTLPlan = (SetTTLPlan) plan;
setTTL(setTTLPlan.getStorageGroup(), setTTLPlan.getDataTTL());
break;
case CHANGE_ALIAS:
ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
changeAlias(changeAliasPlan.getPath(), changeAliasPlan.getAlias());
break;
case CHANGE_TAG_OFFSET:
ChangeTagOffsetPlan changeTagOffsetPlan = (ChangeTagOffsetPlan) plan;
changeOffset(changeTagOffsetPlan.getPath(), changeTagOffsetPlan.getOffset());
break;
case CREATE_TEMPLATE:
CreateTemplatePlan createTemplatePlan = (CreateTemplatePlan) plan;
createDeviceTemplate(createTemplatePlan);
break;
case SET_DEVICE_TEMPLATE:
SetDeviceTemplatePlan setDeviceTemplatePlan = (SetDeviceTemplatePlan) plan;
setDeviceTemplate(setDeviceTemplatePlan);
break;
case SET_USING_DEVICE_TEMPLATE:
SetUsingDeviceTemplatePlan setUsingDeviceTemplatePlan = (SetUsingDeviceTemplatePlan) plan;
setUsingDeviceTemplate(setUsingDeviceTemplatePlan);
break;
case AUTO_CREATE_DEVICE_MNODE:
AutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan = (AutoCreateDeviceMNodePlan) plan;
autoCreateDeviceMNode(autoCreateDeviceMNodePlan);
break;
default:
logger.error("Unrecognizable command {}", plan.getOperatorType());
}
}
具体恢复过程如下:
private int initFromLog(File logFile) throws IOException {
File tmpFile = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotTmpPath);
// 检查临时文件 mtree.snapshot.tmp 是否存在
if (tmpFile.exists()) {
logger.warn("Creating MTree snapshot not successful before crashing...");
// 如果存在证明在创建快照的序列化过程中出现服务器人为或意外关闭,导致序列化失败,删除临时文件
Files.delete(tmpFile.toPath());
}
// 检查快照文件 mtree.snapshot 是否存在
File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
long time = System.currentTimeMillis();
// 判断快照是否存在
if (!mtreeSnapshot.exists()) {
// 快照不存在的时候使用新的 MTree
mtree = new MTree();
} else {
// 快照存在的时候,反序列话快照,然后利用其生成 MTree
mtree = MTree.deserializeFrom(mtreeSnapshot);
logger.debug(
"spend {} ms to deserialize mtree from snapshot", System.currentTimeMillis() - time);
}
time = System.currentTimeMillis();
// 判断 log 是否存在,存在时通过 log 恢复元数据
if (logFile.exists()) {
int idx = 0;
try (MLogReader mLogReader =
new MLogReader(config.getSchemaDir(), MetadataConstant.METADATA_LOG); ) {
idx = applyMlog(mLogReader);
logger.debug(
"spend {} ms to deserialize mtree from mlog.bin", System.currentTimeMillis() - time);
return idx;
} catch (Exception e) {
throw new IOException("Failed to parser mlog.bin for err:" + e);
}
} else {
return 0;
}
}
private int applyMlog(MLogReader mLogReader) {
int idx = 0;
// 不断按顺序读取 log 中的记录
while (mLogReader.hasNext()) {
PhysicalPlan plan = null;
try {
// 根据 log 中的记录生成plan
plan = mLogReader.next();
if (plan == null) {
continue;
}
// 执行操作
operation(plan);
idx++;
} catch (Exception e) {
logger.error(
"Can not operate cmd {} for err:", plan == null ? "" : plan.getOperatorType(), e);
}
}
return idx;
}
-
物联网时序数据库 Apache IoTDB,详细信息可以在https://iotdb.apache.org/中找到。 ↩
-
https://iotdb.apache.org/zh/SystemDesign/SchemaManager/SchemaManager.html ↩