收到HTTP请求:
handler.go serveWriteV1
points_writers.go WritePointsPrivilegedWithContext
遍历涉及的shard,每个shard起一个goroutine去写,这里是在本地节点上写
for shardID, points := range shardMappings.Points {
go func(ctx context.Context, shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
var numPoints, numValues int64
ctx = context.WithValue(ctx, tsdb.StatPointsWritten, &numPoints)
ctx = context.WithValue(ctx, tsdb.StatValuesWritten, &numValues)
err := w.writeToShardWithContext(ctx, shard, database, retentionPolicy, consistencyLevel, points)
本地shard写入
store.go WriteToShardWithContext
engine.go WritePointsWithContext
先写内存cache,再写WAL
// first try to write to the cache
if err := e.Cache.WriteMulti(values); err != nil {
return err
}
if e.WALEnabled {
if _, err := e.WAL.WriteMulti(values); err != nil {
return err
}
}
内存cache的结构是hash环
type ring struct {
// Number of keys within the ring. This is used to provide a hint for
// allocating the return values in keys(). It will not be perfectly accurate
// since it doesn't consider adding duplicate keys, or trying to remove non-
// existent keys.
keysHint int64
// The unique set of partitions in the ring.
// len(partitions) <= len(continuum)
partitions []*partition
}
本地节点写完后,还需要往其他节点写
err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points)
shard_writer.go WriteShard
把数据写到其他数据节点,调用RPC
// Write request.
conn.SetWriteDeadline(time.Now().Add(w.timeout))
if err := rpc.WriteTLV(conn, rpc.WriteShardRequestMessage, buf); err != nil {
conn.MarkUnusable()
return err
}
如果远程写入失败且错误是可重试的,会把数据放到hinted handoff队列里,稍后重试
if err != nil && isRetryable(err) {
// The remote write failed so queue it via hinted handoff
atomic.AddInt64(&w.stats.PointWriteReqHH, int64(len(points)))
hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
其他数据节点收到RPC消息后, service.go processWriteShardRequest
也是调用store.go WriteToShardWithContext,后面的处理和本地节点写入一样
image.png
写数据时只是写了内存缓存和WAL文件,并未写入磁盘上的TSM数据文件,那么什么时候写入TSM文件呢
答案是:它会定时检查,当缓存大小超过阈值,或者距离上次写入缓存的时间超过阈值时,就会把缓存写入磁盘
func (e *Engine) ShouldCompactCache(t time.Time) bool {
sz := e.Cache.Size()
if sz == 0 {
return false
}
if sz > e.CacheFlushMemorySizeThreshold {
return true
}
return t.Sub(e.Cache.LastWriteTime()) > e.CacheFlushWriteColdDuration
}
cache写入磁盘:engine.go WriteSnapshot
调用
compact.go WriteSnapshot
compact.go write
WriteBlock WriteIndex 实现在writer.go tsmWriter
临时文件 ".influxdb\data1\large\autogen\64\000000006-000000001.tsm.tmp"