参考MapReduce的单进程串行执行,实现单机多进程的版本
mr-01.PNG
单机多进程版本:
Coordinator需要有以下功能:
1、在启动时为每个输入文件生成一个MapTask;所有MapTask完成后,再按指定的Reduce Task数生成ReduceTask
2、响应Worker的Task申请RPC请求,分配可用的Task给到Worker
3、跟踪Task的完成情况:在所有MapTask完成后进入Reduce阶段,开始派发ReduceTask;在所有ReduceTask完成后标记整个任务已经完成并退出
4、在Task超时(分配后超过10秒)仍未完成时,回收该Task并重新分配给Worker执行
Worker需要有以下功能:
1、在空闲时,通过RPC向Coordinator申请Task并执行
2、执行完后,通知Coordinator。
代码实现
RPC
// RPC message types exchanged between the coordinator and the workers.
type (
	// Task is the coordinator's bookkeeping record for one map or reduce task.
	Task struct {
		FileName  string    // input file; only set for map tasks
		Index     int       // task number within its phase
		WorkerID  string    // worker currently holding the task, "" when unassigned
		StartTime time.Time // when the task was last handed to a worker
	}

	// ExampleArgs is the argument of the sample Coordinator.Example RPC.
	ExampleArgs struct {
		X int
	}

	// ExampleReply is the reply of the sample Coordinator.Example RPC.
	ExampleReply struct {
		Y int
	}

	// WorkerArgs is sent by a worker when it asks for a task or reports one
	// as finished.
	WorkerArgs struct {
		WorkerID  string
		TaskIndex int // index of the finished task (completion reports only)
	}

	// WorkerReply describes the task handed to a worker.
	WorkerReply struct {
		TaskIndex int
		FileName  string // input file of a map task
		TaskType  int    // 0 map, 1 reduce, 2 whole job finished
		NReduce   int
		NMap      int
	}
)
Coordinator
// Coordinator tracks every map and reduce task of one MapReduce job.
//
// The RPC handlers and the recycle loop take lock while they update phase
// and tasks. mapTasks and reduceTasks are the queues of tasks waiting to be
// handed to a worker.
type Coordinator struct {
	nReduce int // number of reduce tasks
	nMap    int // number of map tasks (one per input file)
	phase   int // current job phase: 0 map, 1 reduce, 2 done

	lock        sync.Mutex
	mapTasks    chan Task       // map tasks ready to be assigned
	reduceTasks chan Task       // reduce tasks ready to be assigned
	tasks       map[string]Task // unfinished tasks, keyed by GenTaskID(phase, index)
}
// Your code here -- RPC handlers for the worker to call.

