3.etcd源码笔记 - example

etcd 基于 raft library 写了一个demo,提供了简单的 put、get、节点变更,外界通过 http 访问

代码都在 etcd/contrib/raftexample 包下,可以先读 README.md 文档看下

一、整体结构

3.1 design.png
  • demo 支持3个功能 put、get、节点变更
  • put 请求不是直接调用应用层的接口,而是通过 propose chan,应用层再读取消息异步处理
  • config change 请求同上,通过 config change chan,应用层再读取消息异步处理
  • get 请求是直接通过接口调用,同步返回
  • 支持wal,后续介绍

二、基础元素

1. KV Storage

demo 很暴力的 用一个 map 实现了 kv storage

代码在 etcd/contrib/raftexample/kvstore.go

type kvstore struct {
    //只写chan,接收put请求
    proposeC    chan<- string
    mu          sync.RWMutex
    //简单的map实现kv存储
    kvStore     map[string]string
    //操作snapshot的类,这个类不关于数据格式,接收参数是 bytes
    snapshotter *snap.Snapshotter
}

//这是http put请求调用的方法
//没有直接更改 kvStore, 而是写入 proposeC
//再传入 raft 库,然后协商一致后,再写回 kvStore(后续详细介绍)
func (s *kvstore) Propose(k string, v string) {
    var buf bytes.Buffer
    if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
        log.Fatal(err)
    }
    s.proposeC <- buf.String()
}

构造时会传入 *commitC <-chan string,只读取 chan,

应用层接收 raft-libraryready 消息后,会把协商一致的提案写入 commitCkvStore 读取后进行处理

func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
    //监听commitC
    for data := range commitC {
        //初始启动时,应用层会写入nil
        //触发 kvstore 加载磁盘文件
        if data == nil {
            snapshot, err := s.snapshotter.Load()
            ...
            continue
        }
        
        //如果是正常数据,就尝试用官方库序列化成二进制数据, 
        //成功的话就把原始值塞进map
        //之所以尝试序列化,是为了刷成snapshot做准备
        var dataKv kv
        dec := gob.NewDecoder(bytes.NewBufferString(*data))
        if err := dec.Decode(&dataKv); err != nil {
            log.Fatalf("raftexample: could not decode message (%v)", err)
        }
        s.mu.Lock()
        s.kvStore[dataKv.Key] = dataKv.Val
        s.mu.Unlock()
    }
    if err, ok := <-errorC; ok {
        log.Fatal(err)
    }
}

2. snapshotter

上述 kvstore 运行期间,数据是放在map,即内存,需要定期持久化,不然重启后神马数据都没有了。

持久化的对象是 raftpb.Snapshot,这个类来自etcd的公共包 etcd/raft/raftpb

type Snapshot struct {
    //数据格式由调用方决定,持久化之后就是二进制数组
    Data             []byte           `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
    Metadata         SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
    XXX_unrecognized []byte           `json:"-"`
}

type SnapshotMetadata struct {
    ConfState        ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"`
    Index            uint64    `protobuf:"varint,2,opt,name=index" json:"index"`
    Term             uint64    `protobuf:"varint,3,opt,name=term" json:"term"`
    XXX_unrecognized []byte    `json:"-"`
}

//在这个demo里,数据格式就是把map刷成bytes
func (s *kvstore) getSnapshot() ([]byte, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    return json.Marshal(s.kvStore)
}

在公共包内 etcd/snap 有一个类 snap.Snapshotter ,作用是操作 raftpb.Snapshot,包括以下功能

  • 写磁盘文件
//raftpb.Snapshot.Metadata 包含 Term、Index 信息
//文件命名规则是 “Term-Index.snap”,其中 Term、Index 不足16位,在前面补0
//这个Index表示 snapshot的最后一条记录的 Index
//不关心格式,参数丢进来就是raftpb.Snapshot
func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) error
  • 加载磁盘文件
//加载的是最新的文件
func (s *Snapshotter) Load() (*raftpb.Snapshot, error)

3. WAL

