8.MIT 6.824 LAB 4B(分布式shard database)

第一步 阅读4B的文档

弄清楚里面的每个段落

第二步 基于LAB3,写出可以通过单GROUP的代码

Your first task is to pass the very first shardkv test. In this test, there is only a single assignment of shards, so your code should be very similar to that of your Lab 3 server. The biggest modification will be to have your server detect when a configuration happens and start accepting requests whose keys match shards that it now owns.

基于LAB3,把代码可以抄的都抄过去


image.png

抄代码就不展示了。基本大多数都是把代码复制过去。

因为这边的CLIENT 给了代码 是需要看ERR这个属性的。


image.png

所以和LAB3不同(那里我没用这个REPLY ERR的属性),需要加上返回值。


image.png

很快,测试1通过了


image.png

第三步 增加拉CONFIG,和拒绝不属于自己的SHARD的代码

来自HINT

Add code to server.go to periodically fetch the latest configuration from the shardmaster, and add code to reject client requests if the receiving group isn't responsible for the client's key's shard. You should still pass the first test.

image.png

image.png

image.png

image.png

测试之后,依然确保可以过第一个TEST

第四步 思考

思考 CONFIG变化后,如何转移SHARD

我的思考是这样的。如果一个REPLICA GROUP A得到一个SHARD 1,对应B 失去一个SHARD 1
如果是A检测到多了,去等待别人来发给我,会比较被动。因为不知道要等多久。
其次B还需要发现自己失去SHARD 1后,要主动去发给A,这增加了B 的工作量。
因为我们这边在做SHARD MIGRATION的时候,是不能响应请求。但对B来说,他可以立刻更新CONFIG,即使没把SHARD 1发送出去,他也是可以响应请求。
但对A来说,一定要拿到SHARD 1后,它才可以继续服务。
基于上述思考,决定让A去问B要SHARD,这样会简化设计。因为这样做B可以如果发现了新的CONFIG,可以直接更新让它立刻生效。A需要等待PULL成功后,更新CONFIG让它生效。

Reconfiguration 会影响到 PutAppend/Get,因此同样需要利用 raft 保证 group 内的一致性,确保集群内完成了之前的操作后同时进行Reconfiguration;

要思考的第二个点,是传输SHARD的RPC。

首先SHARD DATA是一定要发过去的。
但是只是发SHARD DATA是不够的。
比如一个APPEND REQUEST 在向A SERVER发送的时候,TIMEOUT了。这个时候A server 已经做了这个更新操作。
在这个点之后,Reconfiguration 发生,CLIENT 去问B SERVER发送APPEND REQ。如果只是SHARD DATA过去。会造成APPEND2次。
所以我们还需要把去重的MAP也一起发过去。

发过去的参数除了要诉说我要哪个SHARD之外,还需要加上CONFIG NUM,因为有可能我发的CONFIG NUM比那边还要大,说明那边的CONFIG还没同步到。
基于上述思路。我设计的RPC如下。


image.png

第五步 实现MIGRATE SHARD RPC HANDLER

这里有个很重要的思路,每个RAFT GROUP都是由LEADER负责发送和接受RPC。FOLLOWER只负责从APPLY MSG里去和LEADER SYNC状态。

还有一个点,就是我们不能直接从DB里去取数据,如果我们没有实现清理数据的前提下,因为数据不清理。所以我们会有多的数据,想象一下。我们先接受SHARD1,然后不接受,再重新接受SHARD1,此时做迁移,会是一个并集。而我们只是希望是重新接受的那部分。基于上述考虑。我们需要基于每一个CONFIG,单独把要迁移的数据给抽出来。这样依据CONFIG来做迁移。


image.png

第六步 思考难点

如何去PULL DATA?如果我们选择让LEADER去交互,我们必须要HANDLER RAFT Leader挂掉,得有新的LEADER来负责PULL DATA。
所以在所有节点上必须得存好要问哪里去PULL DATA。如果PULL到,我们需要确保LEADER会往RAFT里发CMD(这个CMD是让节点同步数据,同时删掉那个维护的哪里去PULL DATA的地方)

而且我们必须额外开一个后台进程与循环的做这件事。不然LEADER转移过去之后,就没有人PULL DATA了。 因为PULL DATA 这件事是没有CLIENT超时重试的。

