从一个小需求开始---从LSM到influxdb源码解读(1)

需求来了

今年上半年做了一个需求,需求目的是获取实时的广告曝光在各个时刻各个类目的占比,发现曝光不足的类目,针对对应类目商家进行引导广告投放。团队当时用的存储介质只有mysql,es,mongodb和公司自研的搜索引擎,都不太适合放大量的时序数据,最后比较了一堆开源框架,从使用和维护成本出发考虑决定还是用了influxdb,(使用感受,啥都好,唯一缺点:分布式要钱,我们也么得钱, 穷是原罪o(╥﹏╥)o),数据不是T0级的数据,不需要长时间的存储,所以单机也就搞定了。

投入上线以后效果还是挺好的,实时性很强,写入也没有延迟什么的,除了前期偶尔出现了一两次influxdb进程突然挂掉以外(应该是我写入姿势太暴力了),后面基本不需要维护了。最后数据的界面大致如下:

后续交给运营大大push

原理了解一发

时序数据库算是比较小众的数据库了,除非是做监控什么的,感觉在业务开发上很少用到,influxdb的优点都可以百度到,不赘述了。之前大部分接触的mysql、es的原理也无非B+树,Trie树 ,读了一些简介以后发现influxdb实现的原理是lsm-tree,就研究了一下这种数据结构。

lsm-tree

全称 Log Structured Merge Tree,理解下字面意思,日志结构的合并树,很明显了,日志结构嘛,就是有顺序的,一条条往后面怼的,这是个粗略的理解。顺序的读比B+树这种随机读肯定是要快很多了。关于lsm的原理,网络上有很多文章,摘选一个可以作为基础阅读了解一下 lsm概述

来自官网论文

看着这个图也可以发现几个特点了,树的数据在内存和磁盘中都有,读源码可以发现,influxdb文件级别的树总共有4层。

针对这一块的原理,可以直接开始看influxdb的文件结构。influxdb有两个核心的文件目录,data和wal,wal不赘述了,预写日志,很多数据库都有这个,data下面存放的是实际的数据,截了一个实际的表里的数据:

实际的文件

可以发现tsm的一些文件,其实就是lsm,前面是一个序列号,后面的是实际的lsm-tree的层级。根据更新时间就可以发现层级较大的文件合并的时间间隔较长,04是八小时一次,02是两小时一次,刚刚好四个小时的02 level的可以聚合成一个04 level的文件;文件大小也是随层级增大而增大的,但是为什么01和02的差距那么小?可能和压缩的比例有关?

看到这里感觉lsm和一个数据结构很相似 -- skiplist,跳跃表也是一种提升查询效率的数据结构,数据也是按照分层级按照一定顺序存储的。只不过lsm里面是一个树形结构不是完全的链表形式。
因为主要被分为内存和磁盘两块代码,先从内存级入手捋一捋。

C0 tree -- cache.go

先来看看C0级别的源码: tsdb/tsm1/cache.go
cache设计的初衷是为了让写在wal的数据也能够被查询到,因为可能还没有被写入文件。

// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
    mu      sync.RWMutex
    store   *ring
    maxSize uint64

    // snapshots are the cache objects that are currently being written to tsm files
    // they're kept in memory while flushing so they can be queried along with the cache.
    // they are read only and should never be modified
    snapshot     *Cache
    snapshotting bool
    //监控数据忽略
    tracker       *cacheTracker
    lastSnapshot  time.Time
    lastWriteTime time.Time
}

其中有一个snapshot,注释写的很清楚,内存中的数据可以被查询但是不会被修改,因为准备被写入tsm文件了,这个只是为了提升查询的速度,减少磁盘的读取,如果能在内存中读到就不会去磁盘进行读取了。store是实际存储数据的结构ring,里面包含了多个分区,每个分区会根据你的key -- 存储的索引hash出来的值,来进行存储。所以实际cache的写入就是按照索引的值分区存入内存。cacheTracker及后续的两个时间都是监控所用,暂时忽略。

type ring struct {
    // Number of keys within the ring. This is used to provide a hint for
    // allocating the return values in keys(). It will not be perfectly accurate
    // since it doesn't consider adding duplicate keys, or trying to remove non-
    // existent keys.
    keysHint int64

    // 16个分区
    partitions [numPartitions]*partition
}


// partition provides safe access to a map of series keys to entries.
type partition struct {
    mu    sync.RWMutex
    store map[string]*entry
}

// entry is a set of values and some metadata.
//string是索引的key 跟踪entry
type entry struct {
    mu     sync.RWMutex
    values Values // All stored values.

    // 数据类型
    vtype byte
}

//values的枚举
type (
    Value         = value.Value
    IntegerValue  = value.IntegerValue
    UnsignedValue = value.UnsignedValue
    FloatValue    = value.FloatValue
    BooleanValue  = value.BooleanValue
    StringValue   = value.StringValue
)

