raft理论与实践[6]-lab3a-基于raft构建分布式容错kv服务

准备工作

前言

  • 在之前的文章中,我们实现了raft算法的基本框架

  • 在本实验中,我们将基于raft算法实现分布式容错的kv服务器

  • 客户端用于交互raft服务器

  • kvraft/client.go文件用于书写我们的客户端代码,调用Clerk的Get/Put/Append方法为系统提供强一致性的保证

  • 这里的强一致性指的是,如果我们一个一个的调用(而不是并发)Clerk的Get/Put/Append方法,那么我们的系统就好像是只有一个raft服务器存在一样,并且调用是序列的,即后面的调用比前面的调用后执行

  • 对于并发调用,最终状态可能难以预料,但是必须与这些方法按某种顺序序列化后执行一次的结果相同

  • 如果调用在时间上重叠,则这些调用是并发的。例如,如果客户端X调用Clerk.Put(),同时客户端Y调用Clerk.Append()

  • 同时,后面的方法在执行之前,必须保证已经观察到前面所有方法执行后的状态(技术上叫做线性化(linearizability))

  • 强一致性保证对应用程序很方便,因为这意味着所有客户端都看到相同的最新状态

  • 对于单个服务器,强一致性相对简单。多台的副本服务器却相对困难,因为所有服务器必须为并发请求选择相同的执行顺序,并且必须避免使用最新状态来回复客户端

本服务实现的功能

  • 本服务支持3种基本的操作,Put(key, value), Append(key, arg), and Get(key)

  • 维护着一个简单的键/值对数据库

  • Put(key, value)将数据库中特定key的值绑定为value

  • Append(key, arg)添加,将arg与key对应。如果key的值不存在,则其行为类似于Put

  • Get(key) 获取当前key的值

  • 在本实验中,我们将实现服务具体的功能,而不必担心Raft log日志会无限增长

实验思路

  • 对lab2中的raft服务器架构进行封装,封装上一些数据库、数据库快照、并会处理log的具体执行逻辑。

  • 对于数据库执行的Get/Put/Append方法都对其进行序列化并放入到lab2 raft的体系中,这样就能保证这些方法的一致性

获取源代码

  • 假设读者已经阅读了准备工作中的一系列文章

  • 在此基础上我们增加了本实验的基本框架kvraft文件以及linearizability文件

  • 读者需要在kvraft文件夹中,实验本实验的具体功能

  • 获取实验代码如下

<pre style="font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; -webkit-font-smoothing: antialiased; margin: 0px; padding: 0.88889em; max-width: 100%; font-size: 0.9em; word-break: break-all; white-space: pre-line; font-style: normal; font-variant: normal; font-weight: normal; font-stretch: normal; line-height: 1.45; color: rgb(86, 116, 130); word-wrap: normal; background: rgb(246, 246, 246); overflow: auto; letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">git clone git@github.com:dreamerjackson/golang-deep-distributed-lab.git
git reset --hard d345b34bc</pre>

客户端

  • Clerk结构体存储了所有raft服务器的客户端servers []*labrpc.ClientEnd,因此我们可以通过Clerk结构体与所有raft服务器通信

  • 我们需要为Clerk结构体实现Put(key, value), Append(key, arg), Get(key)方法

  • Clerk结构体是我们连接raft服务器的桥梁

  • 注意Clerk必须将方法发送到当前的leader节点中,由于其可能并不会知道哪一个节点为leader,因此需要重试。但是记住保存上一个leader的id会加快这一过程,因为leader在稳定的系统里面是不会变的。

  • 客户端必须要等到此操作不仅为commit,而且已经被完全应用后,才能够返回,这才能够保证下次get操作能够得到最新的

  • 需要注意的是,如果raft服务器出现了分区,可能会陷入一直等待,直到分区消失

补充Clerk

  • leader记录最后一个leader的序号

  • seq 记录rpc的序号

  • id记录客户端的唯一id