因为要后台循环去PULL DATA,我们拿到DATA后,送进RAFT,再进入到APPLY CH,需要所有的节点都可以同步这个数据。一旦同步成功,我们需要清理这个要等待的数据。这样后台线程可以少发很多无用的RPC。

同时我们在索要数据的时候也要知道往哪个REPLICA GROUP要。


image.png
image.png
image.png
image.png

第7步

目前我们已经再LEADER端,把收到的新的CONFIG和拿到的MIGRATION DATA打给放进RAFT的LOG去做线性一致的排序。

所以当这个2个消息从APPLY MSG出来的时候,需要去做一些事情。
为此,我单独开了一个函数去写APPLY的逻辑


image.png

第8步 实现APPLY MSG 是MIGRATION DATA REPLY

这里的点(后面大量调试获得的),因为REPLY 发到RAFT里面,虽然有顺序,但返回的时候顺序可能是乱的。比如现在我的CONFIG已经更新到9,这个时候RAFT才把CONFIG的6 返回回来。我们应该直接忽略这个版本。如果更新了,就会产生不一致。

那么依据乱序思想,我们不得不CHECK 就是当前REPLY的CONFIG版本号必须是当前CONFIG版本号小一个。

为什么?

这里我们在收到CONFIG 变更,我们就会刷新CONFIG。但是此时CONFIG刷新之后,我们会更新COME IN SHARD,随后后台线程会去PULL。从更新COME IN SHARD到数据SHARD过来,这段时间内,我们必须得拒绝掉所有的索要该SHARD的请求。所以我们不能直接从CONFIG来判断是不是WRONG GROUP。

至此,我们需要额外再维护一个我现在能HANDLER哪些SHARD的数据结构。


image.png

那么发出去的SHARD,我可以直接从这个数据结构里删掉。要进来的话,等真的进来了,再添加到这个数据结构中。


image.png

判断是不是WRONG GROUP,也依据这个数据结构来看。

第9步 实现updateInAndOutDataShard

这边我们会根据新的CONFIG来,判断自己要送出去的数据是哪些,自己要接受进来的数组是哪些。在我的设计里这2个数据结构必要性在第5,6步讨论过了。


image.png

第十步 判断WRONG GROUP的时机

在前面的版本中,我们是在SERVER端接受到请求的时候,就直接去依据CONFIG判断WRONG GROUP。现在我们改成依据MYSHARD来看,但是这还是不足的。

还记得我们在做LAB 3的时候,判断去重,必须得再消息回来的时候再看一次。 因为可能在请求发送的时候,数据还在REPLICA GROUP 1。可是到消息从RAFT返回来的时候,当中发生过更新CONFIG。数据不再GROUP 1了。所以要把判断WRONG GROUP的逻辑,加在数据返回层。

同时因为数据会在APPLY CH收到新的CONFIG,一部分要TO OUT的数据就会从DB里DELETE掉。为了确保NOTIFY CH的传输过程中,这个DB的更改不会影响到实际的GET的返回值。我们需要在接到APPLY CH的时候就把结果给注入到OP里。不然等OP发过去再从DB拿,有一定概率此时另一个线程已经再DELETE DB了。


image.png

image.png

image.png

同时根据这个思路,我把SHARD MASTER的QUERY 也加在返回层来做。


image.png

第11步 初始化新加的属性

image.png

第12步 更新POLL NEW CONFIG代码,需要一个个更新

来自HINT

Process re-configurations one at a time, in order.

同时注意如果当前CONFIG,那些需要转移的SHARD还没做完。不要立刻去拿下一个CONFIG。


image.png

image.png

第13步 测试JOIN AND LEAVE

发现有时可以过,有时过不了会阻塞。

BUG 1 死锁

通过几小时的调试发现,是一个3维死锁。首先RAFT里面拿了RAFT的锁,阻塞在APPLY CH那。APPLY CH的后台线程阻塞在KV SERVER的锁上。 还有一个PULL CONFIG的线程,持有了KV SERVER的锁,阻塞在RAFT的锁上。

image.png

FIX方法,交换代码顺序。


image.png

测试通过

