Lab page: http://nil.csail.mit.edu/6.824/2022/labs/lab-mr.html

Lab概述

正好这学期正好学校里要上”并行分布式系统”, 也有同学推荐我去学习这门MIT的课程,遂做一些尝试。

本次Lab1主要是实现一个简单的MapReduce框架,完成Coordinator和Worker的设计,实现一个Word Count程序。

全课程Lab主要采用go语言来实现,学习go语言语法也是开始本次Lab的第一步。

以下所有内容仅为个人记录所用,请依照课程要求独立完成Lab实验。

以上。记录一下本次的Lab1的实现思路。

一、安装go环境

上来首先先安装一下以前没用过的go环境。我是在实验室linux服务器上试的,windows系统就不太清楚了。。

首先下载go的安装包

1
wget https://go.dev/dl/go1.17.6.linux-amd64.tar.gz

然后,将其解压到 $HOME/local 下

之后需要将go命令加到PATH中

一种方法是 export PATH=$PATH:$HOME/local/go/bin ,但是这样是暂时的,下次启动就没有了

解决办法就是把这句话加到 .bashrc 中的最后即可

顺便使用 echo -e $PATH | tr ":" "\n" 可以查看当前的PATH

二、下载Lab框架

下载一下Lab的文件,看看有什么东西

1
git clone git://g.csail.mit.edu/6.824-golabs-2022 6.824

官方提供了一个sequential的MapReduce实现(src/main/mrsequential.go),我们可以先试着跑起来这个。

1
2
3
4
5
cd 6.824/src/main
go build -race -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run -race mrsequential.go wc.so pg*.txt
more mr-out-0

第一次用go可能还是比较懵的,简单介绍一下各步都做了什么。

go build就是编译go文件,以*.so的形式保存(我不知道说编译成为可执行文件准不准确)

-race 表示启用竞争检测,以避免可能的冲突

-buildmode=plugin 则表示将其编译为plugin,在 mrsequential.go 中,我们可以看到这一插件的用法(下面会介绍)

rm mr-out* 则是删除之前可能的输出文件,保证新一次的输出不会受到影响

go run -race mrsequential.go wc.so pg*.txt go run 就是运行,同样开启竞争检测。这次我们要传入wc.so中的函数,以及运行Word Count需要的所有文件名

最后,输出结果

注意在 wc.so 中是有 import "6.824/mr"(也就是需要我们编写的部分,所以记得每次写完测试时都要重新编译wc.so)

如何使用plugin

首先需要 import "plugin"

然后打开.so文件,读取其中的函数 p, err := plugin.Open(filename) ,其中filename是在命令行中传递的参数,也就是.so文件的名称

然后就可以使用 xmapf, err := p.Lookup(FuncName) 来调用定义在.so文件中的function了,比如说调用map函数: xmapf, err := p.Lookup("Map")

初次读进来的xmapf似乎是一种独特的类型,还需要转换该函数的形式,设置正确的形参和返回值类型

例:mapf := xmapf.(func(string, string) []mr.KeyValue)

mrapps

可以看到,mrapps文件夹下的各个文件都包含一个map函数和一个reduce函数,不同的函数实现可以让程序实现不同的功能,包括但不限于Word Count

三、Start

一开始肯定是很懵的,总之先试着看懂文件的结构。可以参照 mrsequential.go 的逻辑,这个是非常容易理解的。

第一步就是用mapf函数把文件拆成intermediate,然后将其按照键值排序,我们就把这个作为第一步。

当然在这之前,worker还需要向coordinator请求一个任务,因此还需要我们发送rpc请求,在mr/worker.go中,给我们展示了如何发送rpc请求,可以参考一下。简单来说就是调call函数,有三个参数,第一个是指定使用coordinator的哪个函数,第二个是传过去的参数,第三个是希望收到的回复。

暂时还不知道怎么设计,一步一步慢慢来,假设我们coordinator有一个handler函数,我们需要向其请求一个map任务。

这边有一个我不是很清楚的点,因为看论文,是需要将文件拆分成一个一个块的。这里我们就直接把文件名返回给worker,让worker去做读取这件事。

参考CallExample()

1
2
3
4
5
6
7
8
9
// worker.go/func Worker
// ask for a job
ok := call("Coordinator.Handler", &args, &reply)
if ok {
// reply.Y should be 100.
fmt.Printf("call success, Job name: %s\n", reply.Filename)
} else {
fmt.Printf("call failed!\n")
}

