6.824: MapReduce

A hands-on exercise with MapReduce.

Part I

common_map.go

package mapreduce

import (
    "encoding/json"
    "hash/fnv"
    "io/ioutil"
    "log"
    "os"
)

func doMap(
    jobName string, // the name of the MapReduce job
    mapTask int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
    mapF func(filename string, contents string) []KeyValue,
) {
    //
    // doMap manages one map task: it should read one of the input files
    // (inFile), call the user-defined map function (mapF) for that file's
    // contents, and partition mapF's output into nReduce intermediate files.
    //
    // There is one intermediate file per reduce task. The file name
    // includes both the map task number and the reduce task number. Use
    // the filename generated by reduceName(jobName, mapTask, r)
    // as the intermediate file for reduce task r. Call ihash() (see
    // below) on each key, mod nReduce, to pick r for a key/value pair.
    //
    // mapF() is the map function provided by the application. The first
    // argument should be the input file name, though the map function
    // typically ignores it. The second argument should be the entire
    // input file contents. mapF() returns a slice containing the
    // key/value pairs for reduce; see common.go for the definition of
    // KeyValue.
    //
    // Look at Go's ioutil and os packages for functions to read
    // and write files.
    //
    // Coming up with a scheme for how to format the key/value pairs on
    // disk can be tricky, especially when taking into account that both
    // keys and values could contain newlines, quotes, and any other
    // character you can think of.
    //
    // One format often used for serializing data to a byte stream that the
    // other end can correctly reconstruct is JSON. You are not required to
    // use JSON, but as the output of the reduce tasks *must* be JSON,
    // familiarizing yourself with it here may prove useful. You can write
    // out a data structure as a JSON string to a file using the commented
    // code below. The corresponding decoding functions can be found in
    // common_reduce.go.
    //
    //   enc := json.NewEncoder(file)
    //   for _, kv := ... {
    //     err := enc.Encode(&kv)
    //
    // Remember to close the file after you have written all the values!
    //
    // Your code here (Part I).
    //

    // Read the whole input file. ioutil.ReadFile handles open, a full
    // read, and close in one call; a single File.Read is not guaranteed
    // to fill the buffer even when the file is large enough.
    inputFileContent, err := ioutil.ReadFile(inFile)
    if err != nil {
        log.Fatal("doMap: read input file: ", inFile, " error: ", err)
    }

    keyValues := mapF(inFile, string(inputFileContent))
    for i := 0; i < nReduce; i++ {
        fileName := reduceName(jobName, mapTask, i)
        reduceFile, err := os.Create(fileName)
        if err != nil {
            log.Fatal("doMap: create intermediate file: ", fileName, " error: ", err)
        }
        enc := json.NewEncoder(reduceFile)
        for _, kv := range keyValues {
            // A pair belongs to reduce task i when ihash(key) mod
            // nReduce equals i.
            if ihash(kv.Key)%nReduce == i {
                if err := enc.Encode(&kv); err != nil {
                    log.Fatal("doMap: json encode error: ", err)
                }
            }
        }
        // Close each intermediate file as soon as it is complete; a
        // defer here would keep all nReduce files open until doMap returns.
        reduceFile.Close()
    }
}

func ihash(s string) int {
    h := fnv.New32a()
    h.Write([]byte(s))
    return int(h.Sum32() & 0x7fffffff)
}
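
Why JSON rather than a homemade line format? Keys and values can contain newlines, quotes, and any other character, and json.Encoder/json.Decoder round-trip them safely. A minimal, self-contained sketch of that round trip (KeyValue here mirrors the lab's definition in common.go):

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
)

// KeyValue mirrors the lab's definition in common.go.
type KeyValue struct {
    Key   string
    Value string
}

func main() {
    pairs := []KeyValue{{"a", "1"}, {"line\nbreak", "quote\"s"}}

    // Encode: one JSON object per pair, as doMap writes them.
    var buf bytes.Buffer
    enc := json.NewEncoder(&buf)
    for _, kv := range pairs {
        if err := enc.Encode(&kv); err != nil {
            panic(err)
        }
    }

    // Decode: call Decode until it fails (io.EOF at end of stream),
    // exactly as doReduce reads the intermediate files back.
    dec := json.NewDecoder(&buf)
    for {
        var kv KeyValue
        if err := dec.Decode(&kv); err != nil {
            break
        }
        fmt.Printf("%q -> %q\n", kv.Key, kv.Value)
    }
}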

common_reduce.go

package mapreduce

import (
    "encoding/json"
    "log"
    "os"
    "sort"
)

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTask int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //
    // doReduce manages one reduce task: it should read the intermediate
    // files for the task, sort the intermediate key/value pairs by key,
    // call the user-defined reduce function (reduceF) for each key, and
    // write reduceF's output to disk.
    //
    // You'll need to read one intermediate file from each map task;
    // reduceName(jobName, m, reduceTask) yields the file
    // name from map task m.
    //
    // Your doMap() encoded the key/value pairs in the intermediate
    // files, so you will need to decode them. If you used JSON, you can
    // read and decode by creating a decoder and repeatedly calling
    // .Decode(&kv) on it until it returns an error.
    //
    // You may find the first example in the golang sort package
    // documentation useful.
    //
    // reduceF() is the application's reduce function. You should
    // call it once per distinct key, with a slice of all the values
    // for that key. reduceF() returns the reduced value for that key.
    //
    // You should write the reduce output as JSON encoded KeyValue
    // objects to the file named outFile. We require you to use JSON
    // because that is what the merger that combines the output
    // from all the reduce tasks expects. There is nothing special about
    // JSON -- it is just the marshalling format we chose to use. Your
    // output code will look something like this:
    //
    // enc := json.NewEncoder(file)
    // for key := ... {
    //  enc.Encode(KeyValue{key, reduceF(...)})
    // }
    // file.Close()
    //
    // Your code here (Part I).
    //
    keyValues := make(map[string][]string)
    // Collect, from every map task's intermediate file, the values
    // destined for this reduce task, grouped by key.
    for i := 0; i < nMap; i++ {
        fileName := reduceName(jobName, i, reduceTask)
        file, err := os.Open(fileName)
        if err != nil {
            log.Fatal("doReduce: open intermediate file ", fileName, " error: ", err)
        }

        decoder := json.NewDecoder(file)
        for {
            var kv KeyValue
            // Decode fails with io.EOF once the stream is exhausted.
            if err := decoder.Decode(&kv); err != nil {
                break
            }
            // Appending to a nil slice allocates it, so no per-key
            // initialization is needed.
            keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
        }
        // Close each file once it has been fully decoded, rather than
        // deferring all closes to the end of doReduce.
        file.Close()
    }

    // Collect and sort the keys so the output order is deterministic.
    var keys []string
    for k := range keyValues {
        keys = append(keys, k)
    }
    sort.Strings(keys)
    resultFile, err := os.Create(outFile)
    if err != nil {
        log.Fatal("doReduce: create reduced file ", outFile, " error: ", err)
    }
    defer resultFile.Close()
    enc := json.NewEncoder(resultFile)
    for _, k := range keys {
        reducedValue := reduceF(k, keyValues[k])
        err := enc.Encode(&KeyValue{k, reducedValue})
        if err != nil {
            log.Fatal("doReduce: encode error: ", err)
        }
    }
}
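
One detail worth calling out: Go deliberately randomizes map iteration order, so ranging over keyValues directly would produce a different output order on every run. Collecting the keys and sorting them, as above, makes the reduce output deterministic. The pattern in isolation:

package main

import (
    "fmt"
    "sort"
)

func main() {
    // Grouped values, as doReduce builds them from the decoded pairs.
    grouped := map[string][]string{
        "b": {"1"},
        "a": {"1", "1"},
    }

    // Map iteration order is randomized, so sort the keys first.
    keys := make([]string, 0, len(grouped))
    for k := range grouped {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    for _, k := range keys {
        fmt.Println(k, grouped[k]) // always "a [1 1]" then "b [1]"
    }
}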

Part II

word-count

func mapF(filename string, contents string) []mapreduce.KeyValue {
    // Your code here (Part II).
    // A word is a maximal run of letters; split on everything else.
    words := strings.FieldsFunc(contents, func(c rune) bool {
        return !unicode.IsLetter(c)
    })
    keyValues := make([]mapreduce.KeyValue, 0, len(words))
    for _, word := range words {
        keyValues = append(keyValues, mapreduce.KeyValue{Key: word, Value: "1"})
    }
    return keyValues
}

func reduceF(key string, values []string) string {
    // Your code here (Part II).
    // Each value is the "1" emitted by mapF for one occurrence of the
    // word, so the reduced value is simply the occurrence count.
    sum := 0
    for _, v := range values {
        count, err := strconv.Atoi(v)
        if err != nil {
            log.Fatal("reduceF: strconv string to int error: ", err)
        }
        sum += count
    }
    return strconv.Itoa(sum)
}
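
To see the two halves fit together without running the full framework, here is a self-contained simulation of the same split-then-sum logic (standalone names, not part of the lab code):

package main

import (
    "fmt"
    "strconv"
    "strings"
    "unicode"
)

func main() {
    contents := "the quick brown fox jumps over the lazy dog"

    // Map step: split on non-letters and emit a "1" per occurrence.
    counts := make(map[string][]string)
    words := strings.FieldsFunc(contents, func(c rune) bool {
        return !unicode.IsLetter(c)
    })
    for _, w := range words {
        counts[w] = append(counts[w], "1")
    }

    // Reduce step: sum the "1"s for each word.
    for word, ones := range counts {
        sum := 0
        for _, v := range ones {
            n, _ := strconv.Atoi(v)
            sum += n
        }
        fmt.Println(word, sum) // e.g. "the 2"
    }
}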

Part III

package mapreduce

import (
    "context"
    "fmt"
    "sync"
)

//
// schedule() starts and waits for all tasks in the given phase (mapPhase
// or reducePhase). the mapFiles argument holds the names of the files that
// are the inputs to the map phase, one per map task. nReduce is the
// number of reduce tasks. the registerChan argument yields a stream
// of registered workers; each item is the worker's RPC address,
// suitable for passing to call(). registerChan will yield all
// existing registered workers (if any) and new ones as they register.
//
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }
    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

    // All ntasks tasks have to be scheduled on workers. Once all tasks
    // have completed successfully, schedule() should return.
    //
    // Your code here (Part III, Part IV).
    //
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    for i := 0; i < ntasks; i++ {
        wg.Add(1)
        go func(i int, phase jobPhase) {
            defer wg.Done()
            for {
                // Take a free worker; registerChan yields both
                // already-registered workers and new ones.
                worker := <-registerChan
                var file string
                if phase == mapPhase {
                    file = mapFiles[i]
                }
                doTaskArgs := DoTaskArgs{
                    JobName:       jobName,
                    File:          file,
                    Phase:         phase,
                    TaskNumber:    i,
                    NumOtherPhase: n_other,
                }
                ok := call(worker, "Worker.DoTask", &doTaskArgs, nil)
                if ok {
                    // Hand the worker back on a separate goroutine:
                    // once the last task finishes, nothing reads from
                    // registerChan any more, so a plain send here would
                    // block forever. The context lets this goroutine
                    // exit cleanly in that case.
                    go func() {
                        select {
                        case registerChan <- worker:
                            // Worker returned to the pool.
                        case <-ctx.Done():
                            // schedule() has finished; drop the send.
                        }
                    }()
                    break
                }
                // The RPC failed (the worker may have crashed);
                // loop and retry this task on another worker.
            }
        }(i, phase)
    }
    wg.Wait()
    cancel()
    fmt.Printf("Schedule: %v done\n", phase)
}

Using a context to keep the hand-back goroutine from blocking forever is pretty satisfying.
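
The pattern on its own, stripped of the scheduling details (the names here are illustrative):

package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    // Unbuffered, like registerChan: a send blocks until someone receives.
    ch := make(chan string)
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        select {
        case ch <- "worker1":
            // Delivered: another task picked the worker up.
            fmt.Println("worker handed back")
        case <-ctx.Done():
            // No receiver will ever come; exit instead of leaking.
            fmt.Println("scheduler finished, dropping the send")
        }
    }()

    // Simulate schedule() returning with no task left to receive.
    cancel()
    time.Sleep(100 * time.Millisecond) // let the goroutine print
}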
