// OpIdentify tags a client request with the issuing client's id and that
// client's request sequence number, so servers can detect duplicates.
type OpIdentify struct {
	OpId     int
	ClientId int64
}
// Put or Append type PutAppendArgs struct { Key string Value string Op string// "Put" or "Append" // You'll have to add definitions here. // Field names must start with capital letters, // otherwise RPC will break. OpIdentify }
// PutAppendReply is the RPC reply for a Put or Append request; Err reports
// the outcome.
type PutAppendReply struct {
	Err Err
}
type GetArgs struct { Key string // You'll have to add definitions here. OpIdentify }
type GetReply struct { Err Err Value string }
Clerk的数据结构:
1 2 3 4 5 6 7 8
// Clerk is the client-side handle for the replicated KV service. servers
// holds one RPC endpoint per server; curLeader (accessed with sync/atomic)
// is the index into servers of the presumed leader, or -1 if none is known.
// NOTE(review): this excerpt is truncated — Get also uses ck.opId and
// ck.clientId, and the struct's remaining fields and closing brace are not shown.
type Clerk struct { servers []*labrpc.ClientEnd // You will have to modify this struct. curLeader int32// NOTE:atomic operation on this. -1 means no leader found. current Leader id, maybe be modified as leader changes
func(ck *Clerk) Get(key string) string { var ( value string svr int32 opId int32 sendCnt int = 1// for debug ) opId = atomic.AddInt32(&ck.opId, 1) // get operation doesn't need this args := GetArgs{ Key: key, OpIdentify: OpIdentify{ ClientId: ck.clientId, OpId: int(opId), }, } svr = atomic.LoadInt32(&ck.curLeader) if svr == -1 { svr = ck.changeLeader() }
for { // for debugging DPrintf("Client [%v] cnt:%d, send to [%d] Get, args:%+v\n", ck.clientId, sendCnt, svr, args) sendCnt++
// send RPC to server var reply GetReply // NOTE: we have to define reply here, if define out of `for loop`, labgorpc will complain ok := ck.servers[svr].Call("KVServer.Get", &args, &reply) if !ok || (reply.Err != OK && reply.Err != ErrNoKey) { // try another server DPrintf("Client [%v] Get failed: Svr[%d]err:%v\n", ck.clientId, svr, reply.Err) svr = ck.changeLeader() // time.Sleep(RpcRateLimitTimeout * time.Millisecond) continue } if reply.Err == OK { value = reply.Value DPrintf("Client [%v] found key[%v] value=[%v]\n", ck.clientId, key, value) } elseif reply.Err == ErrNoKey { DPrintf("Client [%v] no such key[%v] found\n", ck.clientId, key) } else { log.Panicf("Client [%v] unknown Err:%v", ck.clientId, reply.Err) } break } return value }
// PutAppend sends a Put or Append of value to key on behalf of the client;
// op is presumably "Put" or "Append" (see PutAppendArgs.Op) — confirm with callers.
// NOTE(review): this excerpt is truncated after the local declarations; the
// rest of the body is not shown.
func(ck *Clerk) PutAppend(key string, value string, op string) { // You will have to modify this function. var ( svr int32 opId int32 sendCnt = 1 args PutAppendArgs )
// Op is the command a KVServer submits to Raft via Start and later reads
// back from applyCh. Field names must start with capital letters, otherwise
// RPC will break.
type Op struct {
	Key    string
	Value  string
	OpType string // Get/Put/Append

	// OpIdentify identifies the originating client request.
	OpIdentify
}
KVServer的数据结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
type KVServer struct { mu sync.Mutex me int rf *raft.Raft applyCh chan raft.ApplyMsg dead int32// set by Kill()
maxraftstate int// snapshot if log grows this big
// Your definitions here. dataBase map[string]string opRecord map[int64]int// key:clientid, value:max opid .record what operations have been executed by which client before, to avoid duplicate executing
doneChanMap map[int]chan InternalReply curApplyIndex int// for debug, use this to do consistency check debugApplyHistory []InternalReply // for debug }
func(kv *KVServer) PutAppend(args *PutAppendArgs, reply *InternalReply) { reply.Err = ErrShutDown if kv.killed() { DPrintf("[%d] return with Shutdown\n", kv.me) return }
DPrintf("[%d] <-- [%d] Server PutAppend start...%+v\n", kv.me, args.ClientId, args) // avoid duplicate executing kv.mu.Lock() if _, ok := kv.opRecord[args.ClientId]; ok { // check duplicate if args.OpId <= kv.opRecord[args.ClientId] { DPrintf("[%d] executed PutAppend before...\n", kv.me) kv.mu.Unlock() reply.Err = OK return } } kv.mu.Unlock()
// init reply reply.Err = ErrWrongLeader op := Op{ Key: args.Key, Value: args.Value, OpType: args.Op, // Put Or Append OpIdentify: OpIdentify{ ClientId: args.ClientId, OpId: args.OpId, }, }
kv.mu.Lock() // must start before Start, altough Start interface is thread-safe idx, term, isLeader := kv.rf.Start(op) if !isLeader { DPrintf("[%d] I am not leader\n", kv.me) kv.mu.Unlock() return } // build channel var replyCh chan InternalReply if _, ok := kv.doneChanMap[idx]; !ok { DPrintf("[%d] create channel at index[%d]\n", kv.me, idx) kv.doneChanMap[idx] = make(chan InternalReply) replyCh = kv.doneChanMap[idx] kv.mu.Unlock() } else { // Started a command before DPrintf("[%d] PutAppend, more than 1 command at the same idx(%d)\n", kv.me, idx) replyCh := kv.doneChanMap[idx] kv.mu.Unlock() replyCh <- InternalReply{ Err: ErrWrongLeader, // send fake msg to old routine } } // wait on channel timer := time.NewTicker(commitLogTimeout * time.Millisecond) select { case tmpReply := <-replyCh: // check for invariant condition curTerm, isLeader := kv.rf.GetState() if curTerm != term || !isLeader { DPrintf("[%d] <-- [%d] Server PutAppend fail end, term(%d), curTerm(%d), isLeader(%v), tmpReply.Op(%v), args.Op(%v)\n", kv.me, args.ClientId, term, curTerm, isLeader, tmpReply.OpIdentify, args.OpIdentify) return } reply.Err = tmpReply.Err // OK DPrintf("[%d] <-- [%d] Server PutAppend end\n", kv.me, args.ClientId) // for debug if reply.Err != OK { log.Panicf("[%d] <-- [%d] Server PutAppend end, but Errcode is not OK\n", kv.me, args.ClientId) } case <-timer.C: DPrintf("[%d] time up, give up receive PUT/APPEND reply\n", kv.me) reply.Err = ErrWrongLeader kv.mu.Lock() delete(kv.doneChanMap, idx) kv.mu.Unlock() } }
这里比较迷惑的是以下几行代码:
1 2 3 4 5
DPrintf("[%d] PutAppend, more than 1 command at the same idx(%d)\n", kv.me, idx) replyCh := kv.doneChanMap[idx] kv.mu.Unlock() replyCh <- InternalReply{ Err: ErrWrongLeader, // send fake msg to old routine
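这几行的目的就是为了解决前文提到的问题4：当 `Start` 返回的 `idx` 上已经登记了一个等待中的 channel 时（通常意味着该位置原先的日志条目在 leader 变更后被覆盖了），就向旧的等待协程发送一个伪造的 `ErrWrongLeader` 回复，让它尽快返回。需要注意的是，这里的 `replyCh :=` 会遮蔽（shadow）外层的同名变量，使外层的 `replyCh` 仍为 nil，新请求实际上只能等到超时才返回，应写成 `replyCh = ...`；另外，如果旧协程已经超时退出，这次在无缓冲 channel 上的发送会永远阻塞。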
另外还需要注意一些不变量检查:
1 2 3 4 5 6
// check for invariant condition curTerm, isLeader := kv.rf.GetState() if curTerm != term || !isLeader { DPrintf("[%d] <-- [%d] Server PutAppend fail end, term(%d), curTerm(%d), isLeader(%v), tmpReply.Op(%v), args.Op(%v)\n", kv.me, args.ClientId, term, curTerm, isLeader, tmpReply.OpIdentify, args.OpIdentify) return }
// receive data from `kv.applyCh` func(kv *KVServer) receiver() { for { applyMsg := <-kv.applyCh if kv.killed() { break } DPrintf("[%d] get from applyCh now, applyMsg %+v\n", kv.me, applyMsg) if applyMsg.CommandValid { kv.curApplyIndex++ if kv.curApplyIndex != applyMsg.CommandIndex { log.Panicf("kv.curApplyIndex{%v} is not equal to applyMsg.CommandIndex{%v}\n", kv.curApplyIndex, applyMsg.CommandIndex) } replyOp, ok := applyMsg.Command.(Op) if !ok { log.Panicf("replyOp convert to op failed: replyOp(%+v)\n", replyOp) } DPrintf("[%d] convert to replyOp: %+v\n", kv.me, replyOp)
var reply InternalReply reply.OpIdentify = replyOp.OpIdentify var replyCh chan InternalReply switch replyOp.OpType { case"Get": kv.doGet(replyOp, &reply) DPrintf("[%d] execute Get done\n", kv.me) case"Put", "Append": kv.doPutAppend(replyOp, &reply) DPrintf("[%d] execute Put/Append done\n", kv.me) default: log.Panicf("unknown op type%v\n", replyOp.OpType) }
kv.mu.Lock() // for debug kv.debugApplyHistory = append(kv.debugApplyHistory, reply) DPrintf("[%v] apply history(len:%d) %+v\n", kv.me, len(kv.debugApplyHistory), kv.debugApplyHistory) // end if _, ok := kv.doneChanMap[applyMsg.CommandIndex]; !ok { } else { replyCh = kv.doneChanMap[applyMsg.CommandIndex] kv.mu.Unlock()
DPrintf("[%d] try to send command[%v] through doneChanMap \n", kv.me, replyOp) replyCh <- reply DPrintf("[%d] sended command[%v] through doneChanMap \n", kv.me, replyOp)