// Example is the sample RPC handler: it answers with args.X + 1.
//
// The RPC argument and reply types are defined in rpc.go.
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
	y := args.X
	y++
	reply.Y = y
	return nil
}
// ReceiveFinishedMap is the RPC a worker calls after finishing map task
// args.TaskIndex.
//
// When the last outstanding map task is reported, it closes the map queue,
// switches the job to the reduce phase and queues the nReduce reduce tasks.
// Reports that arrive after the map phase has ended (for example from a
// worker whose task timed out and was completed by another worker) are
// ignored.
func (c *Coordinator) ReceiveFinishedMap(args *WorkerArgs, reply *WorkerReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	// Without this guard a late report would delete reduce task
	// "1-<index>" and, once tasks is empty, close mapTasks a second time,
	// which panics.
	if c.phase != 0 {
		return nil
	}
	delete(c.tasks, GenTaskID(0, args.TaskIndex))
	if len(c.tasks) > 0 {
		return nil
	}
	log.Printf("All map tasks is finished")
	close(c.mapTasks)
	c.phase = 1
	// Generate the reduce tasks; reduceTasks is created with capacity
	// nReduce, so these sends do not block while the lock is held.
	for i := 0; i < c.nReduce; i++ {
		task := Task{Index: i}
		c.tasks[GenTaskID(c.phase, task.Index)] = task
		c.reduceTasks <- task
	}
	return nil
}
// ReceiveFinishedReduce is the RPC a worker calls after finishing reduce
// task args.TaskIndex.
//
// When the last outstanding reduce task is reported, it closes the reduce
// queue and marks the job as done (phase 2). Reports that arrive outside the
// reduce phase, e.g. a duplicate from a worker whose task had timed out and
// was completed by someone else, are ignored.
func (c *Coordinator) ReceiveFinishedReduce(args *WorkerArgs, reply *WorkerReply) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	// After phase 2 tasks is empty, so without this guard a duplicate
	// report would close reduceTasks a second time and panic.
	if c.phase != 1 {
		return nil
	}
	delete(c.tasks, GenTaskID(1, args.TaskIndex))
	if len(c.tasks) == 0 {
		log.Printf("All reduce tasks is finished")
		close(c.reduceTasks)
		c.phase = 2
	}
	return nil
}
// ApplyForTask hands the next task to an idle worker.
//
// reply.TaskType is the phase the task belongs to: 0 for a map task
// (reply.FileName is its input file), 1 for a reduce task, and 2 once the
// whole job has finished. While every task of the current phase has been
// handed out but not finished, the call blocks until one is re-queued or
// the phase changes.
func (c *Coordinator) ApplyForTask(args *WorkerArgs, reply *WorkerReply) error {
	for {
		// phase is written under c.lock by the finish handlers; read it
		// under the same lock.
		c.lock.Lock()
		phase := c.phase
		c.lock.Unlock()

		var queue chan Task
		switch phase {
		case 0:
			queue = c.mapTasks
		case 1:
			queue = c.reduceTasks
		default:
			log.Printf("task is completed")
			reply.TaskType = 2
			return nil
		}

		// Wait without holding the lock: the finish handlers need it to
		// close this queue when the phase ends.
		task, ok := <-queue
		if !ok {
			// The phase ended while we were waiting. Re-evaluate instead of
			// returning an empty reply, which a worker would take for a map
			// task with no input file.
			continue
		}

		c.lock.Lock()
		id := GenTaskID(phase, task.Index)
		if _, pending := c.tasks[id]; !pending || c.phase != phase {
			// A stale copy of a task that was re-queued after a timeout and
			// has since been finished by its original worker; skip it.
			c.lock.Unlock()
			continue
		}
		task.WorkerID = args.WorkerID
		task.StartTime = time.Now()
		c.tasks[id] = task
		reply.TaskType = phase
		reply.TaskIndex = task.Index
		reply.FileName = task.FileName
		reply.NReduce = c.nReduce
		reply.NMap = c.nMap
		c.lock.Unlock()
		return nil
	}
}
// server registers c as an RPC service over HTTP and serves it, in a new
// goroutine, on the Unix socket path returned by coordinatorSock. Any file
// already at that path is removed first; a listen failure is fatal.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()

	//l, e := net.Listen("tcp", ":1234")
	sock := coordinatorSock()
	os.Remove(sock)
	listener, err := net.Listen("unix", sock)
	if err != nil {
		log.Fatal("listen error:", err)
	}
	go http.Serve(listener, nil)
}
// Done reports whether the entire job has finished, i.e. every reduce task
// has been reported complete. main/mrcoordinator.go calls it periodically.
func (c *Coordinator) Done() bool {
	// phase is written by the RPC handlers while they hold c.lock; reading
	// it without the lock is a data race.
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.phase == 2
}
// MakeCoordinator creates the Coordinator; main/mrcoordinator.go calls it.
//
// It queues one map task per input file, starts the RPC server and starts
// the goroutine that re-queues timed-out tasks. nReduce is the number of
// reduce tasks to use; the reduce tasks themselves are generated once every
// map task has been reported finished.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{
		phase:       0,
		nMap:        len(files),
		nReduce:     nReduce,
		mapTasks:    make(chan Task, len(files)),
		reduceTasks: make(chan Task, nReduce),
		tasks:       make(map[string]Task),
	}
	for idx := range files {
		mapTask := Task{Index: idx, FileName: files[idx]}
		c.tasks[GenTaskID(0, idx)] = mapTask
		c.mapTasks <- mapTask
	}
	c.server()
	go c.recycle()
	return c
}
// recycle runs in its own goroutine. Every 3 seconds it re-queues each
// task that was handed to a worker more than 10 seconds ago and has not
// been reported finished, so another worker can pick it up. It returns once
// the job is done (phase 2).
func (c *Coordinator) recycle() {
	for {
		time.Sleep(3 * time.Second)
		c.lock.Lock()
		if c.phase == 2 {
			c.lock.Unlock()
			return
		}
		c.requeueTimedOutTasks(time.Now())
		c.lock.Unlock()
	}
}

