6.824 Lab1

Lab1 MapReduce

Paper

MapReduce: Simplified Data Processing on Large Clusters

MapReduce 将分布式系统处理数据的细节(并行、容错、局部性优化、负载均衡等)隐藏起来,提供一套简化方案,解决了在多台机器处理大量逻辑简单的计算(分布式计算)实现复杂的问题。

实现

本实验可以由 1 个 coordinator/master 调度若干个 worker。先完成所有 map 任务,生成中间键值对并将其存在中间文件后再进行 reduce 任务,最后将 reduce 任务的结果输出到文本。

需要我们修改的文件为 mr/*

  1. worker 通过 RPC 循环请求任务。
  2. coordinator 读取 8 个给定的文件分给不同的 worker 进行 map 任务,这部分如果喜欢也可以按固定 bytes 进行切割但我懒得处理了
  3. worker 读取文件并扔给fmap进行处理,生成中间键值对,计算字符串的 hash 后将key-intermediate_value存在 mr-taskid-ihash(str) 里面。

fmap 函数通过编译生成的 *.so 调用

P.S.
看到一些同学是先 os.CreateTemp("", tmpFileName) 存在临时文件里面,等处理完了再 rename 成`mr-taskid-ihash(str)`。这样应该会更加规范一些,但不创建 tempfile 在 lab1 理论上应该也是可行的。

当文件没完全写完 worker 就 crash 时,这个任务会被 coordinator 发现并交给别的 worker 重做。因为用的是`os.Create()`,文件会被新的覆盖,直到 worker 成功完成任务为止。

如果逻辑允许两个 worker 同时处理一个任务的话,就需要创建 tempFile 再 rename 的方式,因为需要保证创建的(临时)文件内容不重复。
  1. map 任务全部完成后 coordinator 分配 NReduce 个 reduce 任务给不同的 worker 处理。
  2. worker 读取中间文件,并将key-list(intermediate_value)扔给freduce处理成key-value 写入mr-out-ihash(str)
  3. coordinator 发现任务全都处理完后踢走 worker,并在若干秒后退出。

Tips

  1. 结构

    • go 可以用 const 和 iota 代替 enum。

    • WorkerInfo 在这个实验可以省略,不需要保留 worker 的状态,但我写了就不想删了。

    • 本来还想往结构体内置一个 logger 的,但 RPC 调用需要 gob 序列化,而 logger 不知道为啥会序列化失败,不知道是啥 bug,以后再瞅瞅。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
const (
	_ = iota
	// MAP REDUCE WAIT: Task type
	MAP
	REDUCE
	WAIT
	// IDLE BUSY DONE: Task status || WorkerInfo status
	IDLE
	BUSY
	DONE
)

type Coordinator struct {
	NMap      int          // map 任务的个数
	NReduce   int          // reduce 任务的个数
	DMap      int          // 记录 map task 完成的个数
	DReduce   int          // 记录 reduce task 完成的个数
	WorkerId  int          // 为新的 worker 分配 workerIds
	TaskDone  map[int]bool // 记录 task 是否被完成
	Map       chan *Task   // 待取出的 map task
	Reduce    chan *Task   // 待取出的 reduce task
	TaskMutex sync.Mutex   // 用于任务分发相关数据的 mutex
	DoneMutex sync.Mutex   // 用于完成任务相关数据的 mutex
	Mutex     sync.Mutex   // 用于其他细节的 mutex
}

type Task struct {
	StartTime time.Time
	NReduce   int // 传给 worker,用于 ihash 的 mod
	Type      int // MAP REDUCE WAIT
	Status    int // IDLE BUSY DONE
	FileName  string
	TaskId    int
	WorkerId  int
}

type WorkerInfo struct {
	WorkerId int
	TaskId   int
	Type     int
	Status   int
	//Lg       *log.Logger
	//Why this error?
	//gob: type log.Logger has no exported fields
	//2023/01/31 14:34:13 RPC getWork failed
	//exit status 1
}

// Map functions return a slice of KeyValue.
type KeyValue struct {
	Key   string
	Value string
}

  1. RPC 写入 reply 的时候不能直接将 reply 指向一个新的地址,而是要修改 reply 地址的值。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
      //reply = &GetWorkReply{&Task{...}}
		//上面的方法不会返回想要的 reply,因为 reply 指向的是新的 GetWorkReply 值在 rpc 服务器的指针。这部分客户端的内存和服务器的不一样,所以得到的是空值;下面的方法因为 reply 是共享内存的,所以服务器和客户端都可以访问 &reply。
		//在 RPC 调用时,要避免传递指针参数,如果传递的参数是值类型,那么这个值将会被复制到另一个地址
		*reply = GetWorkReply{Task{
			StartTime: time.Now(),
			Type:      MAP,
			Status:    BUSY,
			FileName:  mapTask.FileName,
			NReduce:   c.NReduce,
			WorkerId:  args.WorkerId,
			TaskId:    mapTask.TaskId,
		}} // equal to reply.Task = &Task{...}
  1. clash test,需要 coordinator 在 worker 宕机 10s 后为同一个任务重新分配 worker,这部分需要考虑一下注意一下 data race
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
go func(taskId int, filename string) {
	select {
	case <-time.After(12 * time.Second):
		c.Mutex.Lock()
      if !c.TaskDone[taskId] {
         c.Mutex.Unlock()
         dosomething
      } else {
         c.Mutex.Unlock()
         dootherthing
      }   
   // default:{
   // }
   // default 不能写,如果写了就会直接跳到 default,不会进入 12s 的 case
   }
   return 
}(maptask.TaskId, mapTask.FileName)
  1. 并发编程细节

因为 MapReduce 的特性,map task 可以并发,reduce task 可以并发,但两种任务不能同时并发,所以需要等到 map task 跑完才可以进行 reduce task。如果同时读写同一个变量,需要加锁。其实还可以用 chan 实现 lock-free,以后试试,如果有时间的话。

  1. 随手写的 makefile
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
# location: src/Makefile
ntest := 50
lab1:
	@cd main;\
    	for i in $$(seq 1 $(ntest)) ; do\
    		bash test-mr.sh > res$$i;\
    	done;\
  	cnt=0;\
	for i in $$(seq 1 $$(expr $(ntest) - 1)); do\
		if cmp res$$i res$$(expr $$i + 1); then \
			cnt=$$(expr $$cnt + 1); \
		fi;\
	done;\
	echo "same file num:"$$(expr $$cnt + 1) ;\
	if [ $$cnt -eq $$(expr $(ntest) - 1) ]; then \
	  	echo "****************************************************************";\
		echo "all res is the same!";\
		echo "result content as follow.";\
	  	echo "****************************************************************";\
		cat main/res1;\
	  	echo "****************************************************************";\
	fi
  1. result
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
same file num:50
****************************************************************
all res is the same!
result content as follow.
****************************************************************
*** Starting wc test.
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting job count test.
--- job count test: PASS
*** Starting early exit test.
--- early exit test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS
****************************************************************
updatedupdated2024-04-302024-04-30