这里要考虑的就是怎么设计reply。因为Worker有可能做map,有可能做reduce,所以首先需要的就是返回任务的类型,然后还有需要处理的文件名称。

1
2
3
4
5
// rpc.go
type RPCReply struct {
JobType string
Filename string
}

然后考虑Coordinator这边,Coordinator应该需要知道,自己要给worker分配什么任务,包括任务类型和任务内容。这就需要Coordinator维护一个任务列表,以及一个任务状态。

1
2
3
4
5
type Coordinator struct {
// Your definitions here.
FilenameList []string
JobType string
}

在Coordinator初始化的时候,它就应该拥有一个map任务列表,且当前的任务状态是”map”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{
JobType: "map", // 一开始做map操作
}

// Your code here.
for _, filename := range files {
c.FilenameList = append(c.FilenameList, filename)
fmt.Printf("我读取了文件:%s\n", filename)
}

c.server() // 注意这会创建一个新的go routine,导致可能的race
return &c
}

race问题之后会说明,此处暂时略过。然后就是具体的Handler()函数,在接收到请求后将任务发还给worker即可。如果map任务发完了就将状态改为”reduce”

1
2
3
4
5
6
7
8
9
10
11
12
13
func (c *Coordinator) Handler(args *ExampleArgs, reply *RPCReply) error {
if c.JobType == "map" {
reply.JobType = "map"
reply.Filename = c.FilenameList[0]
c.FilenameList = c.FilenameList[1:] // 关于这一步的性能问题,可以参见 https://zhuanlan.zhihu.com/p/430888116
if len(c.FilenameList) == 0 {
c.JobType = "reduce"
}
} else {
fmt.Printf("not implemented yet\n")
}
return nil
}

现在只是简单地测试一下rpc是否正确,启动一下看看。需要开两个terminal

1
2
3
4
5
6
7
// 第一个terminal
go build -race -buildmode=plugin ../mrapps/wc.go
rm mr-out*
go run -race mrcoordinator.go pg-*.txt

// 第二个terminal
go run -race mrworker.go wc.so

如果顺利的话,worker应该正确接收到了一个文件的map任务

不过,coordinator这边会被race检测器检测到问题。检查mrcoordinator.go文件,它需要不断地检查 m.Done()来判断Coordinator是否完成了所有的mapreduce任务;而我们在coordinator.go中,因为启动了server会创建一个新的go routine,这其中又在不断地修改我们的Coordinator的状态,这其中就有可能产生冲突。

一个办法就是给需要访问Coordinator状态的地方加上锁,锁的用法在MIT Lecture2的Crawler.go中有介绍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
type Coordinator struct {
// ...
mu sync.Mutex // 加一个锁来维护
}

func (c *Coordinator) Handler(args *ExampleArgs, reply *RPCReply) error {
c.mu.Lock()
if c.JobType == "map" {
reply.JobType = "map"
reply.Filename = c.FilenameList[0]
c.FilenameList = c.FilenameList[1:]
if len(c.FilenameList) == 0 {
c.JobType = "reduce"
}

} else {
fmt.Printf("not implemented yet\n")
}
c.mu.Unlock()
return nil
}

func (c *Coordinator) Done() bool {
ret := false

// Your code here.
c.mu.Lock()
if len(c.FilenameList) == 0 {
fmt.Printf("map completed\n")
ret = true
}
c.mu.Unlock()

return ret
}

再次启动可以发现没有race问题了。

当然以上仅是最简单的情况。我们可以以此为基础,整理一下我们接下来要做的事:

  • worker在收到map任务后的处理逻辑,需要应用map function,然后按键排序,最后写入到一个文件中。

  • 完成任务后应当给Coordinator一个反馈,表示已经完成了任务,这样Coordinator可以将intermediate文件加入到它的reduce任务列表中。这要求Coordinator在分配任务的时候分配一个工作id,以唯一指示。

  • worker还需要能够处理reduce任务,逻辑同 mrsequential.go

至此,思路已经很清晰了,无非是一边熟悉go语言一边完成Lab。

四、map任务

把逻辑优化一下,整一个专门的WorkerMap()函数来做这件事。需要传什么参数写着写着就知道了,对应修改结构体即可。

一个神奇的点是我发现自己发送的nReduce字段无法被worker接受到,最后才发现,原来RPC只发送字段名以大写字母开头的结构体字段,这一点Hints里也提到了。根据chatgpt说小写字母开头的是私有字段,在其他包中是无法访问的。

