Influxdb Cluster下的数据写入

Cluster下的数据写入

数据写入的实现
  1. 主要分析cluster/points_writer.go中的WritePoints函数的实现
// WritePoints writes across multiple local and remote data nodes according the consistency level.
func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {
    w.statMap.Add(statWriteReq, 1)
    w.statMap.Add(statPointWriteReq, int64(len(p.Points)))

    //2.1 先获取RetentionPolicy
    if p.RetentionPolicy == "" {
        db, err := w.MetaClient.Database(p.Database)
        if err != nil {
            return err
        } else if db == nil {
            return influxdb.ErrDatabaseNotFound(p.Database)
        }
        p.RetentionPolicy = db.DefaultRetentionPolicy
    }

    // 2.2 生成 shardMap
    shardMappings, err := w.MapShards(p)
    if err != nil {
        return err
    }

    // Write each shard in it's own goroutine and return as soon
    // as one fails.
    ch := make(chan error, len(shardMappings.Points))
    for shardID, points := range shardMappings.Points {
    
        // 2.3 写入数据到Shard
        go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {
            ch <- w.writeToShard(shard, p.Database, p.RetentionPolicy, p.ConsistencyLevel, points)
        }(shardMappings.Shards[shardID], p.Database, p.RetentionPolicy, points)
    }

    // Send points to subscriptions if possible.
    ok := false
    // We need to lock just in case the channel is about to be nil'ed
    w.mu.RLock()
    select {
    case w.subPoints <- p:
        ok = true
    default:
    }
    w.mu.RUnlock()
    if ok {
        w.statMap.Add(statSubWriteOK, 1)
    } else {
        w.statMap.Add(statSubWriteDrop, 1)
    }

    // 2.4 等待写入完成 
    for range shardMappings.Points {
        select {
        case <-w.closing:
            return ErrWriteFailed
        case err := <-ch:
            if err != nil {
                return err
            }
        }
    }
    return nil
}
  1. 上面的函数实现主要分如下几个步骤
    2.1 获取对应的RetentionPolicy
    2.2 生成ShardMap, 将各个point对应到相应ShardGroup中的Shard中, 这步很关键
    2.3 按ShardId不同,开启新的goroutine, 将points写入相应的Shard,可能设计对写入数据到其它的DataNode上;
    2.4 等待写入完成或退出
ShardMap的生成
  1. 先讲一下ShardGroup的概念
    1.1 写入Influxdb的每一条数据对带有相应的time时间,每一个SharGroup都有自己的start和end时间,这个时间跨度是由用户写入时选取的RetentionPolicy时的ShardGroupDarution决定,这样每条写入的数据就必然仅属于一个确定的ShardGroup中;

  2. 主要实现在cluster/points_writer.go中的MapShards

func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {

    // holds the start time ranges for required shard groups
    timeRanges := map[time.Time]*meta.ShardGroupInfo{}

    rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)
    if err != nil {
        return nil, err
    }
    if rp == nil {
        return nil, influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)
    }

    for _, p := range wp.Points {
        timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil
    }

    // holds all the shard groups and shards that are required for writes
    for t := range timeRanges {
        sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t)
        if err != nil {
            return nil, err
        }
        timeRanges[t] = sg
    }

    mapping := NewShardMapping()
    for _, p := range wp.Points {
        sg := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]
        sh := sg.ShardFor(p.HashID())
        mapping.MapPoint(&sh, p)
    }
    return mapping, nil
}
  1. 我们来拆解下上面函数的实现
    3.1 扫描所有的points, 按时间确定我们需要多个ShardGroup
for _, p := range wp.Points {
        timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil
    }

3.2 调用w.MetaClient.CreateShardGroup, 如果ShardGroup存在直接返回ShardGroup信息,如果不存在创建,创建过程涉及到将CreateShardGroup的请求发送给MetadataServer并等待本地更新到新的MetaData数据;

sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t)

3.3 分析ShardGroup的分配规则, 在services/meta/data.go中的CreateShardGroup

func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {
    ...

    // Require at least one replica but no more replicas than nodes.
    // 确认复本数,不能大于DataNode节点总数
    replicaN := rpi.ReplicaN
    if replicaN == 0 {
        replicaN = 1
    } else if replicaN > len(data.DataNodes) {
        replicaN = len(data.DataNodes)
    }

    // Determine shard count by node count divided by replication factor.
    // This will ensure nodes will get distributed across nodes evenly and
    // replicated the correct number of times.
    // 根据复本数确定Shard数量
    shardN := len(data.DataNodes) / replicaN

    // Create the shard group.
    // 创建ShardGroup
    data.MaxShardGroupID++
    sgi := ShardGroupInfo{}
    sgi.ID = data.MaxShardGroupID
    sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
    sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()

    // Create shards on the group.
    sgi.Shards = make([]ShardInfo, shardN)
    for i := range sgi.Shards {
        data.MaxShardID++
        sgi.Shards[i] = ShardInfo{ID: data.MaxShardID}
    }

    // Assign data nodes to shards via round robin.
    // Start from a repeatably "random" place in the node list.
    // ShardInfo中的Owners记录了当前Shard所有复本所在DataNode的信息
    // 分Shard的所有复本分配DataNode
    // 使用data.Index作为基数确定开始的DataNode,然后使用 round robin策略分配
    // data.Index:每次meta信息有更新,Index就会更新, 可以理解为meta信息的版本号
    nodeIndex := int(data.Index % uint64(len(data.DataNodes)))
    for i := range sgi.Shards {
        si := &sgi.Shards[i]
        for j := 0; j < replicaN; j++ {
            nodeID := data.DataNodes[nodeIndex%len(data.DataNodes)].ID
            si.Owners = append(si.Owners, ShardOwner{NodeID: nodeID})
            nodeIndex++
        }
    }

    // Retention policy has a new shard group, so update the policy. Shard
    // Groups must be stored in sorted order, as other parts of the system
    // assume this to be the case.
    rpi.ShardGroups = append(rpi.ShardGroups, sgi)
    sort.Sort(ShardGroupInfos(rpi.ShardGroups))

    return nil
}