// requeueTimedOutTasks puts every task whose worker has held it for more
// than 10 seconds (as of now) back onto the queue of the current phase and
// marks it unassigned. The caller must hold c.lock.
func (c *Coordinator) requeueTimedOutTasks(now time.Time) {
	for id, task := range c.tasks {
		if task.WorkerID == "" || !now.After(task.StartTime.Add(10*time.Second)) {
			continue
		}
		// task is a copy: write the reset back into the map, otherwise the
		// same task would be pushed onto the queue again on every pass.
		task.WorkerID = ""
		c.tasks[id] = task
		switch c.phase {
		case 0:
			c.mapTasks <- task
		case 1:
			c.reduceTasks <- task
		}
	}
}
// GenTaskID builds the key of a task in Coordinator.tasks as "<phase>-<index>".
func GenTaskID(phase int, index int) string {
	return fmt.Sprint(phase) + "-" + fmt.Sprint(index)
}
Worker
// KeyValue is one key/value pair emitted by the map function.
type KeyValue struct {
	Key   string
	Value string
}

// ByKey orders a slice of KeyValue by Key; it implements sort.Interface.
type ByKey []KeyValue

func (kvs ByKey) Len() int {
	return len(kvs)
}

func (kvs ByKey) Swap(i, j int) {
	kvs[i], kvs[j] = kvs[j], kvs[i]
}

func (kvs ByKey) Less(i, j int) bool {
	return kvs[i].Key < kvs[j].Key
}
// ihash returns the 32-bit FNV-1a hash of key with the top bit cleared, so
// the result is never negative. Use ihash(key) % NReduce to choose the
// reduce task for each KeyValue emitted by Map.
func ihash(key string) int {
	const clearTopBit = 0x7fffffff
	hasher := fnv.New32a()
	hasher.Write([]byte(key))
	sum := hasher.Sum32()
	return int(sum & clearTopBit)
}
// Worker is the worker process main loop; main/mrworker.go calls it.
//
// It repeatedly asks the coordinator for a task via Coordinator.ApplyForTask,
// runs it with mapf (TaskType 0) or reducef (TaskType 1) and reports it
// finished. It stops once the coordinator replies TaskType 2 (job finished)
// or an RPC fails; any other TaskType is retried after a short pause.
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	// uncomment to send the Example RPC to the coordinator.
	// CallExample()

	// Everything runs on one machine, so the PID serves as the WorkerID.
	id := strconv.Itoa(os.Getpid())
	for {
		args := WorkerArgs{WorkerID: id}
		reply := WorkerReply{}
		ok := call("Coordinator.ApplyForTask", &args, &reply)
		if !ok || reply.TaskType == 2 {
			break
		}
		switch reply.TaskType {
		case 0:
			doMapTask(mapf, &reply)
			finishedArgs := WorkerArgs{WorkerID: id, TaskIndex: reply.TaskIndex}
			// tell the coordinator this map task is finished
			call("Coordinator.ReceiveFinishedMap", &finishedArgs, &WorkerReply{})
		case 1:
			doReduceTask(reducef, &reply)
			finishedArgs := WorkerArgs{WorkerID: id, TaskIndex: reply.TaskIndex}
			call("Coordinator.ReceiveFinishedReduce", &finishedArgs, &WorkerReply{})
		}
		time.Sleep(time.Second)
	}
}

// doMapTask runs mapf over the input file reply.FileName and writes its
// output into reply.NReduce intermediate files mr-<TaskIndex>-<r>; file r
// holds, JSON-encoded, the pairs whose ihash(Key) % NReduce == r. A file is
// written for every r, even when it receives no pairs.
func doMapTask(mapf func(string, string) []KeyValue, reply *WorkerReply) {
	content, err := ioutil.ReadFile(reply.FileName)
	if err != nil {
		log.Fatalf("cannot read %v: %v", reply.FileName, err)
	}
	buckets := make([][]KeyValue, reply.NReduce)
	for _, kv := range mapf(reply.FileName, string(content)) {
		r := ihash(kv.Key) % reply.NReduce
		buckets[r] = append(buckets[r], kv)
	}
	for r, bucket := range buckets {
		writeFileAtomically(intermediateName(reply.TaskIndex, r), func(f *os.File) error {
			enc := json.NewEncoder(f)
			for i := range bucket {
				if err := enc.Encode(&bucket[i]); err != nil {
					return err
				}
			}
			return nil
		})
	}
}