但是写了这么多代码,很多地方都没注意保护共享变量。所以用TEST DATA RACE的时候会出问题。

检查思路,先看3个后台进程,随后看几个RPC handler
这边就自己加一下锁吧。
GO TEST RACE OK之后,会在第三个测试败掉。是SNAPSHOT

我们需要存储更多的状态进SNAPSHOT。

第14步 实现新的SNAPSHOT

image.png

image.png

image.png

再直接测试,发现阻塞在UNRELIABLE 3


image.png

BUG 2 一处地方没有释放锁

image.png

修复后重新对这个CASE单独测试100次通过,测全集。只剩下CHANLLEGE 1,DELETE的TASK了


image.png

第15步 思考如何删除不必要的状态

在上面的实现里,我们开了3个数据结构,一个是TO OUT,一个是COME IN,一个是MY SHARD;
第三个是固定大小的。不用考虑
第二个,我们已经再接受到DATA之后会去删除它。
唯一没有回收的就是第一个。

最NAIVE的实现是当我们把数据当做REPLY发过去的时候,就直接删掉。这是危险的。因为很有可能这个消息会丢失,被那边服务器拒绝,造成这个数据就永远不会被回收。

正确的做法是等到对方服务器,成功接收了DATA,然后删除了对应的COME IN,这个时候应该发REQUEST告诉TO OUT一方,你可以安全的把TO OUT里的这块DATA给回收了。

但是依然存在RPC会丢失的情况。和PULL的思想一样。(用一个COME IN LIST+ 后台线程,来不断重试,成功时候删除COME IN LIST内容,就不再去PULL直到有新的COME IN来。失败的话,因为COME IN 内容还在,就会自动重试,不怕网络不稳定)

那么我针对这个CASE,用相同的套路。后台GC线程+Garbage List.

具体思路就是当COME IN 的DATA收到后,我们要把这块数据标记进Garbage List。 后台GC线程发现Garbage List有内容,就会往对应的GROUP发送GC RPC。对应的GROUP清理成功后,REPLY告知。我们把Garbage List对应的内容删除。

同样我们依然只和LEADER交互,并且利用RAFT LOG,来确保所有节点都成功删除GARBAGE,再RPC回复SUCCESS

第16步 写GC RPC HANDLER,抽一个TEMPLATE

发现可以用ERR 里面加一个WRONGLEADER来代表LEADER不对。就可以去掉一个参数。

当OP TYPE是GC的时候,KEY 是CONFIG NUM,SEQNUM是SHARD。


image.png
image.png
image.png

第17步 实现GC

image.png

image.png

第18步 实现GC后台进程

image.png
image.png

第19步 往GARBAGE里添加值

image.png

第20步,更新SNAPSHOT

这里小伙伴自行更新吧

测试通过

image.png

第21步

因为我的REPLICA GROUP里会往MASTER 发送QUERY请求,这个时候可能会造成LAST LEADER的DATA RACE。
所以我用原子方法改写。


image.png

第22步 CONCISE代码

1.我把WRONDLEADER给去掉了。同时用ERR 的WRONGLEADER来表示。


image.png

2.把几个RPC HANDLER用TEMPLATE 提取公有逻辑


image.png

最终430 行代码

GO TEST 测试200次

这个测试不适合并行,因为会大量开线程在做。并行测试会造成有些CASE跑的巨慢。所以串行测试了。
同时CONFIG里因为MASTER的资源没有回收。越到后面TEST 跑的越慢,我加了如下代码来提速测试


image.png

基本跑完一整套是2分钟


image.png

测试200次的结果 是

BUG 3

TestChallenge2Unaffected 会有1/50的概率阻塞。

经过打LOG 发现,是还没来得及把所有DATA SHARD完,超过了1秒,之后就有数据再也MIGRATE不过来。造成拿不到而阻塞。

这里分享一个打LOG的技巧,避免淹没在茫茫LOG海里。就是出了问题,再打LOG


image.png

image.png
image.png
image.png

原因如下:


image.png

下图这个4号数据块 在测试里属于101的OWN,但是还没来得及拿到,100的网就断了。再也取不到了。


image.png

解决方案,加快PULL的频率


image.png

加快QUERY CONFIG的速度。思路是如果是拿已知的CONFIG,因为CONFIG的APPEND不会修改,所以可以直接返回。


