继续接续前一次的分析,地址:点这里
C1 tree -- tsm/tsm1/engine.go
engine就是实际的引擎类了,真正的聚合文件操作在compact(wg *sync.WaitGroup)这个方法中,根据是否是全量和合并的层级调用compact.go中的实际聚合方法.
C1 tree -- tsm/tsm1/compact.go
再来是聚合的类。
我个人理解,时序数据库因为其应用业务的特点:写入量通常大,读的时候一般按时间顺序取一段时间的数据,所以在Cache层是可以有乱序的,但是在文件这一层为了读的效率和准确性数据必须是有序的。这个聚合的过程除了将小文件聚合为大文件以外,还会将墓碑数据移除,墓碑数据就是在内存中写入但是后续被删除的数据。
先来看一下聚合定义的操作:
// CompactionPlanner determines what TSM files and WAL segments to include in a
// given compaction run. 这个类里的方法只负责找出哪些是策略对应需要合并的文件
type CompactionPlanner interface {
//全量触发所有level的文件合并
Plan(lastWrite time.Time) []CompactionGroup
//触发特定level的文件合并
PlanLevel(level int) []CompactionGroup
//触发level4的文件合并
PlanOptimize() []CompactionGroup
Release(group []CompactionGroup)
//判断是否文件都已经全部整合完毕了 实际是条件是层次<=1并且文件中没有墓碑,统计时会跳过正在合并的文件
FullyCompacted() bool
//修改forcefull布尔变量 下一次Plan()会强制全量
ForceFull()
SetFileStore(fs *FileStore)
}
// tsmGeneration represents the TSM files within a generation.
// 000001-01.tsm, 000001-02.tsm would be in the same generation
// 000001 each with different sequence numbers.
// 这里的官方注释很清楚不多说了
type tsmGeneration struct {
id int
files []FileStat
parseFileName ParseFileNameFunc
}
func (l compactionLevel) String() string {
switch l {
case 0:
return "snapshot"
case 1, 2, 3:
return fmt.Sprint(int(l))
case 4:
//叫优化的原因大概是这是最后一层了
return "optimize"
case 5:
return "full"
default:
panic("unsupported compaction level")
}
}
文件的读取
先理解一下文件的结构。
源码的tsm1/design.md里可以看到tsm实际的文件结构,我去掉了前面一些相对不重要的结构只看最核心的:
┌────────┬────────────────────────────────────┬─────────────┬──────────────┐
│ Header │ Blocks │ Index │ Footer │
│5 bytes │ N bytes │ N bytes │ 4 bytes │
└────────┴────────────────────────────────────┴─────────────┴──────────────┘
┌────────────────────────────────────────────────────────────────────────────┐
│ Index │
├─────────┬─────────┬──────┬───────┬─────────┬─────────┬────────┬────────┬───┤
│ Key Len │ Key │ Type │ Count │Min Time │Max Time │ Offset │ Size │...│
│ 2 bytes │ N bytes │1 byte│2 bytes│ 8 bytes │ 8 bytes │8 bytes │4 bytes │ │
└─────────┴─────────┴──────┴───────┴─────────┴─────────┴────────┴────────┴───┘
可以看到最后合并整理完的有序的index文件,文件头里都包含了这一块文件对应的时间段。
而在读取时我们肯定不会一个个文件地遍历过去查找,这样速度太慢了而且耗时是未知的,读文件这里又用到了mmap的技术,底层的原理比较深奥,就看一下大致读取的流程吧。
// 这是tsmreader根据时间戳和tags找实际数据的方法
func (t *TSMReader) Read(key []byte, timestamp int64) ([]Value, error) {
t.mu.RLock()
v, err := t.accessor.read(key, timestamp)
t.mu.RUnlock()
return v, err
}
func (m *mmapAccessor) read(key []byte, timestamp int64) ([]Value, error) {
//先判断是否在某一个文件块中
entry := m.index.Entry(key, timestamp)
if entry == nil {
return nil, nil
}
//这里实际返回的是timestamp所在文件对应entry的所有数据而不是一个时间点的数据
return m.readBlock(entry, nil)
}
//entries是内存对文件索引的一个映射
func (d *indirectIndex) Entry(key []byte, timestamp int64) *IndexEntry {
entries, err := d.ReadEntries(key, nil)
if err != nil {
d.logger.Error("error reading tsm index key", zap.String("key", fmt.Sprintf("%q", key)))
return nil
}
for _, entry := range entries {
//这个contains就是判读时间戳是不是大于entries当中的最小时间戳且小于最大时间戳
if entry.Contains(timestamp) {
return &entry
}
}
return nil
}
和内存一样,文件也有墓碑的概念,只是是单独一个文件tombstone,结构和正常的文件类似,不赘述了。删除数据的api实际上就是往这个文件里写数据。
influxdb的几个小技巧
文件索引怎么压缩
表名,索引,值
cpu,host=server1 value=1
cpu,host=server2 value=2
memory,host=server1 value=3
这一段数据你会怎么压缩?
.
.
.
influxdb做了一些映射,利用一个编码字典完成了压缩
tags是一个map
先把表名映射为cpu = 1, memory = 2
索引字段映射为host = 1
索引枚举映射为server1 = 1, server2 = 2
实际值存储的字段映射为value = 1
最终索引就映射为一个数组了
cpu,host=server1 value=1 --> 1,1,1,1
cpu,host=server2 value=2 --> 1,1,2,1
memory,host=server1 value=3 --> 3,1,2,1
基本上influxdb核心的数据处理过程都在这两篇随笔中了,大家可以参照着看看源码,希望大家都有收获。