# 6.824 Lab 1

[TOC]

PART1. Map/Reduce input and output

在test_test.go已经提供了TestSequentialSingle和TestSequentialMany,只需要填充相关的代码并运行如下命令:

1
env "GOPATH=$PWD/../../" go test -v -run Sequential

即可验证是否测试成功。其中test_test.go已经为你写好了MapFunc, ReduceFunc,需要做的就是完成Sequential函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// TestSequentialSingle runs the whole job in-process with a single
// input file and a single reduce task, then checks the merged output.
func TestSequentialSingle(t *testing.T) {
	mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
	mr.Wait()
	check(t, mr.files)
	checkWorker(t, mr.stats)
	cleanup(mr)
}

// TestSequentialMany is the multi-task variant: five input files are
// processed by three reduce tasks, still sequentially and in-process.
func TestSequentialMany(t *testing.T) {
	mr := Sequential("test", makeInputs(5), 3, MapFunc, ReduceFunc)
	mr.Wait()
	check(t, mr.files)
	checkWorker(t, mr.stats)
	cleanup(mr)
}

其中这里的MapFunc只是简单地把输入按空白分割成{word, ""}形式的KeyValue,ReduceFunc只是打印每个值并返回空字符串

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// MapFunc splits the input value into whitespace-separated words and
// emits one KeyValue{word, ""} pair per word.
func MapFunc(file string, value string) (res []KeyValue) {
	debug("Map %v\n", value)
	for _, word := range strings.Fields(value) {
		res = append(res, KeyValue{word, ""})
	}
	return
}

// ReduceFunc logs every value observed for key and returns the empty
// string (note: despite the original "return key" comment, it does not
// return the key — the sequential tests only inspect which keys appear).
func ReduceFunc(key string, values []string) string {
	for _, v := range values {
		debug("Reduce %s %v\n", key, v)
	}
	return ""
}

调用的Sequential方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Sequential runs the map tasks and then the reduce tasks one at a time,
// all in-process — useful for debugging doMap/doReduce without RPC.
func Sequential(jobName string, files []string, nreduce int,
	mapF func(string, string) []KeyValue,
	reduceF func(string, []string) string,
) (mr *Master) {
	mr = newMaster("master")
	go mr.run(jobName, files, nreduce, func(phase jobPhase) {
		switch phase {
		case mapPhase:
			// Every map task completes before any reduce task starts.
			for i, f := range mr.files {
				doMap(mr.jobName, i, f, mr.nReduce, mapF) // the function we must implement
			}
		case reducePhase:
			for i := 0; i < mr.nReduce; i++ {
				doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF) // the function we must implement
			}
		}
	}, func() {
		mr.stats = []int{len(files) + nreduce}
	})
	return
}

需要我们完善的代码

doMap的结构定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// doMap runs one map task: it reads inFile, applies mapF to its contents,
// and partitions the resulting key/value pairs into nReduce intermediate
// files (one per reduce task), selected by ihash(key) % nReduce.
func doMap(
	jobName string, // the name of the MapReduce job, e.g. "test"
	mapTask int, // index of this map task
	inFile string, // name of the input file to process
	nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
	mapF func(filename string, contents string) []KeyValue,
) {
	// 1. Read the whole input file into memory.
	data, err := ioutil.ReadFile(inFile)
	if err != nil {
		fmt.Println("File reading error", err)
		return
	}

	// 2. Let the user-supplied map function produce the key/value pairs.
	kvResult := mapF(inFile, string(data))

	// 3. Create the nReduce intermediate files and ONE JSON encoder per
	//    file. The original allocated a fresh json.NewEncoder for every
	//    key/value pair inside the loop below — wasteful and misleading.
	//    reduceName() is the framework helper that names intermediate files.
	intermediateFiles := make([]*os.File, nReduce)
	encoders := make([]*json.Encoder, nReduce)
	for i := 0; i < nReduce; i++ {
		name := reduceName(jobName, mapTask, i)
		intermediateFiles[i], err = os.Create(name)
		if err != nil {
			log.Fatal("create file failed", name)
		}
		encoders[i] = json.NewEncoder(intermediateFiles[i])
	}

	// 4. Partition: hash each key to pick its reduce bucket, then append
	//    the pair to that bucket's file as one JSON object per line.
	for _, kv := range kvResult {
		if err := encoders[ihash(kv.Key)%nReduce].Encode(&kv); err != nil {
			log.Fatal("encode kv failed", kv)
		}
	}

	// Remember to close the files after all the values have been written!
	for _, file := range intermediateFiles {
		file.Close()
	}
}