image.png

缩短MASTER CLIENT的睡眠时间。


image.png

测试200次后无阻塞。

SHARD KV 测试200次通过

测试脚本

#!/bin/bash

export GOPATH="/home/zyx/Desktop/mit6.824/6.824"
export PATH="$PATH:/usr/lib/go-1.9/bin"

rm res -rf
mkdir res
for ((i = 0; i < 200; i++))
do
echo $i
(go test) > ./res/$i
grep -nr "FAIL.*" res
done
image.png

回归测试SHARD MASTER500次通过

image.png

回归测试RAFT 300次通过

有2次时之前说的不是代码问题的KNOWN ISSUE,具体参考文集的2C部分


image.png

回归测试KVRAFT 210次通过

image.png

CONCISE SERVER

package shardkv

import (
    "bytes"
    "labrpc"
    "log"
    "shardmaster"
    "strconv"
    "time"
)
import "raft"
import "sync"
import "labgob"

type Op struct {
    OpType  string  "operation type(eg. put/append/gc/get)"
    Key     string  "key for normal, config num for gc"
    Value   string
    Cid     int64   "cid for put/append, operation uid for get/gc"
    SeqNum  int     "seqnum for put/append, shard for gc"
}

type ShardKV struct {
    mu           sync.Mutex
    me           int
    rf           *raft.Raft
    applyCh      chan raft.ApplyMsg

    make_end     func(string) *labrpc.ClientEnd
    gid          int
    masters      []*labrpc.ClientEnd
    maxraftstate int // snapshot if log grows this big
    // Your definitions here.
    mck             *shardmaster.Clerk
    cfg             shardmaster.Config
    persist         *raft.Persister
    db              map[string]string
    chMap           map[int]chan Op
    cid2Seq         map[int64]int

    toOutShards     map[int]map[int]map[string]string "cfg num -> (shard -> db)"
    comeInShards    map[int]int     "shard->config number"
    myShards        map[int]bool    "to record which shard i can offer service"
    garbages        map[int]map[int]bool              "cfg number -> shards"

    killCh      chan bool
}

func (kv *ShardKV) Get(args *GetArgs, reply *GetReply) {
    originOp := Op{"Get",args.Key,"",Nrand(),0}
    reply.Err,reply.Value = kv.templateStart(originOp)
}
func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
    originOp := Op{args.Op,args.Key,args.Value,args.Cid,args.SeqNum}
    reply.Err,_ = kv.templateStart(originOp)
}
func (kv *ShardKV) templateStart(originOp Op) (Err, string) {
    index,_,isLeader := kv.rf.Start(originOp)
    if isLeader {
        ch := kv.put(index, true)
        op := kv.beNotified(ch, index)
        if equalOp(originOp, op) { return OK, op.Value }
        if op.OpType == ErrWrongGroup { return ErrWrongGroup, "" }
    }
    return ErrWrongLeader,""
}
func (kv *ShardKV) GarbageCollection(args *MigrateArgs, reply *MigrateReply) {
    reply.Err = ErrWrongLeader
    if _, isLeader := kv.rf.GetState(); !isLeader {return}
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if _,ok := kv.toOutShards[args.ConfigNum]; !ok {return}
    if _,ok := kv.toOutShards[args.ConfigNum][args.Shard]; !ok {return}
    originOp := Op{"GC",strconv.Itoa(args.ConfigNum),"",Nrand(),args.Shard}
    kv.mu.Unlock()
    reply.Err,_ = kv.templateStart(originOp)
    kv.mu.Lock()
}

func (kv *ShardKV) ShardMigration(args *MigrateArgs, reply *MigrateReply) {
    reply.Err, reply.Shard, reply.ConfigNum = ErrWrongLeader, args.Shard, args.ConfigNum
    if _,isLeader := kv.rf.GetState(); !isLeader {return}
    kv.mu.Lock()
    defer kv.mu.Unlock()
    reply.Err = ErrWrongGroup
    if args.ConfigNum >= kv.cfg.Num {return}
    reply.Err,reply.ConfigNum, reply.Shard = OK, args.ConfigNum, args.Shard
    reply.DB, reply.Cid2Seq = kv.deepCopyDBAndDedupMap(args.ConfigNum,args.Shard)
}
func (kv *ShardKV) deepCopyDBAndDedupMap(config int,shard int) (map[string]string, map[int64]int) {
    db2 := make(map[string]string)
    cid2Seq2 := make(map[int64]int)
    for k, v := range kv.toOutShards[config][shard] {
        db2[k] = v
    }
    for k, v := range kv.cid2Seq {
        cid2Seq2[k] = v
    }
    return db2, cid2Seq2
}