读文件,map,sort,和之前一样

随后,根据Lab的Hints,为了避免程序写到一半中断,出现一个文件里面内容不全的情况,我们需要使用临时文件,待完全写入后再将其命名为目标文件。这样能保证保存下来的文件内容是全的。包括写入文件也需要使用Hints中提到的 encoding/json

所以我们创建nReduce个临时文件,以及对应的json encoder

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// create temporary files and json encoder
var tempFiles []*os.File
var fileEncs []*json.Encoder

for i := 0; i < nReduce; i++{
// tmp files
tmpFile, err := ioutil.TempFile(".", "temp-intermediate-*")
if err != nil {
fmt.Printf("Failed to create temporary file %d: %v\n", i, err)
continue
}
fmt.Printf("Create temporary file %d: %s\n", i+1, tmpFile.Name())
tempFiles = append(tempFiles, tmpFile)

// json encoder
enc := json.NewEncoder(tmpFile) // 和对应文件输出流绑定
fileEncs = append(fileEncs, enc)
}

这里我一开始写的是ioutil.TempFile(“”, “temp-intermediate-*”),结果报错:invalid cross-device link,估计直接保存到服务器的tmp目录下去了?可能是跟我们实验室服务器的硬盘配置有关,把临时文件设到当前目录下就可以解决了。

然后在保存的时候,每个键值对具体保存到哪个文件,需要使用给出的ihash()函数进行映射,而不是按顺序存储

1
2
3
4
5
6
7
8
9
10
// save the intermediate
total_len := len(intermediate)
for i := 0; i < total_len; i++{
index := ihash(intermediate[i].Key) % nReduce
enc := fileEncs[index]
err := enc.Encode(&intermediate[i])
if err != nil {
fmt.Printf("json encode error\n")
}
}

最后,把临时文件重新命名保存为正式文件,保证删除临时文件。保存的中间intermediate文件名为”map-{JobId}-{i}”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// rename temporary files to formal files
for i, file := range tempFiles {
oname := fmt.Sprintf("map-%d-%d", JobId, i)
err = os.Rename(file.Name(), oname)
if err != nil {
fmt.Printf("Failed to rename temporary file %d: %v\n", i, err)
}
file.Close()
}

// delete temporary files
for _, file := range tempFiles {
defer os.Remove(file.Name())
}

只能说在尝试创建临时文件的时候遇到了各种各样的问题,,最后这是一个我个人可行的方法。

五、轮询 & 优化逻辑

正确到这里,我们启动coordinator进程后,每启动一个worker进程,它就会向coordinator发送一次rpc请求,得到一个任务以及对应的文件名,读取后将其分解为中间文件并保存。当然,我们希望worker能够持续的工作而不是做一次之后就停止,因此加一个循环是合理的。

再简单地加完循环后,我们会发现在coordinator的设计上还存在许多问题。首先,我们把取任务和完成任务的逻辑写在同一个handler接口里,这是不对的,这样直到派出去的任务完成返回前,coordinator的任务清单都不会更新。

比较容易想到的思路是,拆成两个分配任务和完成任务的接口函数,前者将任务列表中的任务提取到一个临时任务列表中。而完成任务接口收到信息后,就将其从临时任务列表删除,否则,若超时(即有可能派出去的任务没能被完成,worker中断了或是怎么),就将该任务再放回任务列表,交给其它worker工作

开写,简单地贴一些代码吧。比如说现在worker向coordinator传的arg需要包含jobid和完成情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (c *Coordinator) JobDone(args *RPCArgs, reply *RPCReply) error {
c.mu.Lock()
if args.DoneSituation == 0 {
c.FilenameList = append(c.FilenameList, c.TempMissionMap[args.JobId])
delete(c.TempMissionMap, args.JobId)
} else if args.DoneSituation == 1 {
delete(c.TempMissionMap, args.JobId)
c.ReduceList = append(c.ReduceList, args.JobId)
if len(c.FilenameList) == 0 && len(c.TempMissionMap) == 0 {
c.JobType = "reduce"
}
}
c.mu.Unlock()
return nil
}

