[TOC]
PART1. Map/Reduce input and output
在test_test.go已经提供了TestSequentialSingle和TestSequentialMany,只需要填充相关的代码并运行如下命令:
1 | env "GOPATH=$PWD/../../" go test -v -run Sequential |
即可验证是否测试成功。其中test_test.go已经为你写好了MapFunc, ReduceFunc,需要做的就是完成Sequential函数。
1 | func TestSequentialSingle(t *testing.T) { |
其中这里的MapFunc只是简单地分割成KeyValue的方式,ReduceFunc只是return key
1 | // Split in words |
调用的Sequential方法:
1 | func Sequential(jobName string, files []string, nreduce int, |
需要我们完善的代码
doMap的结构定义如下:
1 | func doMap( |
doReduce
方法:
1 | func doReduce( |
Part II: Single-worker word count
测试结果的命令
1 | go run wc.go master sequential pg-*.txt |
同样是调用以下函数
1 | func Sequential(jobName string, files []string, nreduce int, |
要填充的mapF和reduceF
1 | func mapF(filename string, contents string) []mapreduce.KeyValue { |
Part III: Distributing MapReduce tasks
1 | go test -run TestParallel |
这个指令会调用mapreduce下的TestParallelBasic和TestParallelCheck
以TestParallerBasic为例子
1 | func TestParallelBasic(t *testing.T) { |
setup()的代码
1 | func setup() *Master { |
Distribued的代码
1 | // Distributed schedules map and reduce tasks on workers that register with the |
然后构建大概思路,需要去监听registerChan上还没有可用的WORKER,如果TASK还有多,来一个就分配一个。
分配的时候,为了并发,需要单独开线程去发RPC,等回复。OK了之后,把TASK CNT -1,随后把这个WORKER重新放回registerChan
schedule代码实现
1 | wg := sync.WaitGroup{} |
Part IV: Handling worker failures
其实就是WORKER会挂掉。挂掉的表现就是RPC响应超时。这个时候MASTER就需要找一个新的WORKER
那么其实我们就需要先拿到RPC的RESPONSE。如果是OK按照原来逻辑走。如果不OK,可能是网络超时,可能是节点挂了。
无论是啥我们都需要通知另一个线程,让他知道这个事情发生,他可以HANDLE.
从这点出发就会想到线程间通信在GO里是用CHANNEL来做。
那么我就设定一个TIMEOUT CH,如果一个任务失败了,我就通知这个CH 这个任务的ID号。
那么另外一个线程感知到了,就可以从这个CHANNEL里取到这个ID号,随后再找一个WORKER ADDR,把这个任务分配给这个WORKER去做。基于上述思路。
1 | wg := sync.WaitGroup{} |