3.3 按每一个具体的point对应到ShardGroup中的一个Shard: 按point的HashID来对Shard总数取模,HashID是measurment + tag set的Hash值

for _, p := range wp.Points {
        sg := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]
        sh := sg.ShardFor(p.HashID())
        mapping.MapPoint(&sh, p)
    }
....

 func (sgi *ShardGroupInfo) ShardFor(hash uint64) ShardInfo {
    return sgi.Shards[hash%uint64(len(sgi.Shards))]
}
数据按一致性要求写入
  1. 过程简述
    1.1 根据一致性要求确认需要成功写入几份
switch consistency {
    // 对于ConsistencyLevelAny, ConsistencyLevelOne只需要写入一份即满足一致性要求,返回客户端
    case ConsistencyLevelAny, ConsistencyLevelOne:
        required = 1
    case ConsistencyLevelQuorum:
        required = required/2 + 1
    }

1.2 根据Shard.Owners对应的DataNode, 向其中的每个DataNode写入数据,如果是本机,直接调用w.TSDBStore.WriteToShard写入;如果非本机,调用err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points);
1.3 写入远端失败时,数据写入HintedHandoff本地磁盘队列多次重试写到远端,直到数据过期被清理;对于一致性要求是ConsistencyLevelAny, 写入本地HintedHandoff成功,就算是写入成功;

    w.statMap.Add(statWritePointReqHH, int64(len(points)))
                hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)
                if hherr != nil {
                    ch <- &AsyncWriteResult{owner, hherr}
                    return
                }

                if hherr == nil && consistency == ConsistencyLevelAny {
                    ch <- &AsyncWriteResult{owner, nil}
                    return
            }

1.4 等待写入超时或完成

for range shard.Owners {
        select {
        case <-w.closing:
            return ErrWriteFailed
        case <-timeout:
            w.statMap.Add(statWriteTimeout, 1)
            // return timeout error to caller
            return ErrTimeout
        case result := <-ch:
            // If the write returned an error, continue to the next response
            if result.Err != nil {
                if writeError == nil {
                    writeError = result.Err
                }
                continue
            }

            wrote++

            // 写入已达到一致性要求,就立即返回
            if wrote >= required {
                w.statMap.Add(statWriteOK, 1)
                return nil
            }
        }
    }
HintedHandoff服务
  1. 定义在services/hh/service.go

  2. 写入HintedHandoff中的数据,按NodeID的不同写入不同的目录,每个目录下又分多个文件,每个文件作为一个segment, 命名规则就是依次递增的id, id的大小按序就是写入的时间按从旧到新排序;


    hitnedhandoff.png
  3. HintedHandoff服务会针对每一个远端DataNode创建NodeProcessor, 每个负责自己DataNode的写入, 运行在一个单独的goroutine中

  4. 在每个goroutine中,作两件事:一个是定时清理过期的数据,如果被清理掉的数据还没有成功写入到远端,则会丢失;二是从文件读取数据写入到远端;

func (n *NodeProcessor) run() {
    defer n.wg.Done()

    ...

    for {
        select {
        case <-n.done:
            return

        case <-time.After(n.PurgeInterval):
            if err := n.queue.PurgeOlderThan(time.Now().Add(-n.MaxAge)); err != nil {
                n.Logger.Printf("failed to purge for node %d: %s", n.nodeID, err.Error())
            }

        case <-time.After(currInterval):
            limiter := NewRateLimiter(n.RetryRateLimit)
            for {
                c, err := n.SendWrite()
                if err != nil {
                    if err == io.EOF {
                        // No more data, return to configured interval
                        currInterval = time.Duration(n.RetryInterval)
                    } else {
                        currInterval = currInterval * 2
                        if currInterval > time.Duration(n.RetryMaxInterval) {
                            currInterval = time.Duration(n.RetryMaxInterval)
                        }
                    }
                    break
                }

                // Success! Ensure backoff is cancelled.
                currInterval = time.Duration(n.RetryInterval)

                // Update how many bytes we've sent
                limiter.Update(c)

                // Block to maintain the throughput rate
                time.Sleep(limiter.Delay())
            }
        }
    }
}
  1. 数据的本地存储和读取
    5.1 定义在services/hh/queue.go,所有的segment file在内存中组织成一个队列,读从head指向的segment读取,写入到tail指向的segment, 每个segment文件的最后8字节记录当前segment文件已经读到什么位置
    5.2 清理,当这个segment文件内容都发送完当前文件会被删除,周期性清理每次只会check当前head指向的segment是否需要清理掉
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容