// check whether the command already appears in the log
rf.mu.Lock()
for i := len(rf.log) - 1; i >= 0; i-- { // backward scan is faster
	if rf.log[i].Command == command {
		index = i + 1
		rf.mu.Unlock()
		return index, term, isLeader
	}
}
// not found in the log (range [0, commitIndex]), so append it
rf.log = append(rf.log, LogEntry{Term: rf.currentTerm, Command: command})
index = len(rf.log)
DPrintf("[%d.%d.%d] appends new log entry[%v] at index[%d]\n", rf.me, rf.role, rf.currentTerm, len(rf.log), index)
rf.mu.Unlock()
func (rf *Raft) doSendAppendEntires(replyCh chan AppendEntriesReplyInCh, curTerm, pendingCommitIndex int) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	var sendRpcNum int = 0 // use this to determine when to close the channel
	// for debug
	// curTerm := rf.currentTerm
	// curRole := rf.role
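The `AppendEntriesReplyInCh` type itself is not shown in this post. From the way it is constructed and read back below, it is essentially the RPC reply plus the id of the peer that produced it; here is a rough sketch of what I assume it looks like (field set inferred from its uses in this post, the real definitions may carry more fields):

// Inferred sketch only; not the exact definitions from the lab code.
type AppendEntriesReply struct {
	Term    int  // replier's current term
	Success bool // true if the follower accepted the entries
	// fast-backup hints filled in by the follower when it rejects (see the handler below)
	XTerm  int // term of the conflicting entry at PrevLogIndex
	XIndex int // first index the follower stores for XTerm
	XLen   int // follower's log length, reported when its log is too short
}

// Payload pushed into replyCh, so the collector knows which peer answered.
type AppendEntriesReplyInCh struct {
	AppendEntriesReply     // embedded RPC reply
	svrId              int // index of the peer that produced this reply
}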
	for i := 0; i < len(rf.peers) && rf.role == LEADER; i++ {
		if i == rf.me {
			continue
		}
		if rf.currentTerm != curTerm {
			sendRpcNum++ // still count it so that we can close the channel
			if sendRpcNum == len(rf.peers)-1 {
				close(replyCh)
			}
			continue
		}
		prevLogTerm := -1
		prevLogIndex := -1
		var entries []LogEntry
		if rf.nextIndex[i] > 0 {
			DPrintf("[%d] log len:%d, nextIndex[%d]=%d\n", rf.me, len(rf.log), i, rf.nextIndex[i])
			prevLogTerm = rf.log[rf.nextIndex[i]-1].Term
			prevLogIndex = rf.nextIndex[i] - 1
		} else if rf.nextIndex[i] < 0 && pendingCommitIndex >= 0 {
			// NOTE: bug fix: when a server becomes leader it resets all nextIndex to -1,
			// and it may accept new log entries before doSendAppendEntries runs
			rf.nextIndex[i] = 0
		}
		if rf.nextIndex[i] >= 0 {
			// for debug
			if rf.nextIndex[i] > pendingCommitIndex+1 {
				log.Panicf("rf.nextIndex[i]=%d is larger than pendingCommitIndex=%d", rf.nextIndex[i], pendingCommitIndex)
			}
			// do a copy, so the slice handed to the RPC goroutine does not share rf.log's backing array
			entries = make([]LogEntry, 0)
			entries = append(entries, rf.log[rf.nextIndex[i]:pendingCommitIndex+1]...)
		}
			rf.mu.Lock()
			if rf.role == LEADER {
				rf.mu.Unlock()
				replyCh <- AppendEntriesReplyInCh{
					AppendEntriesReply: reply,
					svrId:              svrId,
				}
				rf.mu.Lock()
				sendRpcNum++ // NOTE: increment here so the close operation below cannot run before someone has accepted the reply from replyCh
			} else {
				sendRpcNum++
			}
			cnt := sendRpcNum
			rf.mu.Unlock()
			if cnt == len(rf.peers)-1 {
				close(replyCh)
				// DPrintf("[%d.%d.%d] SendAppendEntries close reply channel\n", rf.me, curRole, curTerm)
			}
		}(i)
	}
}
		rf.mu.Lock()
		if curTerm != rf.currentTerm || rf.role != LEADER || rf.killed() {
			rf.mu.Unlock()
			// return
			continue // returning here would leave replyCh blocked for the sender goroutines
		}
		if reply.Term > rf.currentTerm {
			DPrintf("[%d.%d.%d] --> [%d], remote term %d is larger than mine\n", rf.me, rf.role, rf.currentTerm, reply.svrId, reply.Term)
			// for debug
			if reply.Success {
				log.Panicf("reply.Term is larger than currentTerm, but reply.Success is true")
			}
			rf.setNewTerm(reply.Term)
			rf.changeRoleTo(FOLLOWER) // someone's term is larger than mine, so give up the leader role and become a follower
			rf.mu.Unlock()
			continue
		}
		// update nextIndex and matchIndex
		rf.updateNextIndex(reply, pendingCommitIndex)
		if reply.Success {
			successCnt++
		} else {
			failCnt++
		}
		if 2*successCnt > len(rf.peers) {
			succOne.Do(func() {
				rf.hBStopByCommitOp = false // force a reset so that heartBeatTimer can send messages again
				if curTerm != rf.currentTerm || rf.role != LEADER || rf.killed() {
					return
				}
				if rf.commitIndex != pendingCommitIndex {
					rf.commitIndex = pendingCommitIndex
					DPrintf("[%d.%d.%d] got a majority of AppendEntries responses, change commitIndex to %d\n", rf.me, rf.role, rf.currentTerm, pendingCommitIndex)
					// notify applier
					rf.committedChangeCond.Broadcast()
					rf.hBStopByCommitOp = true
					rf.mu.Unlock()
					// fire the next round of AppendEntries to tell the others to commit
					rf.fireAppendEntires(false)
					rf.mu.Lock()
					// TODO: unlock immediately followed by lock; how to avoid this? `go rf.fireAppendEntires(false)` is an alternative, but launching a new goroutine has a cost too
				}
			})
		} else if 2*failCnt > len(rf.peers) && rf.hBStopByCommitOp {
			// Unlikely. It is actually unnecessary to run the code below: if the leader's
			// commit operation failed, the leader is separated from the others, so we can
			// assume it will convert to follower when it rejoins the network, and no
			// heartbeat is needed right now. Keeping this commented out is better because
			// it reduces the number of useless heartbeats.
			// failOne.Do(func() {
			// 	rf.mu.Lock()
			// 	defer rf.mu.Unlock()
			// 	rf.preempByCommit = false // force to turn on heartBeatTimer
			// })
			DPrintf("[%d] commit operation failed, can't get a majority of responses\n", rf.me)
		}
		rf.mu.Unlock()
	}
}
func (rf *Raft) changeRoleTo(role RaftRole) {
	...
	if role == LEADER {
		rf.resetNextIndex()
		rf.hBStopByCommitOp = false // force to turn on heartBeatTimer
	}
}
	// follow the rule for all servers
	if args.Term > rf.currentTerm {
		rf.setNewTerm(args.Term)
	}
	// 1. Reply false if term < currentTerm (§5.1)
	if args.Term < rf.currentTerm {
		return
	}
	if args.PrevLogIndex != -1 {
		// 2. Reply false if log doesn't contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
		if len(rf.log)-1 < args.PrevLogIndex { // case 3: rf.log is too short
			reply.XLen = len(rf.log)
			return
		}
		if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
			reply.XTerm = rf.log[args.PrevLogIndex].Term
			reply.XIndex = args.PrevLogIndex
			i := args.PrevLogIndex - 1
			// walk back to find the first index of the conflicting term (XTerm) in rf.log
			for i >= 0 && rf.log[i].Term == rf.log[i+1].Term {
				reply.XIndex = i
				i--
			}
			return
		}
		rf.log = rf.log[:args.PrevLogIndex+1]
	} else {
		// clear the log, because the leader sent its full log
		rf.log = []LogEntry{}
	}
	// 4. Append any new entries not already in the log
	rf.log = append(rf.log, args.Entries...)
	// 5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
	if args.LeaderCommit > rf.commitIndex {
		if args.LeaderCommit > len(rf.log)-1 {
			rf.commitIndex = len(rf.log) - 1
		} else {
			rf.commitIndex = args.LeaderCommit
		}
		// notify applier
		rf.committedChangeCond.Broadcast()
		// NOTE: persist here; maybe batch the persistence?
		DPrintf("[%d] change commitIndex to %d\n", rf.me, rf.commitIndex)
	}
	reply.Success = true
	DPrintf("[%d.%d.%d] AppendEntriesRPC end from leader [%d], log len:%d, log info %+v\n", rf.me, rf.role, rf.currentTerm, args.LeaderId, len(rf.log), rf.log)
}
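On the leader side, `updateNextIndex` (not shown in this post) is what consumes these XTerm/XIndex/XLen hints. For reference, here is a minimal sketch of the usual fast-backup rule; the name `backUpNextIndex` is made up, it assumes XTerm is initialized to -1 when unused, and it is only an illustration, not necessarily the exact code behind `updateNextIndex`:

// Sketch of leader-side fast backup using the follower's hints.
// Assumes the reply leaves XTerm at -1 when only XLen is meaningful.
func (rf *Raft) backUpNextIndex(server int, reply *AppendEntriesReply) {
	if reply.XTerm == -1 {
		// follower's log is shorter than PrevLogIndex: jump back to its end
		rf.nextIndex[server] = reply.XLen
		return
	}
	// look for the leader's last entry with term == XTerm
	last := -1
	for i := len(rf.log) - 1; i >= 0; i-- {
		if rf.log[i].Term == reply.XTerm {
			last = i
			break
		}
	}
	if last == -1 {
		// leader has no entry of XTerm: back up to the follower's first index of that term
		rf.nextIndex[server] = reply.XIndex
	} else {
		// leader has XTerm: retry from just past its last entry of that term
		rf.nextIndex[server] = last + 1
	}
}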
func (rf *Raft) applier() {
	DPrintf("[%d] applier start...\n", rf.me)
	for {
		var commitIndex int
		rf.mu.Lock()
		for rf.lastAppliedIndex == rf.commitIndex && !rf.killed() {
			rf.committedChangeCond.Wait()
		}
		commitIndex = rf.commitIndex
		if rf.killed() {
			rf.mu.Unlock()
			break
		}
		rf.lastAppliedIndex++
		DPrintf("[%d] applier tries to apply log[%d-%d]\n", rf.me, rf.lastAppliedIndex, commitIndex)
		for rf.lastAppliedIndex <= commitIndex {
			rf.applyCh <- ApplyMsg{
				CommandValid: true, // NOTE: how should this be used? may need to revisit later
				Command:      rf.log[rf.lastAppliedIndex].Command,
				CommandIndex: rf.lastAppliedIndex + 1, // raft's log index starts at 0 but the service's starts at 1, so add 1
			}
			// DPrintf("[%d] applier feeds command, command idx:%d\n", rf.me, rf.lastAppliedIndex)
			rf.lastAppliedIndex++
		}
		rf.lastAppliedIndex-- // step back by one
		rf.mu.Unlock()
		// NOTE: an alternative is to re-take the lock inside the loop to avoid being blocked by applyCh,
		// but here we assume the service accepts data immediately
	}
}
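One thing worth calling out: for the `Wait` loop above to ever wake up, `committedChangeCond` has to be built on top of `rf.mu`, and something has to `Broadcast` when the server is killed, otherwise a killed peer's applier sleeps forever. A sketch of the wiring this applier assumes (the `dead` flag and `Kill` follow the 6.824 skeleton; the Broadcast in `Kill` is my own addition):

// In Make(), before starting the background goroutines (assumed wiring):
//     rf.committedChangeCond = sync.NewCond(&rf.mu)
//     go rf.applier()

// Kill must wake the applier so it can observe killed() and return.
func (rf *Raft) Kill() {
	atomic.StoreInt32(&rf.dead, 1)
	rf.committedChangeCond.Broadcast()
}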
// lots of successful commands to new group.
for i := 0; i < cmdNum; i++ {
	cmd++
	cfg.one(cmd, 3, true)
}
// now another partitioned leader and one follower
leader2 := cfg.checkOneLeader()
other := (leader1 + 2) % servers
if leader2 == other {
	other = (leader2 + 1) % servers
}
cfg.disconnect(other)
DPrintf("disconnect: %v \n", other)
// lots more commands that won't commit
for i := 0; i < cmdNum; i++ {
	cmd++
	cfg.rafts[leader2].Start(cmd)
}
time.Sleep(RaftElectionTimeout / 2)
// bring original leader back to life,
for i := 0; i < servers; i++ {
	cfg.disconnect(i)
}
cfg.connect((leader1 + 0) % servers)
cfg.connect((leader1 + 1) % servers)
cfg.connect(other)
DPrintf("connect: %v %v %v\n", (leader1+0)%servers, (leader1+1)%servers, other)
// lots of successful commands to new group.
for i := 0; i < cmdNum; i++ {
	cmd++
	cfg.one(cmd, 3, true)
}
// now everyone
for i := 0; i < servers; i++ {
	cfg.connect(i)
}
cmd++
cfg.one(cmd, servers, true)