<pre style="font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; -webkit-font-smoothing: antialiased; margin: 0px; padding: 0.88889em; max-width: 100%; font-size: 0.9em; word-break: break-all; white-space: pre-line; font-style: normal; font-variant: normal; font-weight: normal; font-stretch: normal; line-height: 1.45; color: rgb(86, 116, 130); word-wrap: normal; background: rgb(246, 246, 246); overflow: auto; letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">type Clerk struct {
...
leader int // remember last leader
seq int // RPC sequence number
id int64 // client id
}</pre>

补充Get方法

  • Get方法会遍历访问每一个raft服务,直到找到leader

  • 调用时会陷入堵塞,等待rpc方法返回

  • 设置有超时时间,一旦超时,会重新发送

  • 为了保证Get方法到的数据是准确最新的,也必须要将其加入到raft算法中

  • 客户端必须要等到此操作不仅为commit,而且已经被完全应用后,才能够返回,这才能够保证下次get操作能够得到最新的。

<pre style="font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; -webkit-font-smoothing: antialiased; margin: 0px; padding: 0.88889em; max-width: 100%; font-size: 0.9em; word-break: break-all; white-space: pre-line; font-style: normal; font-variant: normal; font-weight: normal; font-stretch: normal; line-height: 1.45; color: rgb(86, 116, 130); word-wrap: normal; background: rgb(246, 246, 246); overflow: auto; letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">func (ck *Clerk) Get(key string) string {
DPrintf("Clerk: Get: %q\n", key)
cnt := len(ck.servers)
for {
args := &GetArgs{Key: key, ClientID: ck.id, SeqNo: ck.seq}
reply := new(GetReply)

    ck.leader %= cnt
    done := make(chan bool, 1)
    go func() {
        ok := ck.servers[ck.leader].Call("KVServer.Get", args, reply)
        done <- ok
    }()
    select {
    case <-time.After(200 * time.Millisecond): // rpc timeout: 200ms
        ck.leader++
        continue
    case ok := <-done:
        if ok && !reply.WrongLeader {
            ck.seq++
            if reply.Err == OK {
                return reply.Value
            }
            return ""
        }
        ck.leader++
    }
}

return ""

}</pre>

补充Append和Put方法

  • 调用同一个PutAppend方法,但是最后一个参数用于标识具体的操作

<pre style="font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; -webkit-font-smoothing: antialiased; margin: 0px; padding: 0.88889em; max-width: 100%; font-size: 0.9em; word-break: break-all; white-space: pre-line; font-style: normal; font-variant: normal; font-weight: normal; font-stretch: normal; line-height: 1.45; color: rgb(86, 116, 130); word-wrap: normal; background: rgb(246, 246, 246); overflow: auto; letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
ck.PutAppend(key, value, "Append")
}</pre>

  • 和Get方法相似,遍历访问每一个raft服务,直到找到leader

  • 调用时会陷入堵塞,等待rpc方法返回

  • 设置有超时时间,一旦超时,会重新发送

  • 客户端必须要等到此操作不仅为commit,而且已经被完全应用后,才能够返回,这才能够保证下次get操作能够得到最新的。

<pre style="font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; -webkit-font-smoothing: antialiased; margin: 0px; padding: 0.88889em; max-width: 100%; font-size: 0.9em; word-break: break-all; white-space: pre-line; font-style: normal; font-variant: normal; font-weight: normal; font-stretch: normal; line-height: 1.45; color: rgb(86, 116, 130); word-wrap: normal; background: rgb(246, 246, 246); overflow: auto; letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">func (ck *Clerk) PutAppend(key string, value string, op string) {
// You will have to modify this function.
DPrintf("Clerk: PutAppend: %q => (%q,%q) from: %d\n", op, key, value, ck.id)
cnt := len(ck.servers)
for {
args := &PutAppendArgs{Key: key, Value: value, Op: op, ClientID: ck.id, SeqNo: ck.seq}
reply := new(PutAppendReply)

    ck.leader %= cnt
    done := make(chan bool, 1)
    go func() {
        ok := ck.servers[ck.leader].Call("KVServer.PutAppend", args, reply)
        done <- ok
    }()
    select {
    case <-time.After(200 * time.Millisecond): // rpc timeout: 200ms
        ck.leader++
        continue
    case ok := <-done:
        if ok && !reply.WrongLeader && reply.Err == OK {
            ck.seq++
            return
        }
        ck.leader++
    }
}

}</pre>