func (kv *ShardKV) beNotified(ch chan Op,index int) Op{
    select {
    case notifyArg,ok := <- ch :
        if ok {
            close(ch)
        }
        kv.mu.Lock()
        delete(kv.chMap,index)
        kv.mu.Unlock()
        return notifyArg
    case <- time.After(time.Duration(1000)*time.Millisecond):
        return Op{}
    }
}
func (kv *ShardKV) put(idx int,createIfNotExists bool) chan Op{
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if _, ok := kv.chMap[idx]; !ok {
        if !createIfNotExists {return nil}
        kv.chMap[idx] = make(chan Op,1)
    }
    return kv.chMap[idx]
}
func equalOp(a Op, b Op) bool{
    return a.Key == b.Key &&  a.OpType == b.OpType && a.SeqNum == b.SeqNum && a.Cid == b.Cid
}

func (kv *ShardKV) Kill() {
    kv.rf.Kill()
    select{
    case  <-kv.killCh:
    default:
    }
    kv.killCh <- true
}

func (kv *ShardKV) readSnapShot(snapshot []byte) {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if snapshot == nil || len(snapshot) < 1 {return}
    r := bytes.NewBuffer(snapshot)
    d := labgob.NewDecoder(r)
    var db map[string]string
    var cid2Seq map[int64]int
    var toOutShards map[int]map[int]map[string]string
    var comeInShards map[int]int
    var myShards    map[int]bool
    var garbages    map[int]map[int]bool
    var cfg shardmaster.Config
    if d.Decode(&db) != nil || d.Decode(&cid2Seq) != nil || d.Decode(&comeInShards) != nil ||
        d.Decode(&toOutShards) != nil || d.Decode(&myShards) != nil || d.Decode(&cfg) != nil ||
        d.Decode(&garbages) != nil {
        log.Fatal("readSnapShot ERROR for server %v",kv.me)
    } else {
        kv.db, kv.cid2Seq, kv.cfg = db, cid2Seq, cfg
        kv.toOutShards, kv.comeInShards, kv.myShards, kv.garbages = toOutShards,comeInShards,myShards,garbages
    }
}

func (kv *ShardKV) needSnapShot() bool {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    threshold := 10
    return kv.maxraftstate > 0 &&
        kv.maxraftstate - kv.persist.RaftStateSize() < kv.maxraftstate/threshold
}

func (kv *ShardKV) doSnapShot(index int) {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    kv.mu.Lock()
    e.Encode(kv.db)
    e.Encode(kv.cid2Seq)
    e.Encode(kv.comeInShards)
    e.Encode(kv.toOutShards)
    e.Encode(kv.myShards)
    e.Encode(kv.cfg)
    e.Encode(kv.garbages)
    kv.mu.Unlock()
    kv.rf.DoSnapShot(index,w.Bytes())
}