doReduce

方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// doReduce runs one reduce task: it reads this task's intermediate file
// from every map task, groups all values by key, sorts the keys, applies
// reduceF to each key's values, and writes the results to outFile as JSON.
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTask int, // index of this reduce task
	outFile string, // write the output here
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// Step 1: open every intermediate file for this reduce task and
	// collect all values for each key in memory.
	keyValues := make(map[string][]string)
	for i := 0; i < nMap; i++ {
		name := reduceName(jobName, i, reduceTask)
		file, err := os.Open(name)
		if err != nil {
			// Original message said "create ... failed" — but this is an Open.
			log.Fatal("open intermediate file failed", name)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break // io.EOF once the file is exhausted
			}
			keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
		}
		file.Close()
	}

	// Step 2: sort the keys so the output order is deterministic.
	keys := make([]string, 0, len(keyValues))
	for k := range keyValues {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// Step 3: create the output file, then call reduceF once per key and
	// encode each {key, reduced-value} pair as JSON.
	// (The original had the bare text "川江一个outFile" here — a stray,
	// uncommented note that would not compile.)
	out, err := os.Create(outFile)
	if err != nil {
		log.Fatal("failed to create outfile", outFile)
	}
	enc := json.NewEncoder(out)
	for _, k := range keys {
		v := reduceF(k, keyValues[k])
		if err := enc.Encode(KeyValue{k, v}); err != nil {
			log.Fatal("failed to encode", KeyValue{k, v})
		}
	}
	out.Close()
}

Part II: Single-worker word count

测试结果的命令

1
go run wc.go master sequential pg-*.txt

同样是调用以下函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// Sequential executes the job entirely in this process: first all map
// tasks in order, then all reduce tasks in order.
func Sequential(jobName string, files []string, nreduce int,
	mapF func(string, string) []KeyValue,
	reduceF func(string, []string) string,
) (mr *Master) {
	mr = newMaster("master")
	go mr.run(jobName, files, nreduce, func(phase jobPhase) {
		switch phase {
		case mapPhase:
			for i, f := range mr.files {
				doMap(mr.jobName, i, f, mr.nReduce, mapF) // the function we must implement
			}
		case reducePhase:
			for i := 0; i < mr.nReduce; i++ {
				doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF) // the function we must implement
			}
		}
	}, func() {
		mr.stats = []int{len(files) + nreduce}
	})
	return
}

要填充的mapF和reduceF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// mapF splits the file contents into words (maximal runs of letters),
// counts occurrences locally with a map, and emits one
// KeyValue{word, count} pair per distinct word. Pre-aggregating the
// counts (a local "combiner") shrinks the intermediate data compared
// with emitting a {word, "1"} pair per occurrence.
func mapF(filename string, contents string) []mapreduce.KeyValue {
	// A word is any maximal run of unicode letters.
	wordList := strings.FieldsFunc(contents, func(c rune) bool {
		return !unicode.IsLetter(c)
	})

	// Count each word with a map. (The original had the bare text
	// "用一个map进行统计" here — a stray, uncommented note that would
	// not compile.)
	counts := make(map[string]int)
	for _, word := range wordList {
		counts[word]++
	}

	res := make([]mapreduce.KeyValue, 0, len(counts))
	for w, n := range counts {
		res = append(res, mapreduce.KeyValue{w, strconv.Itoa(n)})
	}
	return res
}

//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//

//本来是reduceF(k,keyValues[k])
//直接循环统计就行了
func reduceF(key string, values []string) string {
total := 0
for _,cnt := range values {
cur,err := strconv.Atoi(cnt)
if err != nil {
log.Fatal("convert value to int failed",cnt)
}
total += cur
}
return strconv.Itoa(total)
}

