Lab Page:http://nil.csail.mit.edu/6.824/2022/labs/lab-raft.html

Lab概述

本次lab需要实现共识算法raft。Lab2A我看应该是只需要做出election就可以,一步一步来吧。

Raft知识点

网课讲的有些慢哈哈,我直接看博客学习了:https://zhuanlan.zhihu.com/p/404315977

在Raft中,节点的状态一共三种,follower,candidate和leader,每个节点在加入时都会默认成为follower。

follower并不主动发出消息,它所做的操作如下:

  • 收到leader的heartbeat,维持follower状态。如果之前没识别到leader或识别了别的leader,则更新。

  • 如果一段时间electionTime内没有收到heartbeat,则认为leader已经嘎了,将自己的任期term加1,成为candidate,并向其它成员发起投票请求。

  • 接收到别的candidate的投票请求,依据先来后到原则,只给一个candidate投票。candidate的term以及日志index必须大于自己,否则不会投票。(这边我暂时不是很确定是大于还是大于等于)(会不会有自己的任期是3,有一个任期为4的candidate的request过来之后,又有一个任期为5的candidate的request。这个时候应该还是要投票吧?所以一张票的限制应该是以任期为单位的?)

  • 接收到客户端的请求后,转发给leader。(这边也有一个问题,对于新加入的节点,它还没认识到集群中的leader,这里的请求是直接丢弃还是按下不表)

candidate是一种临时状态,它所做的操作如下:

  • 向其它follower发起请求,需要传递自己的任期号term和日志进度index。

  • 如果收到了多数成员的赞成票,则自己成为leader(因为一个follower最多投一次票,看上去多数赞成票只能有一个,但是不同节点维护的peer状态可能产生偏差,导致对“多数”产生误解,生成两个甚至多个leader。raft对这一问题的要求是,一个term内只能有leader。(所以具体怎么控制的呢?))

  • 如果收到了别的leader的heartbeat,则转变回follower状态。(那如果在另一个leader的heartbeat到达前,自己也变成了leader怎么办?(有时候就是有这么巧合))

  • 感觉我对Term的理解不太对哈哈。除了上面的情况,如果收到了一个reply请求携带的term比自己大。说明选举已经进行到下一阶段,已经产生了下一代的共识的leader,自己就该转变回follower状态。

  • candidate有一个随机的选举时限,超过时限还没胜出则宣布失败,重新变回follower。随机是为了避免陷入某种选票瓜分的死局。(随机多少也要具体调整)

leader我们暂时不考虑它处理客户端请求的问题,只考虑它在选举过程中的操作:

  • 定期向其它节点发送heartbeat

以上,可以看见还是有很多疑惑的点的,还有没写出来的比如peer列表怎么维护等等,写lab的过程中就会慢慢想明白的吧。

接下来应该就是看代码了。这次我希望能够看得仔细一点吧,因为后面还有2B,2C,2D是承接Lab2A进行下去的。

基础代码框架

还是上来一头雾水,总之要写的代码在raft/raft.go里

最基础的一个节点应该就是这个raft结构体:

1
2
3
4
5
6
7
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
}

先去看一下 *Persister 是个什么东西。

还是不太明白go语言的import逻辑。 Persister是定义在同属package raft下的 persister.go 中的结构体,定义如下:

1
2
3
4
5
type Persister struct {
mu sync.Mutex
raftstate []byte
snapshot []byte
}

好吧,论文里介绍了什么是persister。总之就是,persister表示的是一个节点要存的持久化状态。至于持久化状态有什么呢?在论文的figure2里有。

我不是很懂为什么这里还要加一个锁?嘛,暂时没想到会冲突的情况。以后操作一个节点时内部还会遇到冲突吗。哦,也合理,因为以后可能会有往本地存储写Persister的时候,不可能同时允许Raft过程修改Persister。所以Persister这次Lab2A应该用不到吧。

rpc参数与Make函数

根据Hint4,我们先来补全requestVote,按照图示的内容。记得RPC的参数要是大写字母开头,Lab1里踩过坑了。

1
2
3
4
5
6
7
8
type RequestVoteArgs struct {
Term int // candidate’s term
CandidateId int // candidate requesting vote
}
type RequestVoteReply struct {
Term int
VoteGranted bool // 表示是否收到投票
}