Server

  • kvraft/server.go文件用于书写我们的客户端代码

  • KVServer结构是对于之前书写的raft架构的封装

  • applyCh chan raft.ApplyMsg 用于状态虚拟机应用coommit log,执行操作

  • db map[string]string 是模拟的一个数据库

  • notifyChs map[int]chan struct{} commandID => notify chan 状态虚拟机应用此command后,会通知此通道

  • duplicate map[int64]*LatestReply 检测重复请求

<pre style="font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; -webkit-font-smoothing: antialiased; margin: 0px; padding: 0.88889em; max-width: 100%; font-size: 0.9em; word-break: break-all; white-space: pre-line; font-style: normal; font-variant: normal; font-weight: normal; font-stretch: normal; line-height: 1.45; color: rgb(86, 116, 130); word-wrap: normal; background: rgb(246, 246, 246); overflow: auto; letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">type KVServer struct {
...
rf *raft.Raft
applyCh chan raft.ApplyMsg
// Your definitions here.
persist raft.Persister
db map[string]string
notifyChs map[int]chan struct{} // per log
// duplication detection table
duplicate map[int64]
LatestReply
}</pre>

完成PutAppend、Get方法

  • 下面以PutAppend为例,Get方法类似

  • 检测当前是否leader状态

  • 检测是否重复请求

  • 将此command通过rf.Start(cmd) 放入raft中

  • select等待直到ch被激活,即command index被此kv服务器应用

  • ch被激活后,需要再次检测当前节点是否为leader

    • 如果不是,说明leader更换,立即返回错误,这时由于如果不再是leader,那么虽然此kv服务器应用了此command index,但不一定是相同的command

    • 这个时候会堵塞直到序号为commandIndex的命令被应用,但是,如果leader更换,此commandIndex的命令不一定就是我们的当前的命令

    • 但是完全有可能新的leader已经应用了此状态,我们这时候虽然仍然返回错误,希望客户端重试,这是由于操作是幂等的并且重复操作无影响。

    • 优化方案是为command指定一个唯一的标识,这样就能够明确此特定操作是否被应用

<pre style="font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; -webkit-font-smoothing: antialiased; margin: 0px; padding: 0.88889em; max-width: 100%; font-size: 0.9em; word-break: break-all; white-space: pre-line; font-style: normal; font-variant: normal; font-weight: normal; font-stretch: normal; line-height: 1.45; color: rgb(86, 116, 130); word-wrap: normal; background: rgb(246, 246, 246); overflow: auto; letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
// not leader
if _, isLeader := kv.rf.GetState(); !isLeader {
reply.WrongLeader = true
reply.Err = ""
return
}

DPrintf("[%d]: leader %d receive rpc: PutAppend(%q => (%q,%q), (%d-%d).\n", kv.me, kv.me,
    args.Op, args.Key, args.Value, args.ClientID, args.SeqNo)

kv.mu.Lock()
// duplicate put/append request
if dup, ok := kv.duplicate[args.ClientID]; ok {
    // filter duplicate
    if args.SeqNo <= dup.Seq {
        kv.mu.Unlock()
        reply.WrongLeader = false
        reply.Err = OK
        return
    }
}

// new request
cmd := Op{Key: args.Key, Value: args.Value, Op: args.Op, ClientID: args.ClientID, SeqNo: args.SeqNo}
index, term, _ := kv.rf.Start(cmd)
ch := make(chan struct{})
kv.notifyChs[index] = ch
kv.mu.Unlock()

reply.WrongLeader = false
reply.Err = OK

// wait for Raft to complete agreement
select {
case <-ch:
    // lose leadership
    curTerm, isLeader := kv.rf.GetState()
    if !isLeader || term != curTerm {
        reply.WrongLeader = true
        reply.Err = ""
        return
    }
case <-kv.shutdownCh:
    return
}

}</pre>

完成对于log的应用操作

  • &lt;-kv.applyCh 是当log成为commit状态时,状态机对于log的应用操作

  • 本系列构建的为kv-raft服务,根据不同的服务其应用操作的方式不同

  • 下面的操作是简单的操作内存map数据库

  • 同时,将最后一个操作记录下来,避免同一个log应用了两次。