持久化光有 snapshot 不够,因为持久化不是实时的,一般是定时定量。

一旦重启或宕机,最近一次持久化到服务挂掉这期间的数据还是有可能会丢失, WAL 正是解决这个问题的机制。

WALwrite ahead log 的缩写,即在执行真正的写操作之前先写一个操作日志,这些日志都会严格保证持久化,即实时持久化,以保证整个操作的一致性和可恢复性。

服务重启时,kvstoremap 根据 snapshot + WAL (基量 + 增量) 可以得到完整的恢复。

关于 WAL 有太多的文章介绍,这边就记录阅读源码时遇到的几个问题。

  • 文件命名规则
//“seq-index.wal”,其中 seq、index 不足16位,在前面补0
//seq是递增,实际创建文件时基于wal目录下的 “最新文件的文件名里的 seq” + 1
func walName(seq, index uint64) string {
    return fmt.Sprintf("%016x-%016x.wal", seq, index)
}
  • 日志格式

文件的读写分别交给 wal/encoder.go:decoderwal/encoder.go:encoder 处理

写日志时,需要8字节对齐,这是为了性能考虑,cpu缓冲命中率有关系

具体可以参考 https://stackoverflow.com/questions/21219130/is-8-byte-alignment-for-double-type-necessary

3.2 wal.png
  • 文件大小 默认是64MB

三、流程 - 启动

1. 入口

func main() {
    //解析参数
    cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
    id := flag.Int("id", 1, "node ID")
    kvport := flag.Int("port", 9121, "key-value server port")
    join := flag.Bool("join", false, "join an existing cluster")
    flag.Parse()

    //创建put请求写入的 propose chan
    proposeC := make(chan string)
    defer close(proposeC)
    //创建节点变更请求写入的 config change chan
    confChangeC := make(chan raftpb.ConfChange)
    defer close(confChangeC)

    var kvs *kvstore
    //应用层打包snapshot时,使用的是 kvs.getSnapshot(),就是直接序列化map
    getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
    //应用层基于 raft library 创建一个 raftNode
    //返回 commitC 上面有介绍
    //返回 snapshotterReady,这个是为了保证下面的 newKVStore 在raftNode执行完再执行
    commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

    //创建kvstore
    kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

    //启动http服务
    serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}

2. newRaftNode

func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
    confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {
    ...
    go rc.startRaft()
    return commitC, errorC, rc.snapshotterReady
}

func (rc *raftNode) startRaft() {
    ...
    rc.snapshotter = snap.New(rc.snapdir)
    rc.snapshotterReady <- rc.snapshotter

    oldwal := wal.Exist(rc.waldir)
    rc.wal = rc.replayWAL()
    
    //调用 raft library 的 接口,启动一个 node
    if oldwal {
        rc.node = raft.RestartNode(c)
    } else {
        startPeers := rpeers
        if rc.join {
            startPeers = nil
        }
        rc.node = raft.StartNode(c, startPeers)
    }
    //创建传输层
    rc.transport = &rafthttp.Transport{
        ID:          types.ID(rc.id),
        ClusterID:   0x1000,
        Raft:        rc,
        ServerStats: stats.NewServerStats("", ""),
        LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
        ErrorC:      make(chan error),
    }

    rc.transport.Start()
    for i := range rc.peers {
        if i+1 != rc.id {
            rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
        }
    }
    
    go rc.serveRaft()
    //监听raft library 的 ready chan
    go rc.serveChannels()
}