// Value represents a TSM-encoded value. 这行注释很明显了,tsm数据的结构,
//里面包含时间戳,实际值
type Value interface {
    // UnixNano returns the timestamp of the value in nanoseconds since unix epoch.
    UnixNano() int64

    // Value returns the underlying value.
    Value() interface{}

    // Size returns the number of bytes necessary to represent the value and its timestamp.
    Size() int

    // String returns the string representation of the value and its timestamp.
    String() string

    // internalOnly is unexported to ensure implementations of Value
    // can only originate in this package.
    internalOnly()
}

上面的源代码中values是一个数组,数组的顺序是按时间顺序直接往后加的,连搜索特定时间点的数据的search(v int64)这个方法是一个根据时间戳的二分查找,那么问题来了,用过influxdb的api你会发现数据时间戳你是可以指定的,如果小的时间戳的数据被后写入了,如何保证最后写入文件的数据是按时间戳排序呢?(不按时间排序你还叫啥时序数据库2333),从Values数组的代码里就有了答案,在merge过程中会删除重复的时间戳并且判断有时间戳乱序后重新排序,这样的话即使写入内存的时候时间戳是乱序的,最终写入文件的时候依旧是按时间戳排序的。核心代码如下:

//去重的函数
func (a Values) Deduplicate() Values {
    if len(a) <= 1 {
        return a
    }

    // See if we're already sorted and deduped
    var needSort bool
    //先遍历看是否乱序或有重复值,是则进行排序
    for i := 1; i < len(a); i++ {
        if a[i-1].UnixNano() >= a[i].UnixNano() {
            needSort = true
            break
        }
    }

    if !needSort {
        return a
    }
   //排序
    sort.Stable(a)
   //去重
    var i int
    for j := 1; j < len(a); j++ {
        v := a[j]
        if v.UnixNano() != a[i].UnixNano() {
            i++
        }
        a[i] = v

    }
    return a[:i+1]
}

下面这个图是从另外一个博客中取的,画得比较好不重复画了:

image

其他的一些核心方法注意一下的就是数据的批量写入,WriteMulti的操作不是原子的,超出内存部分会抛异常但是可以写入的部分会成功,本质就是Write操作写了个for循环……

再来看快照方法,snapshot的内容最终会被写入tsm文件:

// Snapshot takes a snapshot of the current cache, adds it to the slice of caches that
// are being flushed, and resets the c[图片上传中...(image.png-c14626-1565837031888-0)]
urrent cache with new values.
func (c *Cache) Snapshot() (*Cache, error) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if c.snapshotting {
        return nil, ErrSnapshotInProgress
    }

    c.snapshotting = true
    c.tracker.IncSnapshotsActive() // increment the number of times we tried to do this

    // If no snapshot exists, create a new one, otherwise update the existing snapshot
    if c.snapshot == nil {
        c.snapshot = &Cache{
            store:   newRing(),
            tracker: newCacheTracker(c.tracker.metrics, c.tracker.labels),
        }
    }

    // Did a prior snapshot exist that failed?  If so, return the existing
    // snapshot to retry.
    if c.snapshot.Size() > 0 {
        return c.snapshot, nil
    }
    //这是个语法糖 这一段就是在snapshot为空时把当前的cache的ring存到了snapshot中,把c.store置为空,
    //这里操作是不按顺序执行的 ,是类似java的交换变量,会使用一个中间临时变量的
    c.snapshot.store, c.store = c.store, c.snapshot.store
    snapshotSize := c.Size()

    // Reset the cache's store.
    c.store.reset()
    //省略监控代码
    return c.snapshot, nil
}

这一段的逻辑其实就是lsm中把memtable变成immutable memtable的一段实现。
到此l0核心部分的代码分析完毕。

总结一下

总体来看内存里的逻辑并不复杂,主要核心是要理清楚ring的层级结构。下一篇讲一下C1 tree的磁盘文件的合并策略。点我前往下半场

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 1. 简述 时间序列数据:从定义上来说,就是一串按时间维度索引的数据。 时序数据库(TSDB)特点:持续高并发写入...
    楚_kw阅读 113,479评论 4 26
  • Influxdb中的Compaction操作 Compaction概述 Influxdb的存储引擎使用了TSM文件...
    扫帚的影子阅读 5,201评论 0 3
  • 背景 2017年时序数据库忽然火了起来。开年2月Facebook开源了beringei时序数据库;到了4月基于Po...
    jiangmo阅读 54,631评论 1 26
  • ORA-00001: 违反唯一约束条件 (.) 错误说明:当在唯一索引所对应的列上键入重复值时,会触发此异常。 O...
    我想起个好名字阅读 11,023评论 0 9
  • 求知 李明 千字文书八个点, 绞尽脑汁翻白眼。 问君何有诸事顺, 慢就是快乃真传。 幸有名师予指点, 前路漫漫少磕绊。
    李唐一帆阅读 1,477评论 0 1

友情链接更多精彩内容