func (kv *ShardKV) tryPollNewCfg() {
    _, isLeader := kv.rf.GetState();
    kv.mu.Lock()
    if !isLeader || len(kv.comeInShards) > 0{
        kv.mu.Unlock()
        return
    }
    next := kv.cfg.Num + 1
    kv.mu.Unlock()
    cfg := kv.mck.Query(next)
    if cfg.Num == next {
        kv.rf.Start(cfg) //sync follower with new cfg
    }
}
func (kv *ShardKV) tryGC() {
    _, isLeader := kv.rf.GetState();
    kv.mu.Lock()
    if !isLeader || len(kv.garbages) == 0{
        kv.mu.Unlock()
        return
    }
    var wait sync.WaitGroup
    for cfgNum, shards := range kv.garbages {
        for shard := range shards {
            wait.Add(1)
            go func(shard int, cfg shardmaster.Config) {
                defer wait.Done()
                args := MigrateArgs{shard, cfg.Num}
                gid := cfg.Shards[shard]
                for _, server := range cfg.Groups[gid] {
                    srv := kv.make_end(server)
                    reply := MigrateReply{}
                    if ok := srv.Call("ShardKV.GarbageCollection", &args, &reply); ok && reply.Err == OK {
                        kv.mu.Lock()
                        defer kv.mu.Unlock()
                        delete(kv.garbages[cfgNum], shard)
                        if len(kv.garbages[cfgNum]) == 0 {
                            delete(kv.garbages, cfgNum)
                        }
                    }
                }
            }(shard, kv.mck.Query(cfgNum))
        }
    }
    kv.mu.Unlock()
    wait.Wait()
}
func (kv *ShardKV) tryPullShard() {
    _, isLeader := kv.rf.GetState();
    kv.mu.Lock()
    if  !isLeader || len(kv.comeInShards) == 0 {
        kv.mu.Unlock()
        return
    }
    var wait sync.WaitGroup
    for shard, idx := range kv.comeInShards {
        wait.Add(1)
        go func(shard int, cfg shardmaster.Config) {
            defer wait.Done()
            args := MigrateArgs{shard, cfg.Num}
            gid := cfg.Shards[shard]
            for _, server := range cfg.Groups[gid] {
                srv := kv.make_end(server)
                reply := MigrateReply{}
                if ok := srv.Call("ShardKV.ShardMigration", &args, &reply); ok && reply.Err == OK {
                    kv.rf.Start(reply)
                }

            }
        }(shard, kv.mck.Query(idx))
    }
    kv.mu.Unlock()
    wait.Wait()
}

func (kv *ShardKV) daemon(do func(), sleepMS int) {
    for {
        select {
        case <-kv.killCh:
            return
        default:
            do()
        }
        time.Sleep(time.Duration(sleepMS) * time.Millisecond)
    }
}

func (kv *ShardKV) apply(applyMsg raft.ApplyMsg) {
    if cfg, ok := applyMsg.Command.(shardmaster.Config); ok {
        kv.updateInAndOutDataShard(cfg)
    } else if migrationData, ok := applyMsg.Command.(MigrateReply); ok{
        kv.updateDBWithMigrateData(migrationData)
    }else {
        op := applyMsg.Command.(Op)
        if op.OpType == "GC" {
            cfgNum,_ := strconv.Atoi(op.Key)
            kv.gc(cfgNum,op.SeqNum);
        } else {
            kv.normal(&op)
        }
        if notifyCh := kv.put(applyMsg.CommandIndex,false); notifyCh != nil {
            send(notifyCh,op)
        }
    }
    if kv.needSnapShot() {
        go kv.doSnapShot(applyMsg.CommandIndex)
    }

}

func (kv *ShardKV) gc(cfgNum int, shard int) {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if _, ok := kv.toOutShards[cfgNum]; ok {
        delete(kv.toOutShards[cfgNum], shard)
        if len(kv.toOutShards[cfgNum]) == 0 {
            delete(kv.toOutShards, cfgNum)
        }
    }
}

func (kv *ShardKV) updateInAndOutDataShard(cfg shardmaster.Config) {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if cfg.Num <= kv.cfg.Num { //only consider newer config
        return
    }
    oldCfg, toOutShard := kv.cfg, kv.myShards
    kv.myShards, kv.cfg = make(map[int]bool), cfg
    for shard, gid := range cfg.Shards {
        if gid != kv.gid {continue}
        if _, ok := toOutShard[shard]; ok || oldCfg.Num == 0 {
            kv.myShards[shard] = true
            delete(toOutShard, shard)
        } else {
            kv.comeInShards[shard] = oldCfg.Num
        }
    }
    if len(toOutShard) > 0 { // prepare data that needed migration
        kv.toOutShards[oldCfg.Num] = make(map[int]map[string]string)
        for shard := range toOutShard {
            outDb := make(map[string]string)
            for k, v := range kv.db {
                if key2shard(k) == shard {
                    outDb[k] = v
                    delete(kv.db, k)
                }
            }
            kv.toOutShards[oldCfg.Num][shard] = outDb
        }
    }
}

