Lab1 MapReduce
Paper
MapReduce: Simplified Data Processing on Large Clusters
MapReduce 将分布式系统处理数据的细节(并行、容错、局部性优化、负载均衡等)隐藏起来,提供一套简化方案,解决了在多台机器处理大量逻辑简单的计算(分布式计算)实现复杂的问题。
实现
本实验可以由 1 个 coordinator/master 调度若干个 worker。先完成所有 map 任务,生成中间键值对并将其存在中间文件后再进行 reduce 任务,最后将 reduce 任务的结果输出到文本。
需要我们修改的文件为 mr/*
- worker 通过 RPC 循环请求任务。
- coordinator 读取 8 个给定的文件分给不同的 worker 进行 map 任务,这部分如果喜欢也可以按固定 bytes 进行切割
但我懒得处理了。
- worker 读取文件并扔给
fmap
进行处理,生成中间键值对,计算字符串的 hash 后将key-intermediate_value
存在 mr-taskid-ihash(str)
里面。
fmap 函数通过编译生成的 *.so 调用
P.S.
看到一些同学是先 os.CreateTemp("", tmpFileName) 存在临时文件里面,等处理完了再 rename 成`mr-taskid-ihash(str)`。这样应该会更加规范一些,但不创建 tempfile 在 lab1 理论上应该也是可行的。
当文件没完全写完 worker 就 crash 时,这个任务会被 coordinator 发现并交给别的 worker 重做。因为用的是`os.Create()`,文件会被新的覆盖,直到 worker 成功完成任务为止。
如果逻辑允许两个 worker 同时处理一个任务的话,就需要创建 tempFile 再 rename 的方式,因为需要保证创建的(临时)文件内容不重复。
- map 任务全部完成后 coordinator 分配 NReduce 个 reduce 任务给不同的 worker 处理。
- worker 读取中间文件,并将
key-list(intermediate_value)
扔给freduce
处理成key-value
写入mr-out-ihash(str)
。
- coordinator 发现任务全都处理完后踢走 worker,并在若干秒后退出。
Tips
-
结构
-
go 可以用 const 和 iota 代替 enum。
-
WorkerInfo 在这个实验可以省略,不需要保留 worker 的状态,但我写了就不想删了。
-
本来还想往结构体内置一个 logger 的,但 RPC 调用需要 gob 序列化,而 logger 不知道为啥会序列化失败,不知道是啥 bug,以后再瞅瞅。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
const (
_ = iota
// MAP REDUCE WAIT: Task type
MAP
REDUCE
WAIT
// IDLE BUSY DONE: Task status || WorkerInfo status
IDLE
BUSY
DONE
)
type Coordinator struct {
NMap int // map 任务的个数
NReduce int // reduce 任务的个数
DMap int // 记录 map task 完成的个数
DReduce int // 记录 reduce task 完成的个数
WorkerId int // 为新的 worker 分配 workerIds
TaskDone map[int]bool // 记录 task 是否被完成
Map chan *Task // 待取出的 map task
Reduce chan *Task // 待取出的 reduce task
TaskMutex sync.Mutex // 用于任务分发相关数据的 mutex
DoneMutex sync.Mutex // 用于完成任务相关数据的 mutex
Mutex sync.Mutex // 用于其他细节的 mutex
}
type Task struct {
StartTime time.Time
NReduce int // 传给 worker,用于 ihash 的 mod
Type int // MAP REDUCE WAIT
Status int // IDLE BUSY DONE
FileName string
TaskId int
WorkerId int
}
type WorkerInfo struct {
WorkerId int
TaskId int
Type int
Status int
//Lg *log.Logger
//Why this error?
//gob: type log.Logger has no exported fields
//2023/01/31 14:34:13 RPC getWork failed
//exit status 1
}
// Map functions return a slice of KeyValue.
type KeyValue struct {
Key string
Value string
}
|
- RPC 写入 reply 的时候不能直接将 reply 指向一个新的地址,而是要修改 reply 地址的值。
1
2
3
4
5
6
7
8
9
10
11
12
|
//reply = &GetWorkReply{&Task{...}}
//上面的方法不会返回想要的 reply,因为 reply 指向的是新的 GetWorkReply 值在 rpc 服务器的指针。这部分客户端的内存和服务器的不一样,所以得到的是空值;下面的方法因为 reply 是共享内存的,所以服务器和客户端都可以访问 &reply。
//在 RPC 调用时,要避免传递指针参数,如果传递的参数是值类型,那么这个值将会被复制到另一个地址
*reply = GetWorkReply{Task{
StartTime: time.Now(),
Type: MAP,
Status: BUSY,
FileName: mapTask.FileName,
NReduce: c.NReduce,
WorkerId: args.WorkerId,
TaskId: mapTask.TaskId,
}} // equal to reply.Task = &Task{...}
|
- clash test,需要 coordinator 在 worker 宕机 10s 后为同一个任务重新分配 worker,这部分需要考虑一下注意一下 data race
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
go func(taskId int, filename string) {
select {
case <-time.After(12 * time.Second):
c.Mutex.Lock()
if !c.TaskDone[taskId] {
c.Mutex.Unlock()
dosomething
} else {
c.Mutex.Unlock()
dootherthing
}
// default:{
// }
// default 不能写,如果写了就会直接跳到 default,不会进入 12s 的 case
}
return
}(maptask.TaskId, mapTask.FileName)
|
- 并发编程细节
因为 MapReduce 的特性,map task 可以并发,reduce task 可以并发,但两种任务不能同时并发,所以需要等到 map task 跑完才可以进行 reduce task。如果同时读写同一个变量,需要加锁。其实还可以用 chan 实现 lock-free,以后试试,如果有时间的话。
- 随手写的 makefile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
# location: src/Makefile
ntest := 50
lab1:
@cd main;\
for i in $$(seq 1 $(ntest)) ; do\
bash test-mr.sh > res$$i;\
done;\
cnt=0;\
for i in $$(seq 1 $$(expr $(ntest) - 1)); do\
if cmp res$$i res$$(expr $$i + 1); then \
cnt=$$(expr $$cnt + 1); \
fi;\
done;\
echo "same file num:"$$(expr $$cnt + 1) ;\
if [ $$cnt -eq $$(expr $(ntest) - 1) ]; then \
echo "****************************************************************";\
echo "all res is the same!";\
echo "result content as follow.";\
echo "****************************************************************";\
cat main/res1;\
echo "****************************************************************";\
fi
|
- result
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
same file num:50
****************************************************************
all res is the same!
result content as follow.
****************************************************************
*** Starting wc test.
--- wc test: PASS
*** Starting indexer test.
--- indexer test: PASS
*** Starting map parallelism test.
--- map parallelism test: PASS
*** Starting reduce parallelism test.
--- reduce parallelism test: PASS
*** Starting job count test.
--- job count test: PASS
*** Starting early exit test.
--- early exit test: PASS
*** Starting crash test.
--- crash test: PASS
*** PASSED ALL TESTS
****************************************************************
|