func (rc *raftNode) serveChannels() {
    ...
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    //接收http层的请求
    go func() {
        ..
        for rc.proposeC != nil && rc.confChangeC != nil {
            select {
            case prop, ok := <-rc.proposeC:
                if !ok {
                    rc.proposeC = nil
                } else {
                    //put请求最终会在这边进行处理
                    rc.node.Propose(context.TODO(), []byte(prop))
                }
            case cc, ok := <-rc.confChangeC:
                if !ok {
                    rc.confChangeC = nil
                } else {
                    confChangeCount += 1
                    cc.ID = confChangeCount
                    //节点变更请求最终会在这边进行处理
                    rc.node.ProposeConfChange(context.TODO(), cc)
                }
            }
        }
        // client closed channel; shutdown raft if not already
        close(rc.stopc)
    }()

    //监听 raft library 抛出来的事件
    for {
        select {
        //周期性的执行任务,心跳、选举
        case <-ticker.C:
            rc.node.Tick()
        //raft library提交的事件
        case rd := <-rc.node.Ready():
            //先存wal
            rc.wal.Save(rd.HardState, rd.Entries)
            //如果有同步snapshot,则将snapshot存下来
            if !raft.IsEmptySnap(rd.Snapshot) {
                rc.saveSnap(rd.Snapshot)
                rc.raftStorage.ApplySnapshot(rd.Snapshot)
                rc.publishSnapshot(rd.Snapshot)
            }
            rc.raftStorage.Append(rd.Entries)
            //如果有消息需要发给其他节点,发之
            rc.transport.Send(rd.Messages)
            //应用层处理已提交的提案,具体就是写到 commit chan,然后 kvstore 消费存储到其 map 中
            if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
                rc.stop()
                return
            }
            //如果有需要对数据进行打包,打包之
            rc.maybeTriggerSnapshot()
            //告诉 raft 状态机 可以继续下一步的处理了
            rc.node.Advance()
        ...
        }
    }
}

3. newKVStore

可以参考上述的基础元素说明

4. serveHttpKVAPI

func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
    srv := http.Server{
        Addr: ":" + strconv.Itoa(port),
        //所有请求的接收都在httpKVAPI
        Handler: &httpKVAPI{
            store:       kv,
            confChangeC: confChangeC,
        },
    }
    ...
}

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    key := r.RequestURI
    switch {
    case r.Method == "PUT":
        ...
        //调用的是 kvstore.Propose
        //具体操作就是写到 Propose Chan,应用层再消费之,最终调用 raftLibrary.Propose
        h.store.Propose(key, string(v))
        ...
    case r.Method == "GET":
        ...
        //直接就是访问kvStore的map
        if v, ok := h.store.Lookup(key); ok {
            w.Write([]byte(v))
        } else {
            http.Error(w, "Failed to GET", http.StatusNotFound)
        }
    case r.Method == "POST":
        ...
        //万万没想到 POST 方法是用来进行节点变更的
        cc := raftpb.ConfChange{
            Type:    raftpb.ConfChangeAddNode,
            NodeID:  nodeId,
            Context: url,
        }
        //写入 config Change Chan,应用层再消费之,最终调用 raftLibrary.ProposeConfChange
        h.confChangeC <- cc
        ...
    case r.Method == "DELETE":
        ...
        //删除节点,同POST
        cc := raftpb.ConfChange{
            Type:   raftpb.ConfChangeRemoveNode,
            NodeID: nodeId,
        }
        h.confChangeC <- cc
        ...
    default:
        ...
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容

  • 提到etcd很多人第一反应就是一个键值存储仓库。不过etcd官方文档的定义却是这样的: A highly-avai...
    神奇的考拉阅读 6,208评论 1 19
  • 寻找一种易于理解的一致性算法(扩展版) 摘要 Raft 是一种为了管理复制日志的一致性算法。它提供了和 Paxos...
    yflau阅读 940评论 0 1
  • 你开店来,我效仿,卖家世界大众脸,真伪难辨买家愁,价格战,买家卖家皆受损,众里寻他千百度,蓦然回首,正品实买就在这...
    rb123456阅读 161评论 0 0
  • 001 感恩先生的全力支持。今天做大扫除,整个家基本上重新整理了一遍,先生帮忙的同时还做好饭、接送孩子上兴趣班。 ...
    盛娟Sanny阅读 150评论 0 0
  • 2018.07.06.星期五,天气晴 今天七点开家长会,这就意味着大宝的暑假生活即将开始,下班急匆匆的赶到大...
    任昱丞妈妈阅读 178评论 0 0