6.S081

添加用户函数

在 Makefile 的 UPROGS 处添加

添加系统调用

系统调用的用户空间代码在user/user.h和user/usys.pl中。

内核空间代码是kernel/syscall.h、kernel/syscall.c。

Redis Cluster 公网部署

RT,如果要公网访问服务器的 Redis,要将 redis-cluster 部署在公网上(似乎是废话,但是如果 Redis 服务发现端口是本地的话,就没法公网访问)。

config 中有几个很重要的参数:

# redis-cluster.conf
cluster-announce-ip 116.205.130.21 # 公网ip
cluster-announce-port 8003 # redis service 暴露给外部的端口
cluster-announce-bus-port 18003 #  redis 集群总线端口

其他就不说了,主要是总线端口,它是 cluster 服务发现的端口,如果填的是局域网 ip(192.168.xxx.xxx)那么就无法被外部访问,因为在 Redis 对 key 进行 hash 的时候,如果处理 key 分片不是本 Redis 服务器的话,会提示Redirected to slot [5798] located at 192.168.xxx.xxx:$xxx(重定向到其他 Redis 服务器),无法被公网访问。 所以要将 cluster 服务发现的 ip 设为公网 ip,还要记得在服务器上暴露服务 ip 和服务发现 ip 以及关闭对应的防火墙。

6.824 Lab4

Task

完成 Multi-Raft

  • lab4A:完成 Multi-Raft 控制中心(和 lab3 内容差不多,一句话来说就是 shardctrler 是一个将 Config 作为日志进行维护的单 Raft 集群,并且不需要实现快照和持久化) 需要注意的地方是执行节点的 Join 和 Leave 操作时 shards 应该怎么平均且有序地分配给 gids(注意在 Leave 操作中,Group 里面可能有多余的 gid,此时需要注意将其算在 gids 里面),主要逻辑和 lab3 差不多,比较简单。

6.824 Lab3

Task

在 raft 框架的基础上建立一个容错的 KV 数据库。重点在于理解 Service 层和 Raft 层的交互,代码方面较为简单,故不进行赘述。

Step

  1. 在 client 和 service 两层完成 Get(), Append(), Put()
  2. 对重复的 Append 或 Put 进行去重
  3. service 层的快照与持久化(快照存储的是 Key/Value 等元数据) lab3

tips:

值得一提的是在写完 lab3 的大概框架后,TestSpeed3A 一直不通过,因为要求是平均 33ms 一个 commit,但正常来说 commit 时间应该和 heartbeat timeout 差不多,遂修改 Start() 函数,在 command 被 Service 送来的时候发起一次心跳,进行日志同步。

6.824 Lab2D

前置芝士

Task

完成 Raft 的快照功能(涉及到较多的与 Service 层的交互。

  • 为什么要有 snapshot?
    1. snapshot 可以将 log 压缩,比如将 10 个 log 压缩成 9 个(当两个 log 修改的 key 值一样时)。
    2. snapshot 可以减少 Raft 层 log 的长度,帮助进度较慢的 Raft 节点快速恢复状态机(减短 raft 中 log 的长度)。snapshot 区别于持久化 log,后者主要是不让宕机的 raft 丢失太多日志。
  • 在 2D 中,snapshot 会涉及到 raft 层与 service 层的多次交互,看这个 diagram of Raft interactions  或许可以帮助理解 Raft 协议不同层次的功能与特性。
  • snapshot 作用于每一个 Raft 节点,我们需要记录 snapshot 最后一个 index 和 term,用于一致性检查。

Step

  1. Snapshot() 被 service 层调用,约莫 10 次 commit 调用一次,用于保存快照。需要注意的是快照是 service 传给 raft 层的,而不是我们在 raft 层写入日志创建的,我们不需要创建快照,仅需要处理 Service 层传进来的 snapshot,进行日志截断和快照持久化。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if rf.lastIncludedIndex() >= index {
		return
	}
	term := rf.logAt(index).Term
	rf.log = rf.log[index-rf.lastIncludedIndex():]
	rf.setLastIncludedIndex(index)
	rf.setLastIncludedTerm(term)
	rf.persister.SaveStateAndSnapshot(rf.getEncodeStateL(), snapshot)
	DPrintf(dSnap, "S%v last: %v", rf.me, index)
}

snapshot 2. InstallSnapshot:

6.824 Lab2C

前置芝士

Task