Part III: Distributing MapReduce tasks

1
go test -run TestParallel

这个指令会调用mapreduce下的TestParallelBasic和TestParallelCheck

以TestParallelBasic为例子

1
2
3
4
5
6
7
8
9
10
11
12
13
// TestParallelBasic starts a distributed master plus two workers and
// verifies the job output and the per-worker task stats.
func TestParallelBasic(t *testing.T) {
	mr := setup() // start the master via Distributed()
	for i := 0; i < 2; i++ {
		// RunWorker connects to the master, registers its own address,
		// and waits to be scheduled.
		go RunWorker(mr.address, port("worker"+strconv.Itoa(i)),
			MapFunc, ReduceFunc, -1, nil)
	}
	mr.Wait()
	check(t, mr.files)
	checkWorker(t, mr.stats)
	cleanup(mr)
}

setup()的代码

1
2
3
4
5
6
7
8
9
// setup creates the input files and starts a distributed master
// listening on its RPC socket, returning it for the tests to drive.
func setup() *Master {
	// Generate the test input files.
	files := makeInputs(nMap)
	// Address for the master's RPC server.
	master := port("master")
	// Start the master via Distributed().
	mr := Distributed("test", files, nReduce, master)
	return mr
}

Distributed的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Distributed schedules map and reduce tasks on workers that register with the
// master over RPC.
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
mr = newMaster(master)
mr.startRPCServer()
go mr.run(jobName, files, nreduce,
func(phase jobPhase) {
ch := make(chan string)
go mr.forwardRegistrations(ch)
//这就是我们要实现的函数的功能
schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
},
func() {
mr.stats = mr.killWorkers()
mr.stopRPCServer()
})
return
}

然后构建大概思路,需要去监听registerChan上还没有可用的WORKER,如果TASK还有多,来一个就分配一个。

分配的时候,为了并发,需要单独开线程去发RPC,等回复。OK了之后,把TASK CNT -1,随后把这个WORKER重新放回registerChan

schedule代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
   wg := sync.WaitGroup{}
wg.Add(ntasks)
sendTaskToWorker := func(availWorker string,idx int) {
args := DoTaskArgs{jobName,mapFiles[idx],phase,idx,n_other}
call(addr,"Worker.DoTask",args,nil)
wg.Done()
registerChan <- availWorker
};
for i := 0; i < ntasks; i++ {
availWorker := <-registerChan
go sendTaskToWorker(availWorker,i)
}
wg.Wait()
fmt.Printf("Schedule: %v done\n", phase)

img

Part IV: Handling worker failures

其实就是WORKER会挂掉。挂掉的表现就是RPC响应超时。这个时候MASTER就需要找一个新的WORKER

那么其实我们就需要先拿到RPC的RESPONSE。如果是OK按照原来逻辑走。如果不OK,可能是网络超时,可能是节点挂了。
无论是啥我们都需要通知另一个线程,让他知道这个事情发生,他可以HANDLE.
从这点出发就会想到线程间通信在GO里是用CHANNEL来做。

那么我就设定一个TIMEOUT CH,如果一个任务失败了,我就通知这个CH 这个任务的ID号。

那么另外一个线程感知到了,就可以从这个CHANNEL里取到这个ID号,随后再找一个WORKER ADDR,把这个任务分配给这个WORKER去做。基于上述思路。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
wg := sync.WaitGroup{}
wg.Add(ntasks)
timeoutCh := make(chan int,ntasks)
sendTaskToWorker := func(addr string,idx int) {
args := DoTaskArgs{jobName,mapFiles[idx],phase,idx,n_other}
done := call(addr,"Worker.DoTask",args,nil)
if done {
wg.Done()
} else {
timeoutCh <- idx
}
registerChan <-addr
};
for i := 0; i < ntasks; i++ {
availWorker := <-registerChan
go sendTaskToWorker(availWorker,i)
}
go func() {
for {
idx := <-timeoutCh
availWorker := <-registerChan
go sendTaskToWorker(availWorker,idx)
}
}()
wg.Wait()
fmt.Printf("Schedule: %v done\n", phase)