ifrf.sendAppendEntries(peer,args,&reply){rf.mu.Lock()deferrf.mu.Unlock()ifreply.Term>rf.currentTerm{//changeState but not update election timer
}elseifargs.Term==rf.currentTerm{ifreply.Success{// update nextIndex, matchIndex & commit
}elseifreply.Term==rf.currentTerm{// adjust nextIndex
rf.nextIndex[peer]=max(1,rf.nextIndex[peer]-1)}}
Follower 进行日志复制(需严格按照论文)。
1
2
3
4
5
6
7
8
9
10
11
12
13
prevLogIndex:=rf.nextIndex[i]-1prevLogTerm:=rf.logAt(prevLogIndex).Termargs:=&AppendEntriesArgs{Term:rf.currentTerm,LeaderId:rf.me,PrevLogIndex:prevLogIndex,PrevLogTerm:prevLogTerm,Entries:make([]Entry,rf.getLastLogL().Index-prevLogIndex),LeaderCommit:rf.commitIndex,}// 必须要复制一遍才能免遭 data race
copy(args.Entries,rf.log[prevLogIndex+1-rf.lastIncludedIndex():])gorf.foraHeartbeat(i,args)
// last log index is too small
// entry logAt prevLogIndex whose term doesn't match prevLogTerm
ifargs.PrevLogIndex>rf.getLastLogL().Index||args.PrevLogTerm!=rf.logAt(args.PrevLogIndex).Term{reply.Success,reply.Term=false,rf.currentTermreturn}// log 比 Leader 短 || log 比 Leader 长并且存在不匹配 -> 截断并补上
needReplace:=rf.getLastLogL().Index<=args.PrevLogIndex+len(args.Entries)iflen(args.Entries)>0{fori:=args.PrevLogIndex+1;i<=args.PrevLogIndex+len(args.Entries);i++{// idx and term can identify a log
ifrf.getLastLogL().Index>=i&&rf.logAt(i).Term!=args.Entries[i-args.PrevLogIndex-1].Term{needReplace=truebreak}}ifneedReplace{rf.log=append(rf.log[:args.PrevLogIndex+1-rf.lastIncludedIndex()],args.Entries...)}}
If you follow the rule from Figure 2, the servers with the more up-to-date logs won’t be interrupted by outdated servers’ elections, and so are more likely to complete the election and become the Leader.
发送 heartbeat 的时候需要注意自己还是不是 Leader。
因为我忽略了这个点,导致 Leader crash 再恢复的时候有很小的概率出现一个有点怪的 bug:恢复的一瞬间,Leader 想将自己积累已久的日志发给其他节点,被拒收后发现自己的 term 过期了,于是将自己转为 Follower…… 好像没什么问题,但无法完成一致性检验,通过打 log 发现 Leader 在转为 Follower 后的一瞬间发送了最后一波心跳,于是检查代码,发现我在发送心跳给不同的 peer 的时候用的是 go routine 套 go routine,未能保证原子操作,在将要发送心跳的时候,Leader 已经不再是 Leader 了,但还是做出了 Leader 的行为,所以需要在发送前核验自己的 Leader 身份。