然后worker这边加个循环,简单地设了一个超时检测(处理worker崩溃的情况)(实际上这个超时检测是完全错误,非常难蚌,之后测试的时候我才反应过来)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
for {
ok := call("Coordinator.JobAssign", &args, &reply)
if ok {
fmt.Printf("call success, Job name: %s, Job id: %d\n", reply.Filename, reply.JobId)
} else {
fmt.Printf("call failed!\n")
}

if reply.JobType == "map" {
startTime := time.Now()
WorkerMap(reply.Filename, reply.NReduce, reply.JobId, mapf)
elapsedTime := time.Since(startTime)
if elapsedTime >= 5 * time.Second { // 设置超时时间为5s
args.JobId = reply.JobId
args.DoneSituation = 0
call("Coordinator.JobDone", &args, &reply) // 没有处理返回值
} else {
args.JobId = reply.JobId
args.DoneSituation = 1
call("Coordinator.JobDone", &args, &reply)
}
} else if reply.JobType == "reduce" {
fmt.Printf("map job has done\n")
break
}
time.Sleep(5 * time.Second)
}

六、Reduce

参考sequential,一样写reduce的代码。

到这边应该已经驾轻就熟了,主要就是传什么参数让worker能定位到文件。注意一次reduce任务处理的是所有名称为map-{0~filenums}-reduceid的,所以首先rpc参数中需要包含filenums, 以及本次reduce任务的id

还是有问题的,遇到了一个有意思的问题,就是第一次reduce任务的JobId获取的不对,让我debug一下。

结果发现coordinator那边给的是正确的,但是worker这边接收的是错的,我直接问号。

最后反应过来去看lab页面中的hints,原来调用rpc请求的时候,每次传的reply都要清空,否则会有奇怪的错误。清空完以后就ok了。

随便贴一些我觉得重要的代码吧,记得把nReduce个文件中的键值对kva提取整合到intermediate,要做一次排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func WorkerReduce(ReduceId int, nReduce int, JobId int, Filenums int,
reducef func(string, []string) string) {
// 读取的文件格式: map-{0~Filenums-1}-{ReduceId}

intermediate := []KeyValue{}
for iter := 0; iter<Filenums; iter++ {
fileName := fmt.Sprintf("map-%s-%s", strconv.Itoa(iter), strconv.Itoa(ReduceId))
file, err := os.Open(fileName)
if err != nil {
log.Fatalf("cannot open %v", fileName)
}
kva := []KeyValue{}

dec := json.NewDecoder(file)
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil {
break
}
kva = append(kva, kv) // 提取得到该文件中的所有键值对
}
intermediate = append(intermediate, kva...)
file.Close()
}

oname := fmt.Sprintf("mr-out-%s", strconv.Itoa(ReduceId))
ofile, _ := os.Create(oname)

sort.Sort(ByKey(intermediate))

i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)

// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

i = j
}

ofile.Close()
}

随后测试输出:cat mr-out-* | sort | more

但是非常奇怪的是,好像sort失效了,没有正确按照字典序排???数量是对的

1
2
3
4
a 13382
A 509
Ab 3
...

这个问题太奇怪了,一通询问GPT后,它告诉我可以指定export LC_COLLATE=C,结果居然就对了,,不知道原来这个环境变量值是什么。我很担心一会儿测试的时候结果能不能对。

为了让程序正常退出,就在最后给worker发送一个”Done”消息,然后Coordinator这边我就简单粗暴地等一会儿,再退出(以免worker最后一次请求”Done”消息前Coordinator已经退出)。只能说不太严谨,出问题再看吧。

七、最后的Debug

运行一下测试看看,结果如下:bash test-mr.sh

测试 结果 报错信息
wc test FAIL sort: cannot read: ‘mr-out*’
indexer test PASS /
map parallelism test PASS /
reduce parallelism test PASS BUT:call failed!dialing:dial unix /var/tmp/824-mr-3149: connect: connection refused
job count test PASS /
early exit test FAIL output changed after first worker exited
crash test FAIL crash output is not the same as mr-correct-crash.txt

总之就是非常惨烈,只能再debug一下了。

wc test

看了报错信息是因为map阶段报错了,取任务的时候取了空列表。这时我才反应过来,我要等到取了最后一个任务的worker返回才会改变coordinator的状态,但是此时其它worker还会取任务,这就会报错了。

所以在worker取空集的情况下,我先加一个判断,如果已经空了,就返回一个sleep信息,让worker休息1s,这样就能正确等到Coordinator状态改变了。