完成 Raft 的持久化。

Step

如果前面做得好,只需要完成持久化。

  1. 持久化至 Service 层
1
persist()
  1. Crash 后从 Service 层恢复
1
readPersist(data []byte)
  1. 如果和我一样前面差点意思,就要小修一下 guide
  • AppendEntries Handler 中,Follower 引入 XTerm 和 XIndex 来快速调整 nextIndex。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
if args.PrevLogIndex > rf.getLastLogL().Index {
  reply.Success, reply.Term = false, rf.currentTerm
  reply.XIndex = rf.getLastLogL().Index + 1 // Follower's nextIndex
  return
}
// entry logAt prevLogIndex whose term doesn't match prevLogTerm
DPrintf(dInfo, "S%v prevLogTerm:%v, prevLogIndex:%v,log[prevLogIndex].Term:%v", rf.me, args.PrevLogTerm, args.PrevLogIndex, rf.logAt(args.PrevLogIndex).Term)
if args.PrevLogTerm != rf.logAt(args.PrevLogIndex).Term {
  reply.Success, reply.Term = false, rf.currentTerm
  reply.XIndex, reply.XTerm = rf.commitIndex+1, rf.logAt(args.PrevLogIndex).Term
  for i := args.PrevLogIndex; i > rf.commitIndex+1; i-- {
    if reply.XTerm != rf.logAt(i-1).Term {
      reply.XIndex = i
      return
    }
  }
  DPrintf(dInfo, "S%v XIndex = %v", rf.me, rf.commitIndex+1)
  return
}
  • Leader 对 XTerm 和 XIndex 的处理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
if rf.sendAppendEntries(peer, args, &reply) {
  rf.mu.Lock()
  defer rf.mu.Unlock()
  if reply.Term > rf.currentTerm {
    //changeState but not update election timer
  } else if args.Term == rf.currentTerm {
    if reply.Success {
      // update nextIndex, matchIndex & commit
    } else if reply.Term == rf.currentTerm{
        // adjust nextIndex
        if reply.XIndex != NULL {
        DPrintf(dHeart, "S%v <- S%v heartbeat XIndex:%v, XTerm:%v", rf.me, peer, reply.XIndex, reply.XTerm)
        if reply.XTerm == NULL {
          rf.nextIndex[peer] = reply.XIndex
        } else {
          ok := false
          for i := rf.nextIndex[peer] - 1; i > rf.lastIncludedIndex() && reply.XTerm <= rf.logAt(i).Term; i-- {
            if rf.logAt(i).Term == reply.XTerm {
              ok = true
              rf.nextIndex[peer] = i + 1
              break
            }
          }
          if !ok {
            rf.nextIndex[peer] = reply.XIndex
          }
        }
      }
    }
  }
}
  • electionTimer 不能在每次变成 Follower 的时候重置。

If election timeout elapses without receiving AppendEntries RPC from current Leader or granting vote to Candidate: convert to Candidate.

6.824 Lab2B

前置芝士

Task

处理日志,具体来说是接收 Service 层发来的日志,日志复制,应用日志。

涉及到一点 Raft 层与 Service 层的交互,比如 Leader 接受 command 并保存为日志;Apply 日志到 Service 层。

Step

  1. 选举的时候需要额外注意一个限制:只投票给 log 比自己新的 Candidate。
  2. Leader 接受 command,append 进 log,返回此 command 的下标,此时 log 在状态机中还没被应用。
  3. 心跳时顺便发送日志给 Follower(也可以分开)。
  • AppendEntries RPC 参数中的 log 需要深拷贝才能免遭 data race。
  • 发送的 log 并不是从头开始,而是从 nextIndex 开始。
  • 在不可靠网络中,收到 RPC 结果的时候可能已经过了几个任期,此时需要先检查一下 args.Term 还等不等于 currentTerm,但在此之前,如果返回的任期比 currentTerm 还要大,那么无论是不是不可靠网络,这个 Leader 都要转为 Follower(同样的,因为不确定他有没有资格成为 Candidate,所以没有必要将其转为 Candidate,反正任期增加后总会有 Candidate 产生);如果返回的任期小于等于 currentTerm,那么 Leader 只需要正常处理。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