// doReduceTask reads intermediate file mr-<m>-<TaskIndex> of every map task
// m, groups the pairs by key, applies reducef to each group and writes one
// "key value" line per key to mr-out-<TaskIndex>. It then removes the
// intermediate files it consumed.
func doReduceTask(reducef func(string, []string) string, reply *WorkerReply) {
	var intermediate []KeyValue
	for m := 0; m < reply.NMap; m++ {
		iname := intermediateName(m, reply.TaskIndex)
		file, err := os.Open(iname)
		if err != nil {
			log.Fatalf("cannot open %v: %v", iname, err)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			intermediate = append(intermediate, kv)
		}
		file.Close()
	}
	sort.Sort(ByKey(intermediate))

	// The lab expects the output of reduce task X in mr-out-X.
	oname := "mr-out-" + strconv.Itoa(reply.TaskIndex)
	writeFileAtomically(oname, func(f *os.File) error {
		for i := 0; i < len(intermediate); {
			j := i + 1
			for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
				j++
			}
			values := make([]string, 0, j-i)
			for _, kv := range intermediate[i:j] {
				values = append(values, kv.Value)
			}
			output := reducef(intermediate[i].Key, values)
			if _, err := fmt.Fprintf(f, "%v %v\n", intermediate[i].Key, output); err != nil {
				return err
			}
			i = j
		}
		return nil
	})

	// The output is already committed, so cleanup is best effort: a failure
	// here (e.g. a re-executed task already removed the files) should not
	// kill the worker.
	for m := 0; m < reply.NMap; m++ {
		iname := intermediateName(m, reply.TaskIndex)
		if err := os.Remove(iname); err != nil {
			log.Printf("cannot delete %v: %v", iname, err)
		}
	}
}

// intermediateName is the file holding map task m's output for reduce task r.
func intermediateName(m, r int) string {
	return "mr-" + strconv.Itoa(m) + "-" + strconv.Itoa(r)
}

// writeFileAtomically creates name by passing a temporary file to write,
// closing it and renaming it into place, so no reader ever sees a partially
// written file (e.g. after a worker crash). Any failure is fatal.
func writeFileAtomically(name string, write func(f *os.File) error) {
	// Create the temp file in the working directory: os.Rename cannot move
	// a file between file systems, and the default temp dir may be on a
	// different one. The "tmp-" prefix keeps leftovers from matching mr-*.
	tmp, err := ioutil.TempFile(".", "tmp-"+name+"-*")
	if err != nil {
		log.Fatalf("cannot create temp file for %v: %v", name, err)
	}
	if err := write(tmp); err != nil {
		tmp.Close()
		os.Remove(tmp.Name())
		log.Fatalf("cannot write into %v: %v", name, err)
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		log.Fatalf("cannot close temp file for %v: %v", name, err)
	}
	if err := os.Rename(tmp.Name(), name); err != nil {
		os.Remove(tmp.Name())
		log.Fatalf("cannot rename %v to %v: %v", tmp.Name(), name, err)
	}
}
// CallExample shows how to make an RPC call to the coordinator: it sends
// X = 99 to the Example method of the Coordinator struct on the receiving
// server and prints the reply.
//
// The RPC argument and reply types are defined in rpc.go.
func CallExample() {
	args := ExampleArgs{X: 99}
	var reply ExampleReply

	// "Coordinator.Example" names the Example method of struct Coordinator.
	if !call("Coordinator.Example", &args, &reply) {
		fmt.Printf("call failed!\n")
		return
	}
	// reply.Y should be 100.
	fmt.Printf("reply.Y %v\n", reply.Y)
}
// call sends an RPC request to the coordinator and waits for the response.
//
// It returns true when the call succeeds, and false (after printing the
// error) when the call itself fails. Failing to dial the coordinator's Unix
// socket is fatal.
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	client, err := rpc.DialHTTP("unix", coordinatorSock())
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer client.Close()

	if err = client.Call(rpcname, args, reply); err != nil {
		fmt.Println(err)
		return false
	}
	return true
}
测试结果:
mr-02.PNG