func (kv *ShardKV) updateDBWithMigrateData(migrationData MigrateReply) {
    kv.mu.Lock()
    defer kv.mu.Unlock()
    if migrationData.ConfigNum != kv.cfg.Num-1 {return}
    delete(kv.comeInShards, migrationData.Shard)
    //this check is necessary, to avoid use  kv.cfg.Num-1 to update kv.cfg.Num's shard
    if _, ok := kv.myShards[migrationData.Shard]; !ok {
        kv.myShards[migrationData.Shard] = true
        for k, v := range migrationData.DB {
            kv.db[k] = v
        }
        for k, v := range migrationData.Cid2Seq {
            kv.cid2Seq[k] = Max(v,kv.cid2Seq[k])
        }
        if _, ok := kv.garbages[migrationData.ConfigNum]; !ok {
            kv.garbages[migrationData.ConfigNum] = make(map[int]bool)
        }
        kv.garbages[migrationData.ConfigNum][migrationData.Shard] = true
    }
}

func (kv *ShardKV) normal(op *Op) {
    shard := key2shard(op.Key)
    kv.mu.Lock()
    if _, ok := kv.myShards[shard]; !ok {
        op.OpType = ErrWrongGroup
    } else {
        maxSeq,found := kv.cid2Seq[op.Cid]
        if !found || op.SeqNum > maxSeq {
            if op.OpType == "Put" {
                kv.db[op.Key] = op.Value
            } else if op.OpType == "Append" {
                kv.db[op.Key] += op.Value
            }
            kv.cid2Seq[op.Cid] = op.SeqNum
        }
        if op.OpType == "Get" {
            op.Value = kv.db[op.Key]
        }
    }
    kv.mu.Unlock()
}

func send(notifyCh chan Op,op Op) {
    select{
    case  <-notifyCh:
    default:
    }
    notifyCh <- op
}

func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister, maxraftstate int, gid int, masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *ShardKV {
    // call labgob.Register on structures you want
    // Go's RPC library to marshall/unmarshall.
    labgob.Register(Op{})
    labgob.Register(MigrateArgs{})
    labgob.Register(MigrateReply{})
    labgob.Register(shardmaster.Config{})

    kv := new(ShardKV)
    kv.me = me
    kv.maxraftstate = maxraftstate
    kv.make_end = make_end
    kv.gid = gid
    kv.masters = masters
    // Your initialization code here.
    kv.persist = persister
    // Use something like this to talk to the shardmaster:
    kv.mck = shardmaster.MakeClerk(kv.masters)
    kv.cfg = shardmaster.Config{}

    kv.db = make(map[string]string)
    kv.chMap = make(map[int]chan Op)
    kv.cid2Seq = make(map[int64]int)

    kv.toOutShards = make(map[int]map[int]map[string]string)
    kv.comeInShards = make(map[int]int)
    kv.myShards = make(map[int]bool)
    kv.garbages = make(map[int]map[int]bool)

    kv.readSnapShot(kv.persist.ReadSnapshot())

    kv.applyCh = make(chan raft.ApplyMsg)
    kv.rf = raft.Make(servers, me, persister, kv.applyCh)
    kv.killCh = make(chan bool,1)
    go kv.daemon(kv.tryPollNewCfg,50)
    go kv.daemon(kv.tryPullShard,80)
    go kv.daemon(kv.tryGC,100)
    go func() {
        for {
            select {
            case <- kv.killCh:
                return
            case applyMsg := <- kv.applyCh:
                if !applyMsg.CommandValid {
                    kv.readSnapShot(applyMsg.SnapShot)
                    continue
                }
                kv.apply(applyMsg)
            }
        }
    }()
    return kv
}

CONCISE CLIENT

package shardkv

//
// client code to talk to a sharded key/value service.
//
// the client first talks to the shardmaster to find out
// the assignment of shards (keys) to groups, and then
// talks to the group that holds the key's shard.
//

import (
    "labrpc"
)
import "crypto/rand"
import "math/big"
import "shardmaster"
import "time"


func key2shard(key string) int {
    shard := 0
    if len(key) > 0 {
        shard = int(key[0])
    }
    shard %= shardmaster.NShards
    return shard
}

