influxdb写数据流程

收到http请求:
handler.go serveWriteV1
points_writers.go WritePointsPrivilegedWithContext
遍历涉及的shard,每个起一个go func去写,这里是在本地节点上写

    for shardID, points := range shardMappings.Points {
        go func(ctx context.Context, shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
            var numPoints, numValues int64
            ctx = context.WithValue(ctx, tsdb.StatPointsWritten, &numPoints)
            ctx = context.WithValue(ctx, tsdb.StatValuesWritten, &numValues)
            err := w.writeToShardWithContext(ctx, shard, database, retentionPolicy, consistencyLevel, points)

本地shard写入
store.go WriteToShardWithContext
engine.go WritePointsWithContext
先写内存cache,再写WAL

    // first try to write to the cache
    if err := e.Cache.WriteMulti(values); err != nil {
        return err
    }

    if e.WALEnabled {
        if _, err := e.WAL.WriteMulti(values); err != nil {
            return err
        }
    }

内存cache的结构是hash环

type ring struct {
    // Number of keys within the ring. This is used to provide a hint for
    // allocating the return values in keys(). It will not be perfectly accurate
    // since it doesn't consider adding duplicate keys, or trying to remove non-
    // existent keys.
    keysHint int64

    // The unique set of partitions in the ring.
    // len(partitions) <= len(continuum)
    partitions []*partition
}

本地节点写完后,还需要往其他节点写
err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
shard_writer.go WriteShard
把数据写到其他数据节点,调用PRC

    // Write request.
    conn.SetWriteDeadline(time.Now().Add(w.timeout))
    if err := rpc.WriteTLV(conn, rpc.WriteShardRequestMessage, buf); err != nil {
        conn.MarkUnusable()
        return err
    }

如果写失败了,会放到hinthandoff里重试

if err != nil && isRetryable(err) {
                // The remote write failed so queue it via hinted handoff
                atomic.AddInt64(&w.stats.PointWriteReqHH, int64(len(points)))
                hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)

其他数据节点收到RPC消息后, service.go processWriteShardRequest
也是调用store.go WriteToShardWithContext,后面的处理和本地节点写入一样

image.png

写数据时只是写了缓存和WAL文件,并未写磁盘,那么什么时候写磁盘呢
答案是它会定时检查缓存大小或时间超过阈值就会写磁盘

func (e *Engine) ShouldCompactCache(t time.Time) bool {
    sz := e.Cache.Size()

    if sz == 0 {
        return false
    }

    if sz > e.CacheFlushMemorySizeThreshold {
        return true
    }

    return t.Sub(e.Cache.LastWriteTime()) > e.CacheFlushWriteColdDuration
}

cache写入磁盘:engine.go WriteSnapshot
调用
compact.go WriteSnapshot
compact.go write
WriteBlock WriteIndex 实现在writer.go tsmWriter

临时文件 ".influxdb\data1\large\autogen\64\000000006-000000001.tsm.tmp"

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容