6.824 Lab2B

前置芝士

Task

处理日志,具体来说是接收 Service 层发来的日志,日志复制,应用日志。

涉及到一点 Raft 层与 Service 层的交互,比如 Leader 接受 command 并保存为日志;Apply 日志到 Service 层。

Step

  1. 选举的时候需要额外注意一个限制:只投票给 log 比自己新的 Candidate。
  2. Leader 接受 command,append 进 log,返回此 command 的下标,此时 log 在状态机中还没被应用。
  3. 心跳时顺便发送日志给 Follower(也可以分开)。
  • AppendEntries RPC 参数中的 log 需要深拷贝才能免遭 data race。
  • 发送的 log 并不是从头开始,而是从 nextIndex 开始。
  • 在不可靠网络中,收到 RPC 结果的时候可能已经过了几个任期,此时需要先检查一下 args.Term 还等不等于 currentTerm,但在此之前,如果返回的任期比 currentTerm 还要大,那么无论是不是不可靠网络,这个 Leader 都要转为 Follower(同样的,因为不确定他有没有资格成为 Candidate,所以没有必要将其转为 Candidate,反正任期增加后总会有 Candidate 产生);如果返回的任期小于等于 currentTerm,那么 Leader 只需要正常处理。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
if rf.sendAppendEntries(peer, args, &reply) {
  rf.mu.Lock()
  	defer rf.mu.Unlock()
  	if reply.Term > rf.currentTerm {
    //changeState but not update election timer
  } else if args.Term == rf.currentTerm {
    if reply.Success {
      // update nextIndex, matchIndex & commit
    } else if reply.Term == rf.currentTerm{
      // adjust nextIndex
      rf.nextIndex[peer] = max(1,rf.nextIndex[peer]-1)
    }
  }
  1. Follower 进行日志复制(需严格按照论文)。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
prevLogIndex := rf.nextIndex[i] - 1
prevLogTerm := rf.logAt(prevLogIndex).Term
args := &AppendEntriesArgs{
  Term:         rf.currentTerm,
  LeaderId:     rf.me,
  PrevLogIndex: prevLogIndex,
  PrevLogTerm:  prevLogTerm,
  Entries:      make([]Entry, rf.getLastLogL().Index-prevLogIndex),
  LeaderCommit: rf.commitIndex,
}
// 必须要复制一遍才能免遭 data race
copy(args.Entries, rf.log[prevLogIndex+1-rf.lastIncludedIndex():])
go rf.foraHeartbeat(i, args)
  1. AppendEntries RPC 回复后应如何改变自身状态。
  • 先进行一个 term 的处理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
if args.Term < rf.currentTerm {
  reply.Term, reply.Success = rf.currentTerm, false
  return
}
if args.Term > rf.currentTerm {
  rf.changeStateL(Follower, args.Term, NULL)
} else {
  rf.changeStateL(Follower, rf.currentTerm, rf.votedFor)
  rf.electionTimer.Reset(getElectionDuration())
}
  • 再看看自己的 log 与 RPC 传来的 entries 有没有冲突,有的话以 Leader 为准;返回参数调整自己在 Leader 那边的 nextIndex,直到和 Leader prevLogIndex 的 term 一致为止,才能真正 append entries。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// last log index is too small
// entry logAt prevLogIndex whose term doesn't match prevLogTerm
if args.PrevLogIndex > rf.getLastLogL().Index || args.PrevLogTerm != rf.logAt(args.PrevLogIndex).Term{
  reply.Success, reply.Term = false, rf.currentTerm
  return
}

// log 比 Leader 短 || log 比 Leader 长并且存在不匹配 -> 截断并补上
needReplace := rf.getLastLogL().Index <= args.PrevLogIndex+len(args.Entries)
if len(args.Entries) > 0 {
  for i := args.PrevLogIndex + 1; i <= args.PrevLogIndex+len(args.Entries); i++ {
    // idx and term can identify a log
    if rf.getLastLogL().Index >= i && rf.logAt(i).Term != args.Entries[i-args.PrevLogIndex-1].Term {
      needReplace = true
      break
    }
  }
  if needReplace {
    rf.log = append(rf.log[:args.PrevLogIndex+1-rf.lastIncludedIndex()], args.Entries...)
  }
}
  • commit 自己的日志。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
if args.LeaderCommit > rf.commitIndex {
  rf.commitIndex = args.LeaderCommit
  if len(args.Entries) > 0 {
    rf.commitIndex = min(rf.commitIndex, args.Entries[len(args.Entries)-1].Index)
  }
  DPrintf(dInfo, "S%v lastIndex:%v, commIndex:%v, lastApplied:%v", rf.me, rf.getLastLogL().Index, rf.commitIndex, rf.lastApplied)
  if rf.commitIndex > rf.lastApplied {
    rf.applyWaker <- 1
  }
}
reply.Term, reply.Success = rf.currentTerm, true
  1. Leader 方面,只有当大部分 Follower 的 matchIndex 更新(log 存到 Follower)了,Leader 的 commitIndex 才能同步更新。这部分我直接从后往前遍历,因为日志的任期是按顺序增长的,所以如果遇到日志任期小于当前任期直接 break,因为 Leader 只能将自己任期的日志 commit,对于其他任期的日志,只能被动 commit。在 lab 中,没有需要 Leader及时更新旧 commit 的情况,所以不做特殊处理(在成为 Leader 的时候发一个空日志)。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
for i := rf.getLastLogL().Index; i > rf.commitIndex; i-- {
  if rf.logAt(i).Term != rf.currentTerm {
    break
  }
  cnt := 1
  for p := range rf.peers {
    if p != rf.me && rf.matchIndex[p] >= i {
      cnt++
    }
  }
  if cnt > len(rf.peers)/2 {
    rf.commitIndex = i
    rf.applyWaker <- 1
    break
  }
}

Tips

  1. 在 currentTerm < args.Term 的时候,我们其实是不需要重置选举时间的,因为如果出现了一个 Leader 比自己任期大,说明自己没有给他投过票,但他得到了半数节点的支持,说明至少半数节点有成为 Candidate 的潜质,并且如果自己真的开始选举的话,会发送无用的 RPC,降低 Raft 的运行效率。

If you follow the rule from Figure 2, the servers with the more up-to-date logs won’t be interrupted by outdated servers’ elections, and so are more likely to complete the election and become the Leader.

  1. 发送 heartbeat 的时候需要注意自己还是不是 Leader。

因为我忽略了这个点,导致 Leader crash 再恢复的时候有很小的概率出现一个有点怪的 bug:恢复的一瞬间,Leader 想将自己积累已久的日志发给其他节点,被拒收后发现自己的 term 过期了,于是将自己转为 Follower…… 好像没什么问题,但无法完成一致性检验,通过打 log 发现 Leader 在转为 Follower 后的一瞬间发送了最后一波心跳,于是检查代码,发现我在发送心跳给不同的 peer 的时候用的是 go routine 套 go routine,未能保证原子操作,在将要发送心跳的时候,Leader 已经不再是 Leader 了,但还是做出了 Leader 的行为,所以需要在发送前核验自己的 Leader 身份。

  1. 在 updateCommit 的时候我想直接判断成功返回 AppendEntries RPC 的 Follower 数量,如果超过一半就更新 Leader commitIndex = lastLog.index,但因为发送和接受 RPC 并非原子,可能会存在前面的 Follower 未包含后面新增的 log 的情况。

If there exists an N such that N > commitIndex, a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4).

updatedupdated2024-04-302024-04-30