func Nrand() int64 {
    max := big.NewInt(int64(1) << 62)
    bigx, _ := rand.Int(rand.Reader, max)
    x := bigx.Int64()
    return x
}

type Clerk struct {
    sm       *shardmaster.Clerk
    config   shardmaster.Config
    make_end func(string) *labrpc.ClientEnd
    // You will have to modify this struct.
    lastLeader  int
    id          int64
    seqNum      int
}


func MakeClerk(masters []*labrpc.ClientEnd, make_end func(string) *labrpc.ClientEnd) *Clerk {
    ck := new(Clerk)
    ck.sm = shardmaster.MakeClerk(masters)
    ck.make_end = make_end
    // You'll have to add code here.
    ck.id = Nrand()//give each client a unique identifier, and then have them
    ck.seqNum = 0// tag each request with a monotonically increasing sequence number.
    ck.lastLeader = 0
    return ck
}

//
// fetch the current value for a key.
// returns "" if the key does not exist.
// keeps trying forever in the face of all other errors.
// You will have to modify this function.
//
func (ck *Clerk) Get(key string) string {
    args := GetArgs{}
    args.Key = key
    for {
        shard := key2shard(key)
        gid := ck.config.Shards[shard]
        if servers, ok := ck.config.Groups[gid]; ok {
            // try each server for the shard.
            for i := 0; i < len(servers); i++ {
                si := (i + ck.lastLeader) % len(servers)
                srv := ck.make_end(servers[si])
                var reply GetReply
                ok := srv.Call("ShardKV.Get", &args, &reply)
                if ok && reply.Err == OK {
                    ck.lastLeader = si;
                    return reply.Value
                }
                if ok && (reply.Err == ErrWrongGroup) {
                    break
                }
            }
        }
        time.Sleep(100 * time.Millisecond)
        // ask master for the latest configuration.
        ck.config = ck.sm.Query(-1)
    }

}

//
// shared by Put and Append.
// You will have to modify this function.
//
func (ck *Clerk) PutAppend(key string, value string, op string) {
    args := PutAppendArgs{key,value,op,ck.id,ck.seqNum}
    ck.seqNum++
    for {
        shard := key2shard(key)
        gid := ck.config.Shards[shard]
        if servers, ok := ck.config.Groups[gid]; ok {
            for i := 0; i < len(servers); i++ {
                si := (i + ck.lastLeader) % len(servers)
                srv := ck.make_end(servers[si])
                var reply PutAppendReply
                ok := srv.Call("ShardKV.PutAppend", &args, &reply)
                if ok && reply.Err == OK {
                    ck.lastLeader = si
                    return
                }
                if ok && reply.Err == ErrWrongGroup {
                    break
                }
            }
        }
        time.Sleep(100 * time.Millisecond)
        // ask master for the latest configuration.
        ck.config = ck.sm.Query(-1)
    }
}

func (ck *Clerk) Put(key string, value string) {
    ck.PutAppend(key, value, "Put")
}
func (ck *Clerk) Append(key string, value string) {
    ck.PutAppend(key, value, "Append")
}

CONCISE COMMON

package shardkv

//
// Sharded key/value server.
// Lots of replica groups, each running op-at-a-time paxos.
// Shardmaster decides which group serves each shard.
// Shardmaster may change shard assignment from time to time.
//
// You will have to modify these definitions.
//

const (
    OK             = "OK"
    ErrWrongLeader = "ErrWrongLeader"
    ErrWrongGroup  = "ErrWrongGroup"
)

type Err string

// Put or Append
type PutAppendArgs struct {
    Key   string
    Value string
    Op    string // "Put" or "Append"
    Cid    int64 "client unique id"
    SeqNum int   "each request with a monotonically increasing sequence number"
}

type PutAppendReply struct {
    Err         Err
}

type GetArgs struct {
    Key string
}

type GetReply struct {
    Err         Err
    Value       string
}

type MigrateArgs struct {
    Shard     int
    ConfigNum int
}

type MigrateReply struct {
    Err         Err
    ConfigNum   int
    Shard       int
    DB          map[string]string
    Cid2Seq     map[int64]int
}

func Max(x, y int) int {
    if x > y {
        return x
    }
    return y
}

最后再把全部代码提交进GITHUB。待我把MAP REDUCE写了一起吧。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容