etcd 基于 raft library 写了一个demo,提供了简单的 put、get、节点变更,外界通过 http 访问
代码都在 etcd/contrib/raftexample 包下,可以先读 README.md 文档看下
一、整体结构
- demo 支持3个功能 put、get、节点变更
- put 请求不是直接调用应用层的接口,而是通过 propose chan,应用层再读取消息异步处理
- config change 请求同上,通过 config change chan,应用层再读取消息异步处理
- get 请求是直接通过接口调用,同步返回
- 支持wal,后续介绍
二、基础元素
1. KV Storage
demo 很暴力的 用一个 map 实现了 kv storage
代码在 etcd/contrib/raftexample/kvstore.go
type kvstore struct {
//只写chan,接收put请求
proposeC chan<- string
mu sync.RWMutex
//简单的map实现kv存储
kvStore map[string]string
//操作snapshot的类,这个类不关于数据格式,接收参数是 bytes
snapshotter *snap.Snapshotter
}
//这是http put请求调用的方法
//没有直接更改 kvStore, 而是写入 proposeC
//再传入 raft 库,然后协商一致后,再写回 kvStore(后续详细介绍)
func (s *kvstore) Propose(k string, v string) {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
log.Fatal(err)
}
s.proposeC <- buf.String()
}
构造时会传入 *commitC <-chan string,只读取 chan,
应用层接收 raft-library 的 ready 消息后,会把协商一致的提案写入 commitC,kvStore 读取后进行处理
func (s *kvstore) readCommits(commitC <-chan *string, errorC <-chan error) {
//监听commitC
for data := range commitC {
//初始启动时,应用层会写入nil
//触发 kvstore 加载磁盘文件
if data == nil {
snapshot, err := s.snapshotter.Load()
...
continue
}
//如果是正常数据,就尝试用官方库序列化成二进制数据,
//成功的话就把原始值塞进map
//之所以尝试序列化,是为了刷成snapshot做准备
var dataKv kv
dec := gob.NewDecoder(bytes.NewBufferString(*data))
if err := dec.Decode(&dataKv); err != nil {
log.Fatalf("raftexample: could not decode message (%v)", err)
}
s.mu.Lock()
s.kvStore[dataKv.Key] = dataKv.Val
s.mu.Unlock()
}
if err, ok := <-errorC; ok {
log.Fatal(err)
}
}
2. snapshotter
上述 kvstore 运行期间,数据是放在map,即内存,需要定期持久化,不然重启后神马数据都没有了。
持久化的对象是 raftpb.Snapshot,这个类来自etcd的公共包 etcd/raft/raftpb
type Snapshot struct {
//数据格式由调用方决定,持久化之后就是二进制数组
Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
Metadata SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
XXX_unrecognized []byte `json:"-"`
}
type SnapshotMetadata struct {
ConfState ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"`
Index uint64 `protobuf:"varint,2,opt,name=index" json:"index"`
Term uint64 `protobuf:"varint,3,opt,name=term" json:"term"`
XXX_unrecognized []byte `json:"-"`
}
//在这个demo里,数据格式就是把map刷成bytes
func (s *kvstore) getSnapshot() ([]byte, error) {
s.mu.Lock()
defer s.mu.Unlock()
return json.Marshal(s.kvStore)
}
在公共包内 etcd/snap 有一个类 snap.Snapshotter ,作用是操作 raftpb.Snapshot,包括以下功能
- 写磁盘文件
//raftpb.Snapshot.Metadata 包含 Term、Index 信息
//文件命名规则是 “Term-Index.snap”,其中 Term、Index 不足16位,在前面补0
//这个Index表示 snapshot的最后一条记录的 Index
//不关心格式,参数丢进来就是raftpb.Snapshot
func (s *Snapshotter) SaveSnap(snapshot raftpb.Snapshot) error
- 加载磁盘文件
//加载的是最新的文件
func (s *Snapshotter) Load() (*raftpb.Snapshot, error)
3. WAL
持久化光有 snapshot 不够,因为持久化不是实时的,一般是定时定量。
一旦重启或宕机,最近一次持久化到服务挂掉这期间的数据还是有可能会丢失, WAL 正是解决这个问题的机制。
WAL 是 write ahead log 的缩写,即在执行真正的写操作之前先写一个操作日志,这些日志都会严格保证持久化,即实时持久化,以保证整个操作的一致性和可恢复性。
服务重启时,kvstore 的 map 根据 snapshot + WAL (基量 + 增量) 可以得到完整的恢复。
关于 WAL 有太多的文章介绍,这边就记录阅读源码时遇到的几个问题。
- 文件命名规则
//“seq-index.wal”,其中 seq、index 不足16位,在前面补0
//seq是递增,实际创建文件时基于wal目录下的 “最新文件的文件名里的 seq” + 1
func walName(seq, index uint64) string {
return fmt.Sprintf("%016x-%016x.wal", seq, index)
}
- 日志格式
文件的读写分别交给 wal/encoder.go:decoder 及 wal/encoder.go:encoder 处理
写日志时,需要8字节对齐,这是为了性能考虑,cpu缓冲命中率有关系
具体可以参考 https://stackoverflow.com/questions/21219130/is-8-byte-alignment-for-double-type-necessary
- 文件大小 默认是64MB
三、流程 - 启动
1. 入口
func main() {
//解析参数
cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
id := flag.Int("id", 1, "node ID")
kvport := flag.Int("port", 9121, "key-value server port")
join := flag.Bool("join", false, "join an existing cluster")
flag.Parse()
//创建put请求写入的 propose chan
proposeC := make(chan string)
defer close(proposeC)
//创建节点变更请求写入的 config change chan
confChangeC := make(chan raftpb.ConfChange)
defer close(confChangeC)
var kvs *kvstore
//应用层打包snapshot时,使用的是 kvs.getSnapshot(),就是直接序列化map
getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
//应用层基于 raft library 创建一个 raftNode
//返回 commitC 上面有介绍
//返回 snapshotterReady,这个是为了保证下面的 newKVStore 在raftNode执行完再执行
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
//创建kvstore
kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
//启动http服务
serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}
2. newRaftNode
func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {
...
go rc.startRaft()
return commitC, errorC, rc.snapshotterReady
}
func (rc *raftNode) startRaft() {
...
rc.snapshotter = snap.New(rc.snapdir)
rc.snapshotterReady <- rc.snapshotter
oldwal := wal.Exist(rc.waldir)
rc.wal = rc.replayWAL()
//调用 raft library 的 接口,启动一个 node
if oldwal {
rc.node = raft.RestartNode(c)
} else {
startPeers := rpeers
if rc.join {
startPeers = nil
}
rc.node = raft.StartNode(c, startPeers)
}
//创建传输层
rc.transport = &rafthttp.Transport{
ID: types.ID(rc.id),
ClusterID: 0x1000,
Raft: rc,
ServerStats: stats.NewServerStats("", ""),
LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
ErrorC: make(chan error),
}
rc.transport.Start()
for i := range rc.peers {
if i+1 != rc.id {
rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
}
}
go rc.serveRaft()
//监听raft library 的 ready chan
go rc.serveChannels()
}
func (rc *raftNode) serveChannels() {
...
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
//接收http层的请求
go func() {
..
for rc.proposeC != nil && rc.confChangeC != nil {
select {
case prop, ok := <-rc.proposeC:
if !ok {
rc.proposeC = nil
} else {
//put请求最终会在这边进行处理
rc.node.Propose(context.TODO(), []byte(prop))
}
case cc, ok := <-rc.confChangeC:
if !ok {
rc.confChangeC = nil
} else {
confChangeCount += 1
cc.ID = confChangeCount
//节点变更请求最终会在这边进行处理
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
}
// client closed channel; shutdown raft if not already
close(rc.stopc)
}()
//监听 raft library 抛出来的事件
for {
select {
//周期性的执行任务,心跳、选举
case <-ticker.C:
rc.node.Tick()
//raft library提交的事件
case rd := <-rc.node.Ready():
//先存wal
rc.wal.Save(rd.HardState, rd.Entries)
//如果有同步snapshot,则将snapshot存下来
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
rc.raftStorage.Append(rd.Entries)
//如果有消息需要发给其他节点,发之
rc.transport.Send(rd.Messages)
//应用层处理已提交的提案,具体就是写到 commit chan,然后 kvstore 消费存储到其 map 中
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
rc.stop()
return
}
//如果有需要对数据进行打包,打包之
rc.maybeTriggerSnapshot()
//告诉 raft 状态机 可以继续下一步的处理了
rc.node.Advance()
...
}
}
}
3. newKVStore
可以参考上述的基础元素说明
4. serveHttpKVAPI
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
srv := http.Server{
Addr: ":" + strconv.Itoa(port),
//所有请求的接收都在httpKVAPI
Handler: &httpKVAPI{
store: kv,
confChangeC: confChangeC,
},
}
...
}
func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
switch {
case r.Method == "PUT":
...
//调用的是 kvstore.Propose
//具体操作就是写到 Propose Chan,应用层再消费之,最终调用 raftLibrary.Propose
h.store.Propose(key, string(v))
...
case r.Method == "GET":
...
//直接就是访问kvStore的map
if v, ok := h.store.Lookup(key); ok {
w.Write([]byte(v))
} else {
http.Error(w, "Failed to GET", http.StatusNotFound)
}
case r.Method == "POST":
...
//万万没想到 POST 方法是用来进行节点变更的
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: nodeId,
Context: url,
}
//写入 config Change Chan,应用层再消费之,最终调用 raftLibrary.ProposeConfChange
h.confChangeC <- cc
...
case r.Method == "DELETE":
...
//删除节点,同POST
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeId,
}
h.confChangeC <- cc
...
default:
...
}
}