然而,在多次测试的时候遇到了有些map任务丢了的情况?再debug一下,感觉可能是超时时间的问题,程序是不会出错的,只不过有可能有的任务做的慢了点,超时时间开大就可以了。

early-exit test

看一下bash文件能知道这一项测试的测试逻辑:

1
2
3
4
5
6
7
8
9
# a process has exited. this means that the output should be finalized
# otherwise, either a worker or the coordinator exited early
sort mr-out* | grep . > mr-wc-all-initial

# wait for remaining workers and coordinator to exit.
wait

# compare initial and final outputs
sort mr-out* | grep . > mr-wc-all-final

简单来说就是应该是一个进程退出就应该说明整个任务完成了

经过debug,我发现自己出了一个很逆天的bug,Coordinator在每次给完任务后会go一个新的判断函数,如果过了一定时间,这个任务还没完成,就判定worker出错了。

1
2
3
4
5
6
7
8
9
10
go func(jobID int) {
time.Sleep(5 * time.Second)
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.TempMissionMap[jobID]; ok {
fmt.Printf("%d map job 超时\n", jobID)
c.FilenameList = append(c.FilenameList, c.TempMissionMap[jobID])
delete(c.TempMissionMap, jobID)
}
}(reply.JobId)

然而,我因为偷懒,所以把map任务的临时列表和reduce任务的临时列表用的是一个。结果!map任务时发起的判断函数,竟然在reduce阶段造成了问题,误删了任务。。很难蚌。

重新给reduce任务单独开一个列表就通过了。

crash test

我发现是我对超时检测的理解出了问题。超时检测应该是在coordinator端进行的,否则worker端crash了,根本就不会触发那边的超时检测,传一个错误信息回来什么的。我那个超时就很呆,完全没有一点作用,应该删掉的。

利用我们之前的临时任务列表,我们需要再写一个周期执行的函数,判断它们距任务发布时间过去多久了。

或者,一个比较好的设计是,发送一个rpc请求后,它会发起另一个函数,比如说10s后是否接收到回复。

是否接收到回复我们只要看临时任务列表中对应的任务有没有被删去就可以了,复用一下结构

将类似以下的函数插在coordinator的rpc回复之后就可以了。它会在10s后检测有没有被正确删去,如果没有,那就说明这个worker爆了,把任务再返回任务列表。

1
2
3
4
5
6
7
8
9
go func(jobID int) {
time.Sleep(10 * time.Second)
c.mu.Lock()
defer c.mu.Unlock()
if _, ok := c.TempMissionMap[jobID]; ok {
c.FilenameList = append(c.FilenameList, c.TempMissionMap[jobID])
delete(c.TempMissionMap, jobID)
}
}(reply.JobId)

但是还是不对,我debug后发现是文件命名的问题,,我在map阶段对中间文件的命名是 map-{JobId}-{0~nReduce-1} ,而reduce的时候读的文件名却是 map-{0~Filenums-1}-{ReduceId}

感觉就记录一下map成功的jobid,然后传给reduce?或者其实应该使用通配符,直接去匹配 map-*-{ReduceId} ,这样肯定更好,问问gpt有没有这种方法。

gpt提供了一个库方法 path/filepath ,可以使用 files, err := filepath.Glob(pattern) 去匹配,修改完后通过测试。

最后还有两个小小的问题:

一个是测试的bash脚本中,每做完一项任务就会把生成的文件 mr-* 删了,但是我的map文件命名的是map-X-Y,导致删不掉就会影响下次的结果。把中间文件名换了就可以了。

还有就是crash-test,可以看到它会让程序随机delay一段时间,可能长达10s,所以我们的超时检测时间需要开的长一点。

1
2
3
4
5
6
} else if rr.Int64() < 660 {
// delay for a while.
maxms := big.NewInt(10 * 1000)
ms, _ := crand.Int(crand.Reader, maxms)
time.Sleep(time.Duration(ms.Int64()) * time.Millisecond)
}

八、复盘

最后展示一下通过截图吧:

虽然是很不容易地做完了,但是感觉还是有很多做的不太好的地方。

现在想想感觉应该维护一个worker状态的,每个worker也分配一个worker ID,这样便于管理。也不用临时任务列表了,直接维护worker列表就可以了(同时记录每个worker分配的任务)。唉,只能说开始做的时候还是太仓促了。

Appendix

论文链接:http://research.google.com/archive/mapreduce-osdi04.pdf

代码链接:后续会放到我的github上