<pre style="font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; -webkit-font-smoothing: antialiased; margin: 0px; padding: 0.88889em; max-width: 100%; font-size: 0.9em; word-break: break-all; white-space: pre-line; font-style: normal; font-variant: normal; font-weight: normal; font-stretch: normal; line-height: 1.45; color: rgb(86, 116, 130); word-wrap: normal; background: rgb(246, 246, 246); overflow: auto; letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">func (kv *KVServer) applyDaemon() {
for {
select {
case msg, ok := <-kv.applyCh:
if ok {
// have client's request? must filter duplicate command
if msg.Command != nil {
cmd := msg.Command.(Op)
kv.mu.Lock()
if dup, ok := kv.duplicate[cmd.ClientID]; !ok || dup.Seq < cmd.SeqNo {
switch cmd.Op {
case "Get":
kv.duplicate[cmd.ClientID] = &LatestReply{Seq: cmd.SeqNo,
Reply: GetReply{Value: kv.db[cmd.Key],}}
case "Put":
kv.db[cmd.Key] = cmd.Value
kv.duplicate[cmd.ClientID] = &LatestReply{Seq: cmd.SeqNo,}
case "Append":
kv.db[cmd.Key] += cmd.Value
kv.duplicate[cmd.ClientID] = &LatestReply{Seq: cmd.SeqNo,}
default:
DPrintf("[%d]: server %d receive invalid cmd: %v\n", kv.me, kv.me, cmd)
panic("invalid command operation")
}
if ok {
DPrintf("[%d]: server %d apply index: %d, cmd: %v (client: %d, dup seq: %d < %d)\n",
kv.me, kv.me, msg.CommandIndex, cmd, cmd.ClientID, dup.Seq, cmd.SeqNo)
}
}
// notify channel
if notifyCh, ok := kv.notifyChs[msg.CommandIndex]; ok && notifyCh != nil {
close(notifyCh)
delete(kv.notifyChs, msg.CommandIndex)
}
kv.mu.Unlock()
}
}
}
}
}</pre>

测试

<pre style="font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; -webkit-font-smoothing: antialiased; margin: 0px; padding: 0.88889em; max-width: 100%; font-size: 0.9em; word-break: break-all; white-space: pre-line; font-style: normal; font-variant: normal; font-weight: normal; font-stretch: normal; line-height: 1.45; color: rgb(86, 116, 130); word-wrap: normal; background: rgb(246, 246, 246); overflow: auto; letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">> go test -v -run=3A</pre>

  • 注意,如果上面的测试出现错误也不一定是程序本身的问题,可能是单个进程运行多个测试程序带来的影响

  • 同时,我们可以运行多次避免偶然的影响

  • 因此,如果出现了这种情况,我们可以为单个测试程序独立的运行n次,保证正确性,下面是每10个测试程序独立运行,运行n次的脚本

<pre style="font-family: Consolas, "Liberation Mono", Menlo, Courier, monospace; -webkit-font-smoothing: antialiased; margin: 0px; padding: 0.88889em; max-width: 100%; font-size: 0.9em; word-break: break-all; white-space: pre-line; font-style: normal; font-variant: normal; font-weight: normal; font-stretch: normal; line-height: 1.45; color: rgb(86, 116, 130); word-wrap: normal; background: rgb(246, 246, 246); overflow: auto; letter-spacing: normal; orphans: 2; text-align: left; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">rm -rf res
mkdir res
set int j = 0
for ((i = 0; i < 2; i++))
do
for ((c = ((i*10)); c <(( (i+1)*10)); c++))
do
(go test -v -run TestPersistPartitionUnreliableLinearizable3A) &> ./res/$c &
done

sleep 40

if grep -nr "FAIL.*raft.*" res; then
    echo "fail"
fi

done</pre>

总结

  • 在本实验中,我们封装了lab2a raft框架实现了容错的kv服务

  • 如果出现了问题,需要仔细查看log,思考问题出现的原因

  • 下一个实验中,我们将实现日志的压缩

参考资料

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355

推荐阅读更多精彩内容