MIT 6.5840 LAB1-MapReduce

Starting from the single-process, sequential reference implementation of MapReduce, build a single-machine, multi-process version.


(figure: mr-01.PNG)

The single-machine, multi-process version:
The Coordinator needs to:
1. On startup, create the MapTasks and ReduceTasks from the given input files and the requested number of reduce tasks.
2. Answer Workers' task-request RPCs and hand out available Tasks.
3. Track task completion: once all MapTasks are done, enter the Reduce phase and start handing out ReduceTasks; once all ReduceTasks are done, mark the job finished and exit.
4. When a Task has not completed within the timeout, re-assign it to another Worker.

The Worker needs to:
1. When idle, request a Task from the Coordinator via RPC and execute it.
2. When the Task is done, notify the Coordinator. (The numeric phase/TaskType codes used throughout are sketched right after this list.)
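
Throughout the implementation, the coordinator phase and the RPC TaskType share one numeric encoding: 0 = Map, 1 = Reduce, 2 = Done. Purely as a reading aid, here is that encoding written out as named constants (the names are an assumption; the actual code below keeps the raw ints):

// Hypothetical names for the numeric phase / TaskType codes used below.
const (
    phaseMap    = 0 // map tasks are being handed out
    phaseReduce = 1 // all map tasks finished; reduce tasks are being handed out
    phaseDone   = 2 // all reduce tasks finished; workers may exit
)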

Code implementation
RPC

type Task struct {
    FileName  string    // input file name (map tasks only)
    Index     int       // task index within its phase
    WorkerID  string    // worker the task is assigned to, "" if unassigned
    StartTime time.Time // when the task was handed out, used for the timeout check
}

type ExampleArgs struct {
    X int
}

type ExampleReply struct {
    Y int
}

// Add your RPC definitions here.
type WorkerArgs struct {
    WorkerID  string // worker identity (the worker's PID)
    TaskIndex int    // index of the task being reported as finished
}

type WorkerReply struct {
    TaskIndex int    // index of the assigned task
    FileName  string // input file name (map tasks only)
    TaskType  int    // 0 = map, 1 = reduce, 2 = done
    NReduce   int    // total number of reduce tasks
    NMap      int    // total number of map tasks
}

Coordinator

type Coordinator struct {
    // Your definitions here.
    nReduce     int // number of reduce tasks
    nMap        int // number of map tasks
    phase       int // current phase: 0 = Map, 1 = Reduce, 2 = Done
    lock        sync.Mutex
    mapTasks    chan Task       // map tasks waiting to be assigned
    reduceTasks chan Task       // reduce tasks waiting to be assigned
    tasks       map[string]Task // unfinished tasks, keyed by GenTaskID(phase, index)
}

// Your code here -- RPC handlers for the worker to call.

//
// an example RPC handler.
//
// the RPC argument and reply types are defined in rpc.go.
//
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
    reply.Y = args.X + 1
    return nil
}

func (c *Coordinator) ReceiveFinishedMap(args *WorkerArgs, reply *WorkerReply) error {
    c.lock.Lock()
    delete(c.tasks, GenTaskID(c.phase, args.TaskIndex))
    if len(c.tasks) == 0 {
        log.Printf("All map tasks is finished")
        close(c.mapTasks)
        c.phase = 1
        // 生成Reduce Task
        for i := 0; i < c.nReduce; i++ {
            task := Task {
                Index: i,
            }
            c.tasks[GenTaskID(c.phase, task.Index)] = task
            c.reduceTasks <- task
        }
    }
    c.lock.Unlock()
    return nil
}

func (c *Coordinator) ReceiveFinishedReduce(args *WorkerArgs, reply *WorkerReply) error {
    c.lock.Lock()
    delete(c.tasks, GenTaskID(c.phase, args.TaskIndex))
    if len(c.tasks) == 0 {
        log.Printf("All reduce tasks is finished")
        close(c.reduceTasks)
        c.phase = 2
    }
    c.lock.Unlock()
    return nil
}

func (c *Coordinator) ApplyForTask(args *WorkerArgs, reply *WorkerReply) error {
    if c.phase == 0 {
        task, ok := <- c.mapTasks
        if !ok {
            return nil
        }

        c.lock.Lock()
        defer c.lock.Unlock()
        // log.Printf("Allocate map %d task %d to worker %s\n", c.phase, task.Index, args.WorkerID)
        task.WorkerID = args.WorkerID
        task.StartTime = time.Now()
        c.tasks[GenTaskID(c.phase, task.Index)] = task
        reply.TaskType = c.phase
        reply.TaskIndex = task.Index
        reply.FileName = task.FileName
        reply.NReduce = c.nReduce
        reply.NMap = c.nMap
        return nil
    }

    if c.phase == 1 {
        task, ok := <- c.reduceTasks
        if !ok {
            return nil
        }
        c.lock.Lock()
        defer c.lock.Unlock()
        // log.Printf("Allocate reduce %d task %d to worker %s\n", c.phase, task.Index, args.WorkerID)
        task.WorkerID = args.WorkerID
        task.StartTime = time.Now()
        c.tasks[GenTaskID(c.phase, task.Index)] = task
        reply.TaskType = c.phase
        reply.TaskIndex = task.Index
        reply.NReduce = c.nReduce
        reply.NMap = c.nMap
        return nil
    }

    if c.phase == 2 {
        log.Printf("task is completed")
        c.lock.Lock()
        defer c.lock.Unlock()
        reply.TaskType = c.phase
    }

    return nil
}
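
One detail worth noting: ApplyForTask branches on c.phase before taking the lock, while the two finish handlers update the phase under the lock, so the unsynchronized read is technically a data race that Go's race detector can flag even though the tests pass here. A minimal sketch of snapshotting the phase under the lock first (an alternative arrangement, not what the code above does):

    c.lock.Lock()
    phase := c.phase
    c.lock.Unlock()

    if phase == 0 {
        // ... pull a task from c.mapTasks and fill the reply, as above ...
    } else if phase == 1 {
        // ... pull a task from c.reduceTasks and fill the reply, as above ...
    }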


//
// start a thread that listens for RPCs from worker.go
//
func (c *Coordinator) server() {
    rpc.Register(c)
    rpc.HandleHTTP()
    //l, e := net.Listen("tcp", ":1234")
    sockname := coordinatorSock()
    os.Remove(sockname)
    l, e := net.Listen("unix", sockname)
    if e != nil {
        log.Fatal("listen error:", e)
    }

    go http.Serve(l, nil)
}

//
// main/mrcoordinator.go calls Done() periodically to find out
// if the entire job has finished.
//
func (c *Coordinator) Done() bool {
    // Your code here.
    return c.phase == 2
}

//
// create a Coordinator.
// main/mrcoordinator.go calls this function.
// nReduce is the number of reduce tasks to use.
//
func MakeCoordinator(files []string, nReduce int) *Coordinator {
    c := Coordinator{
        phase: 0,
        nMap: len(files),
        nReduce: nReduce,
        mapTasks: make(chan Task, len(files)),
        reduceTasks: make(chan Task, nReduce),
        tasks: make(map[string]Task),
    }

    // Your code here.
    for i, file := range files {
        task := Task{
            FileName: file,
            Index: i,
        }
        c.tasks[GenTaskID(0, i)] = task
        c.mapTasks <- task
    }

    c.server()

    go c.recycle()
    return &c
}
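
For context, the coordinator is driven by main/mrcoordinator.go from the lab skeleton, which (roughly, as a sketch rather than the exact file) builds a coordinator over the input files and then polls Done() once a second until the job finishes:

m := MakeCoordinator(os.Args[1:], 10)
for m.Done() == false {
    time.Sleep(time.Second)
}
time.Sleep(time.Second)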

func (c *Coordinator) recycle() {
    for {
        time.Sleep(3 * time.Second)
        c.lock.Lock()
        for key, task := range c.tasks {
            if task.WorkerID != "" && time.Now().After(task.StartTime.Add(10 * time.Second)) {
                // reclaim the task and put it back on the queue so it can be re-assigned
                task.WorkerID = ""
                // write the reset task back; otherwise the same task would be
                // queued again on every sweep until some worker picks it up
                c.tasks[key] = task
                if c.phase == 0 {
                    c.mapTasks <- task
                } else if c.phase == 1 {
                    c.reduceTasks <- task
                }
            }
        }
        c.lock.Unlock()
    }
}

func GenTaskID(phase int, index int) string {
    return fmt.Sprintf("%d-%d", phase, index)
}
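
The key is just the phase and the index glued together, so a map task and a reduce task with the same index never collide in the tasks map. For example:

// GenTaskID(0, 3) == "0-3"   // map task 3
// GenTaskID(1, 3) == "1-3"   // reduce task 3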

Worker

type KeyValue struct {
    Key   string
    Value string
}

type ByKey []KeyValue

// for sorting by key.
func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() & 0x7fffffff)
}
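
Together with the file-naming convention used below, this hash decides which intermediate file a key/value pair lands in: map task X writes the pair into mr-X-Y with Y = ihash(key) % NReduce, and reduce task Y later reads mr-0-Y through mr-(NMap-1)-Y. A minimal sketch, where reply is the WorkerReply for the current map task:

// Pick the reduce bucket, and therefore the intermediate file, for one pair.
y := ihash(kv.Key) % reply.NReduce
iname := "mr-" + strconv.Itoa(reply.TaskIndex) + "-" + strconv.Itoa(y)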

//
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {

    // Your worker implementation here.

    // uncomment to send the Example RPC to the coordinator.
    // CallExample()
    // Single machine: simply use the PID as the WorkerID.
    id := strconv.Itoa(os.Getpid())
    for {
        args := WorkerArgs {
            WorkerID: id,
        }
        reply := WorkerReply{}
        ok := call("Coordinator.ApplyForTask", &args, &reply)
        if !ok || reply.TaskType == 2 {
            break
        }

        if reply.TaskType == 0 {
            // map task
            intermediate := []KeyValue{}

            // open and read the file
            file, err := os.Open(reply.FileName)
            if err != nil {
                log.Fatalf("cannot open %v", reply.FileName)
            }
            content, err := ioutil.ReadAll(file)
            if err != nil {
                log.Fatalf("cannot read %v", reply.FileName)
            }
            file.Close()

            // call mapf
            kva := mapf(reply.FileName, string(content))
            intermediate = append(intermediate, kva...)

            // hash into buckets
            buckets := make([][]KeyValue, reply.NReduce)
            for i := range buckets {
                buckets[i] = []KeyValue{}
            }
            for _, kva := range intermediate {
                buckets[ihash(kva.Key) % reply.NReduce] = append(buckets[ihash(kva.Key) % reply.NReduce], kva)
            }

            // write each bucket into a temp file, then atomically rename it to mr-X-Y
            for i := range buckets {
                oname := "mr-" + strconv.Itoa(reply.TaskIndex) + "-" + strconv.Itoa(i)
                ofile, err := ioutil.TempFile("", oname+"*")
                if err != nil {
                    log.Fatalf("cannot create temp file for %v", oname)
                }
                enc := json.NewEncoder(ofile)
                for _, kva := range buckets[i] {
                    err := enc.Encode(&kva)
                    if err != nil {
                        log.Fatalf("cannot write into %v", oname)
                    }
                }

                // close before renaming, so the file is complete when it becomes visible
                ofile.Close()
                os.Rename(ofile.Name(), oname)
            }

            finishedArgs := WorkerArgs {
                WorkerID: id,
                TaskIndex: reply.TaskIndex,
            }
            finishedReply := WorkerReply {
            }
            // call master to send the finish message
            call("Coordinator.ReceiveFinishedMap", &finishedArgs, &finishedReply)
        } else if reply.TaskType == 1 {
            // reduce task
            intermediate := []KeyValue{}
            for i := 0; i < reply.NMap; i++ {
                iname := "mr-" + strconv.Itoa(i) + "-" + strconv.Itoa(reply.TaskIndex)
                file, err := os.Open(iname)
                if err != nil {
                    log.Fatalf("cannot open %v", file)
                }

                dec := json.NewDecoder(file)
                for {
                    var kv KeyValue
                    if err := dec.Decode(&kv); err != nil {
                        break
                    }
                    intermediate = append(intermediate, kv)
                }
                file.Close()
            }
            sort.Sort(ByKey(intermediate))

            // output file: write to a temp file, then atomically rename it to mr-out-X
            oname := "mr-out-" + strconv.Itoa(reply.TaskIndex)
            ofile, err := ioutil.TempFile("", oname+"*")
            if err != nil {
                log.Fatalf("cannot create temp file for %v", oname)
            }

            i := 0
            for i < len(intermediate) {
                j := i + 1
                for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
                    j++
                }
                values := []string{}
                for k := i; k < j; k++ {
                    values = append(values, intermediate[k].Value)
                }
                output := reducef(intermediate[i].Key, values)

                fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
                i = j
            }

            // close before renaming, so the output is complete when it becomes visible
            ofile.Close()
            os.Rename(ofile.Name(), oname)

            for i := 0; i < reply.NMap; i++ {
                iname := "mr-" + strconv.Itoa(i) + "-" + strconv.Itoa(reply.TaskIndex)
                err := os.Remove(iname)
                if err != nil {
                    log.Fatalf("cannot open delete" + iname)
                }
            }

            finishedArgs := WorkerArgs {
                TaskIndex: reply.TaskIndex,
            }
            finishedReply := WorkerReply {
            }
            call("Coordinator.ReceiveFinishedReduce", &finishedArgs, &finishedReply)
        }
        time.Sleep(time.Second)
    }
    return
}
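
Both the map and reduce branches use the same write-to-a-temp-file-then-rename pattern, so a crashed or slow worker never leaves a half-written mr-X-Y or mr-out-X visible to other processes. The pattern in isolation (a sketch; the file names are placeholders, and note that os.Rename only works when the temp file is on the same filesystem as the target, so creating it in the current directory is the safer choice):

// Write everything to a temp file, close it, then rename it into place.
tmp, err := ioutil.TempFile(".", "mr-tmp-*")
if err != nil {
    log.Fatal(err)
}
fmt.Fprintf(tmp, "fully written contents\n")
tmp.Close()
os.Rename(tmp.Name(), "mr-final-name")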

//
// example function to show how to make an RPC call to the coordinator.
//
// the RPC argument and reply types are defined in rpc.go.
//
func CallExample() {

    // declare an argument structure.
    args := ExampleArgs{}

    // fill in the argument(s).
    args.X = 99

    // declare a reply structure.
    reply := ExampleReply{}

    // send the RPC request, wait for the reply.
    // the "Coordinator.Example" tells the
    // receiving server that we'd like to call
    // the Example() method of struct Coordinator.
    ok := call("Coordinator.Example", &args, &reply)
    if ok {
        // reply.Y should be 100.
        fmt.Printf("reply.Y %v\n", reply.Y)
    } else {
        fmt.Printf("call failed!\n")
    }
}

//
// send an RPC request to the coordinator, wait for the response.
// usually returns true.
// returns false if something goes wrong.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
    // c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
    sockname := coordinatorSock()
    c, err := rpc.DialHTTP("unix", sockname)
    if err != nil {
        log.Fatal("dialing:", err)
    }
    defer c.Close()

    err = c.Call(rpcname, args, reply)
    if err == nil {
        return true
    }

    fmt.Println(err)
    return false
}

Test results:


(figure: mr-02.PNG)