然后就是 Make() 函数的作用是初始化一个raft节点,要理清需要做的事。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{} // rf是一个指针
rf.peers = peers // 在go中,指针指向成员变量不需要使用->
rf.persister = persister
rf.me = me

// Your initialization code here (2A, 2B, 2C).

rf.state = Follower
rf.leaderId = -1 // 表示还没有leader
go rf.StartElection()

// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())

// start ticker goroutine to start elections
go rf.ticker()


return rf
}

首先是需要将节点的状态设为follower,不妨给raft节点加上一个state属性。可以定义一个const映射,来增强代码的可读性。

1
2
3
4
5
const (
Follower = 0
Candidate = 1
Leader = 2
)

这边,应该把选举过程重新go一个进程。选举是需要定期发起的(除非收到leader的heartbeat),不妨设置成无限循环,每150ms发起一次(之后会调整)。

记录下开始的时间startTime,等待150ms。使用一个lastReceiveTime属性,记录在这150ms内,rf是否收到了heartbeat(可能还有别的操作,后续补充)。如果收到了,那么就不会发起选举。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
func (rf *Raft) StartElection() {
for {
electionTimeout := rand.Intn(150)
startTime := time.Now()
time.Sleep(time.Duration(electionTimeout) * time.Millisecond) // 每150s一次
rf.mu.Lock()
if atomic.LoadInt32(&rf.dead) == Dead { // 加一个节点是否挂了的判断。如果Kill()一个节点,rf.dead就会被设为1
rf.mu.Unlock()
return
}
if rf.lastReceiveTime > startTime {
continue
}
if rf.state != Leader {
rf.convertToCandidate()
args := RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
}
numVote := 1 // 收到的选票数
// 开始向每个节点发送投票请求。
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
go func(peerId int) { // 这里必须单独go一个进程,因为rpc需要时间
replay := RequestVoteReply{}
ok := sendRequestVote(i, args, reply)
if !ok {
return
}
rf.mu.Lock()
if replay.Term > rf.currentTerm {
rf.convertToFollower(replay.Term) // 同步到该节点的任期
return
}
if replay.VoteGranted {
numVote++
if numVote > len(rf.peers)/2 && rf.state == Candidate {
rf.convertToLeader()
}
}
rf.mu.Unlock()
} (i)
}
}
rf.mu.Unlock()
}
}

这里有一步转变状态可以做一定的封装增强可读性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (rf *Raft) convertFollower(newTerm int) {
rf.state = Follower
rf.currentTerm = newTerm
rf.votedFor = -1
rf.lastReceiveTime = time.Now()
}

func (rf *Raft) convertToCandidate() {
rf.state = Candidate
rf.currentTerm++ // 增加自己的任期
rf.votedFor = rf.me // 投票给自己
}

func (rf *Raft) convertToLeader() {
rf.state = Leader
rf.leaderId = rf.me
}

sendRequestVote()

这里,我们的RPC请求由 sendRequestVote 封装了 RequestVote,还要实现一下。

整理好args和reply的参数即可。强调一下任期这一概念,本身就是通过节点间的共识推进的,所以落后就代表着某个节点错过了一次的leader统筹,低任期无条件向高任期的节点投降。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
reply.VoteGranted = false
} else {
if args.Term > rf.currentTerm { // 如果别人的任期更大,那自己必不可能竞争过,直接投降变为follower
rf.convertToFollower(args.Term)
}
if rf.votedFor == -1 {
rf.votedFor = args.CandidateId
reply.VoteGranted = true
}
}
rf.mu.Unlock()
}

测试框架逻辑

只能说go的调用确实比较复杂,还是python用太多了。

啊首先要解释一下,我们编译运行的是 go test -run 2A,这是go的test框架的约定,意思是,它会执行当前文件夹下的一个以 “_test” 结尾的go文件中的所有 “以Test开头,以2A结尾” 的函数。很神奇吧

我们简单看一下 test_test.go 中的 TestInitialElection2A函数

它会调用 config.go 中的 make_config 函数,来初始化节点状态保存到cfg变量中。

make_config 中,主要一步是 start1 ,作用是初始化一个raft节点,其中就调用了我们在 raft.go 中写的 Make 函数:

1
2
3
4
rf := Make(ends, i, cfg.saved[i], applyCh)
cfg.mu.Lock()
cfg.rafts[i] = rf
cfg.mu.Unlock()

嗯,后面等会再看。