if rf.sendAppendEntries(peer, args, &reply) {
  rf.mu.Lock()
  	defer rf.mu.Unlock()
  	if reply.Term > rf.currentTerm {
    //changeState but not update election timer
  } else if args.Term == rf.currentTerm {
    if reply.Success {
      // update nextIndex, matchIndex & commit
    } else if reply.Term == rf.currentTerm{
      // adjust nextIndex
      rf.nextIndex[peer] = max(1,rf.nextIndex[peer]-1)
    }
  }
  1. Follower 进行日志复制(需严格按照论文)。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
prevLogIndex := rf.nextIndex[i] - 1
prevLogTerm := rf.logAt(prevLogIndex).Term
args := &AppendEntriesArgs{
  Term:         rf.currentTerm,
  LeaderId:     rf.me,
  PrevLogIndex: prevLogIndex,
  PrevLogTerm:  prevLogTerm,
  Entries:      make([]Entry, rf.getLastLogL().Index-prevLogIndex),
  LeaderCommit: rf.commitIndex,
}
// 必须要复制一遍才能免遭 data race
copy(args.Entries, rf.log[prevLogIndex+1-rf.lastIncludedIndex():])
go rf.foraHeartbeat(i, args)
  1. AppendEntries RPC 回复后应如何改变自身状态。
  • 先进行一个 term 的处理
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
if args.Term < rf.currentTerm {
  reply.Term, reply.Success = rf.currentTerm, false
  return
}
if args.Term > rf.currentTerm {
  rf.changeStateL(Follower, args.Term, NULL)
} else {
  rf.changeStateL(Follower, rf.currentTerm, rf.votedFor)
  rf.electionTimer.Reset(getElectionDuration())
}
  • 再看看自己的 log 与 RPC 传来的 entries 有没有冲突,有的话以 Leader 为准;返回参数调整自己在 Leader 那边的 nextIndex,直到和 Leader prevLogIndex 的 term 一致为止,才能真正 append entries。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
// last log index is too small
// entry logAt prevLogIndex whose term doesn't match prevLogTerm
if args.PrevLogIndex > rf.getLastLogL().Index || args.PrevLogTerm != rf.logAt(args.PrevLogIndex).Term{
  reply.Success, reply.Term = false, rf.currentTerm
  return
}

// log 比 Leader 短 || log 比 Leader 长并且存在不匹配 -> 截断并补上
needReplace := rf.getLastLogL().Index <= args.PrevLogIndex+len(args.Entries)
if len(args.Entries) > 0 {
  for i := args.PrevLogIndex + 1; i <= args.PrevLogIndex+len(args.Entries); i++ {
    // idx and term can identify a log
    if rf.getLastLogL().Index >= i && rf.logAt(i).Term != args.Entries[i-args.PrevLogIndex-1].Term {
      needReplace = true
      break
    }
  }
  if needReplace {
    rf.log = append(rf.log[:args.PrevLogIndex+1-rf.lastIncludedIndex()], args.Entries...)
  }
}
  • commit 自己的日志。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
if args.LeaderCommit > rf.commitIndex {
  rf.commitIndex = args.LeaderCommit
  if len(args.Entries) > 0 {
    rf.commitIndex = min(rf.commitIndex, args.Entries[len(args.Entries)-1].Index)
  }
  DPrintf(dInfo, "S%v lastIndex:%v, commIndex:%v, lastApplied:%v", rf.me, rf.getLastLogL().Index, rf.commitIndex, rf.lastApplied)
  if rf.commitIndex > rf.lastApplied {
    rf.applyWaker <- 1
  }
}
reply.Term, reply.Success = rf.currentTerm, true
  1. Leader 方面,只有当大部分 Follower 的 matchIndex 更新(log 存到 Follower)了,Leader 的 commitIndex 才能同步更新。这部分我直接从后往前遍历,因为日志的任期是按顺序增长的,所以如果遇到日志任期小于当前任期直接 break,因为 Leader 只能将自己任期的日志 commit,对于其他任期的日志,只能被动 commit。在 lab 中,没有需要 Leader及时更新旧 commit 的情况,所以不做特殊处理(在成为 Leader 的时候发一个空日志)。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
for i := rf.getLastLogL().Index; i > rf.commitIndex; i-- {
  if rf.logAt(i).Term != rf.currentTerm {
    break
  }
  cnt := 1
  for p := range rf.peers {
    if p != rf.me && rf.matchIndex[p] >= i {
      cnt++
    }
  }
  if cnt > len(rf.peers)/2 {
    rf.commitIndex = i
    rf.applyWaker <- 1
    break
  }
}

Tips

  1. 在 currentTerm < args.Term 的时候,我们其实是不需要重置选举时间的,因为如果出现了一个 Leader 比自己任期大,说明自己没有给他投过票,但他得到了半数节点的支持,说明至少半数节点有成为 Candidate 的潜质,并且如果自己真的开始选举的话,会发送无用的 RPC,降低 Raft 的运行效率。

If you follow the rule from Figure 2, the servers with the more up-to-date logs won’t be interrupted by outdated servers’ elections, and so are more likely to complete the election and become the Leader.

6.824 Lab2A

Task

完成各种情况的领导人选举。

我们首先需要熟悉一下 Raft 的工作原理,建议先过一遍 前置芝士

Step

  1. 先完善 Raft 结构和 Make 函数,再结合 Gif 思考单个节点的状态。
  2. 节点开始时的状态是 Follower,election timeout 后,状态会改为 Candidate。我们应该如何设计选举超时,当然是开一个 go routinerf.ticker() 对超时进行检测。那么是用time.Sleep还是time.Timer实现呢?官方的建议是用 sleep,但我用的是 timer,也能够正常实现,并且我还实现了 sleep 的版本,感觉速度上相差无几。
  • ticker
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
func (rf *Raft) ticker() {
	for !rf.killed() {
		select {
		case <-rf.heartbeatTimer.C:
			rf.mu.Lock()
			if rf.state == Leader {
				rf.startHeartbeatL(true)
				rf.heartbeatTimer.Reset(getHeartbeatDuration())
				rf.electionTimer.Reset(getElectionDuration())
			}
			rf.mu.Unlock()
		case <-rf.electionTimer.C:
			rf.mu.Lock()
			rf.startElectionL()
			rf.electionTimer.Reset(getElectionDuration())
			rf.mu.Unlock()
		}
	}
}
  1. 节点成为 Candidate 后并发发送 RequestVote RPC,自己如何处理 RPC 返回的结果,如何判断投给自己的节点数量是否超过半数?
  • Leader election
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

func (rf *Raft) foraVote(peer int, args *RequestVoteArgs, cnt *int) {
	reply := RequestVoteReply{}
	DPrintf(dVote, "S%v -> S%v send vote", rf.me, peer)
	if rf.sendRequestVote(peer, args, &reply) {
		rf.mu.Lock()
		defer rf.mu.Unlock()
		if args.Term == rf.currentTerm && rf.state == Candidate {
			if reply.VoteGranted {
				DPrintf(dVote, "S%v <- S%v voted", rf.me, peer)
				*cnt++
				if *cnt > len(rf.peers)/2 {
					for i := range rf.peers {
						rf.nextIndex[i] = rf.getLastLogL().Index + 1
						rf.matchIndex[i] = 0
						DPrintf(dHeart, "S%v <- S%v nextIndex:%v", rf.me, i, rf.nextIndex[i])
					}
					rf.changeStateL(Leader)
				}
			} else if reply.Term > rf.currentTerm {
				DPrintf(dVote, "S%v <- S%v ~voted", rf.me, peer)
				rf.changeStateL(Follower, reply.Term, NULL)
				rf.persist()
			}
		}
	}
}
  1. peer 节点从哪些方面处理投票逻辑,任期?日志情况(2B)?
  • Follower RequestVote RPC handler
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	defer rf.persist()
	// currTerm higher || voted
	if rf.currentTerm > args.Term || (rf.votedFor != NULL && rf.currentTerm == args.Term) {
		reply.Term, reply.VoteGranted = rf.currentTerm, false
		return
	}
	if rf.currentTerm < args.Term {
		rf.changeStateL(Follower, args.Term, NULL)
	}
	//  check log leader
	//  candidate lastLogTerm to short || (lastLogTerm=rf.lastLogTerm && candidate lastLogIndex to short)
	if rf.getLastLogL().Term > args.LastLogTerm || (rf.getLastLogL().Term == args.LastLogTerm && rf.getLastLogL().Index > args.LastLogIndex) {
		reply.Term, reply.VoteGranted = rf.currentTerm, false
		return
	}
	rf.changeStateL(Follower, args.Term, args.CandidateId)
	rf.electionTimer.Reset(getElectionDuration())
	reply.Term, reply.VoteGranted = rf.currentTerm, true
}

Tips

  1. Candidate 选举前要先自增 currentTerm