// // save Raft's persistent state to stable storage, // where it can later be retrieved after a crash and restart. // see paper's Figure 2 for a description of what should be persistent. // // NOTE: must be guarded by rf.mu func(rf *Raft) persist() { DPrintf("[%d] persist state, curTerm %d, voteFor %d, log %+v\n", rf.me, rf.currentTerm, rf.votedFor, rf.log) // Your code here (2C). // Example: w := new(bytes.Buffer) e := labgob.NewEncoder(w) e.Encode(rf.currentTerm) e.Encode(rf.votedFor) e.Encode(rf.log) data := w.Bytes() rf.persister.SaveRaftState(data) }
// // restore previously persisted state. // func(rf *Raft) readPersist(data []byte) { if data == nil || len(data) < 1 { // bootstrap without any state? return } DPrintf("[%d] read state from storage\n", rf.me) // Your code here (2C). // Example: r := bytes.NewBuffer(data) d := labgob.NewDecoder(r) var currentTerm int var voteFor int var log []LogEntry if d.Decode(¤tTerm) != nil || d.Decode(&voteFor) != nil || d.Decode(&log) != nil { DPrintf("[%d] [Error]: read state failed", rf.me) } else { // NOTE: need latch or not? it depends on the caller rf.currentTerm = currentTerm rf.votedFor = voteFor rf.log = log DPrintf("[%d] read state from storage, %+v\n", rf.me, rf) } }
for { // if rf.killed() { // close(done) // actually we don't need this, now that rf is killed, blocked routines will be reclaimed by GC or OS // return // } reply, ok := <-replyCh if !ok { return } if reply.Term > maxTermFromRsp { maxTermFromRsp = reply.Term } if reply.VoteGranted { voteNum++ if2*voteNum > len(rf.peers) { voteOnce.Do(func() { // we can't break loop because we need to receive all data from the channel, otherwise some goroutine will be blocked forever // update if need rf.mu.Lock(rf.me, "doReceiveRequestVoteReply.once") // double check whether curTerm is the same with rf.currentTerm to avoid while executing `RequestVote`, the candidate had started a new election if curTerm != rf.currentTerm || rf.role != CANDIDATE || rf.killed() { rf.mu.Unlock(rf.me, "doReceiveRequestVoteReply.once") return } if rf.currentTerm < maxTermFromRsp { rf.turnOnPendingPersist(rf.setNewTerm, maxTermFromRsp) rf.changeRoleTo(FOLLOWER) rf.mu.Unlock(rf.me, "doReceiveRequestVoteReply.once") return } rf.changeRoleTo(LEADER) DPrintf("[%d.%d.%d] becomes leader, log len:%d log content:%v \n", rf.me, rf.role, rf.currentTerm, len(rf.log), rf.log) rf.mu.Unlock(rf.me, "doReceiveRequestVoteReply.once") }) } } else { unVoteNum++ if2*unVoteNum >= len(rf.peers) { unVoteOnce.Do(func() { rf.mu.Lock(rf.me, "doReceiveRequestVoteReply.unVote") if curTerm != rf.currentTerm || rf.role != CANDIDATE || rf.killed() { rf.mu.Unlock(rf.me, "doReceiveRequestVoteReply.unVote") return } rf.changeRoleTo(FOLLOWER) rf.mu.Unlock(rf.me, "doReceiveRequestVoteReply.unVote") }) } } } }
lab2A 是整个 lab2 的骨架,核心包含两个计时器:一个用于选举,一个用于心跳(后续实验中该计时器也负责 log replication)。选举中的一个注意点是:follower 只有在 candidate 的 log 至少和自己的一样新(at least as up-to-date)时,才会把票投给它。除此之外,如果发生过网络分区,旧 leader 也可能会收到来自新 leader 的心跳,此时要注意 role 的转换(目前我的策略是:一旦发现比自己更大的 Term,就把 currentTerm 更新为该 Term,并降级为 Follower)。