MIT6.824 Lab1 MapReduce

Introduction

在本实验中,将用Go编程构建一个MapReduce库。在第一部分中,将编写一个简单的MapReduce程序。在第二部分中,将编写一个Master,将任务分发给MapReduce的worker,并处理worker的失败。库的接口和容错方法类似于MapReduce论文 中的描述。

Software

代码仓库的URL是 git://g.csail.mit.edu/6.824-golabs-2018

$ git clone git://g.csail.mit.edu/6.824-golabs-2018 6.824
$ cd 6.824
$ ls
Makefile src

Preamble

mapreduce包提供了1个简单的Map/Reduce库的串行实现。正常应用应该调用Distributed函数[master.go]来启动1个任务,但是可以通过调用Sequential函数[master.go]来进行debug。

$ go test -run Sequential
  • mapreduce实现流程:
    1. 应用提供一些输入文件,1个map函数,1个reduce函数,reduce worker的数目(nReduce)。
    2. 建立1个master节点,它启动1个RPC server(master_rpc.go),然后等待worker来注册(使用RPC 调用 Register函数[master.go]). 当worker可用时(在第4、5部分),schedule函数[schedule.go]决定如何分配任务到worker以及如何处理worker的failures。
    3. master节点认为每个输入文件对应1个map任务,为每个任务至少调用1次doMap函数[common_map.go]。每次调用doMap函数会读取合适的文件,并调用map函数来处理文件内容,为每个map文件生成nReduce个文件。
    4. master节点接下去为每个reduce任务至少调用1次doReduce函数[common_reduce.go]。doReduce函数收集nReduce个reduce文件,然后调用reduce函数处理这些文件,产生nReduce个结果文件。
    5. master节点调用mr.merge函数[master_splitmerge.go],来整合nReduce个结果文件成1个最终文件
    6. master节点发送1个Shutdown的RPC调用到每个worker,来关闭它们的RPC server。

Part I: Map/Reduce input and output

给出的Map / Reduce实现缺少一些部分。在编写第一个Map / Reduce函数对之前,需要修复顺序实现。特别是,给出的代码缺少两个关键部分:分割map任务输出的函数,以及收集reduce任务的所有输入的函数。这些任务分别由common_map.go中的doMap()函数和common_reduce.go中的doReduce()函数执行。

  • 测试
$ cd 6.824
$ export "GOPATH=$PWD"  # go needs $GOPATH to be set to the project's working directory
$ cd "$GOPATH/src/mapreduce"
$ go test -run Sequential
ok      mapreduce   2.694s
  • 实现

在common_map.go文件中有关于doMap函数功能的描述注释,主要操作是打开文件名为inFile的输入文件,读取文件内容,然后调用mapF函数来处理内容,返回值为KeyVaule结构体[common.go]实例,然后生成nReduce个中间文件,提示使用json格式写入。

doMap实现:

file, err := os.Open(inFile)
if err != nil {
    log.Fatal("ERROR[doMap]: Open file error ", err)
}
defer file.Close()
// 获取文件状态信息
fileInfo, err := file.Stat()
if err != nil {
    log.Fatal("ERROR[doMap]: Get file state error ", err)
}
// 读文件
fileSize := fileInfo.Size()
buffer := make([]byte, fileSize)
_, err = file.Read(buffer)
if err != nil {
    log.Fatal("ERROR[doMap]:Read error ", err)
}
// 处理文件内容
middleRes := mapF(inFile, string(buffer))
rSize := len(middleRes)
// 生成中间文件
for i := 0; i < nReduce; i++ {
    fileName := reduceName(jobName, mapTask, i)
    midFile, err := os.Create(fileName)
    if err != nil {
        log.Fatal("ERROR[doMap]: Create intermediate file fail ", err)
    }
    enc := json.NewEncoder(midFile)
    for r := 0; r < rSize; r++ {
        kv := middleRes[r]
        if ihash(kv.Key)%nReduce == i {
            err := enc.Encode(&kv)
            if err != nil {
                log.Fatal("ERROR[doMap]: Encode error: ", err)
            }
        }
    }
    midFile.Close()
}

在common_reduce.go文件中有关于doReduce函数功能的描述注释,主要操作是先从每个map函数的输出文件中获取该reduce任务相应的中间文件,然后根据key值进行排序,最后调用reduce函数来生成最终的结果并写入文件。

doReduce实现:

keyValues := make(map[string][]string)

for i := 0; i < nMap; i++ {
    fileName := reduceName(jobName, i, reduceTask)
    file, err := os.Open(fileName)
    if err != nil {
        log.Fatal("ERROR[doReduce]: Open error: ", err)
    }
    dec := json.NewDecoder(file)
    for {
        var kv KeyValue
        err := dec.Decode(&kv)
        if err != nil {
            break
        }
        _, ok := keyValues[kv.Key]
        if !ok {
            keyValues[kv.Key] = make([]string, 0)
        }
        keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
    }
    file.Close()
}

var keys []string
for k := range keyValues {
    keys = append(keys, k)
}

sort.Strings(keys)
mergeFileName := mergeName(jobName, reduceTask)
mergeFile, err := os.Create(mergeFileName)
if err != nil {
    log.Fatal("ERROR[doReduce]: Create file error: ", err)
}
enc := json.NewEncoder(mergeFile)
for _, k := range keys {
    res := reduceF(k, keyValues[k])
    enc.Encode(&KeyValue{k, res})
}
mergeFile.Close()

Part II: Single-worker word count

现在,你将实现字数统计 - 一个简单的Map / Reduce示例。看看main / wc.go;你会发现空的mapF()reduceF()函数。你的工作是插入代码,以便wc.go报告其输入中每个单词的出现次数。一个单词是任何连续的字母序列,由unicode.IsLetter确定。 有些输入文件的路径名为pg - * .txt,位于〜/ 6.824 / src / main。以下是如何使用输入文件运行wc:

$ cd 6.824
$ export "GOPATH=$PWD"
$ cd "$GOPATH/src/main"
$ go run wc.go master sequential pg-*.txt
# command-line-arguments
./wc.go:14: missing return at end of function
./wc.go:21: missing return at end of function

运行结果是编译失败,因为mapF()reduceF()未完成。

更简单的运行方法是使用源代码提供的测试脚本:

$ bash ./test-wc.sh
  • 实现

mapF函数的参数filename为输入文件的文件名,contents为文件内容,需要实现生成[word, “1”]这样的中间结果。在main/wc.go中有关于mapF函数实现的注释。先对于文件内容contents进行分割,用strings.FieldsFunc函数来分割成单词。然后对于每个单词,将[word,”1”]加入到中间结果中。

mapF实现:

values := strings.FieldsFunc(contents, func(c rune) bool {
    return !unicode.IsLetter(c)
})
res := make([]mapreduce.KeyValue, 0)
for _, v := range values {
    res = append(res, mapreduce.KeyValue{v, "1"})
}
return res

对于reduceF函数,参数key为word,参数values就是[“1”,”1”, …]形式的字符串切片,主要操作就是统计该单词的出现次数,即累加values中的元素即可,使用strconv库提供的函数将字符串转换为数值,最后将统计和结果转换为字符串返回。

var sum int
for _, v := range values {
    count, err := strconv.Atoi(v)
    if err != nil {
        log.Fatal("ERROR[reduceF]: atoi failed ", err)
    }
    sum += count
}
return strconv.Itoa(sum)

Part III: Distributing MapReduce tasks

你当前的实现运行map并一次减少一个任务。 Map / Reduce最大的卖点之一是它可以自动并行化普通的顺序代码而无需开发人员的任何额外工作。在本练习的这一部分中,你将完成一个MapReduce的版本,该版本将工作拆分为在多核上并行运行的一组工作线程。虽然不像在实际的Map / Reduce部署中那样分布在多台机器上,但您的实现将使用RPC来模拟分布式计算。

为了协同任务的并行执行,我们将使用1个特殊的master线程,来分发任务到worker线程并等待它们完成。实验中提供了worker的实现代码和启动代码(mapreduce/worker.go)以及RPC消息处理的代码(mapreduce/common_rpc.go)。
我们的任务实现mapreduce包中的schedule.go文件,尤其是其中的schedule函数来分发map和reduce任务到worker,并当它们完成后才返回。
mr.run函数[master.go]里面会调用schedule函数来运行map和reduce任务,然后调用merge函数来将每个reduce任务的结果文件整合成1个最终文件。schedule函数只需要告诉worker输入文件的文件名(mr.files[task])和任务号。master节点通过RPC调用Worker.DoTask,传递1个DoTaskArgs对象作为RPC的参数来告诉worker新的任务。
当1个worker启动时,它会发送1个注册RPC给master,传递新worker的信息到mr.registerChannel。我们的schedule函数通过读取mr.registerChannel来获得可用的worker。

  • 测试方法
$ cd 6.824/src/mapreduce
$ go test -run TestParallel
  • 实现

主要过程是先区分一下这是map任务还是reduce任务,对于map任务,任务数ntask为输入文件的个数,n_other为reduce worker的数目nReduce,对于reduce任务,任务数ntask为reduce worker的数目nReduce,n_other为map worker的数目即输入文件的个数。然后创建1个同步包sync中的等待组WaitGroup,对于每个任务,将其加入到等待组中,并运行1个goroutine来运行进行分发任务。首先从mr.registerChannel中获得1个可用的worker,构建DoTaskArgs对象,作为参数调用worker的Worker.DoTask来执行任务,当其完成任务后将其重新加入到mr.registerChannel表示可用。最后使用WaitGroup的wait函数等待所有任务完成。因为只有当map任务都完成后才能执行reduce任务。

schedule()实现:

var wg sync.WaitGroup
for i := 0; i < ntasks; i++ {
    wg.Add(1)
    go func(taskNum int, n_other int, phase jobPhase) {
        defer wg.Done()
        worker := <-registerChan
        var args DoTaskArgs
        args.JobName = jobName
        args.File = mapFiles[taskNum]
        args.Phase = phase
        args.TaskNumber = taskNum
        args.NumOtherPhase = n_other
        ok := call(worker, "Worker.DoTask", &args, new(struct{}))
        if ok {
            go func() {
                registerChan <- worker
            }()
        }
    }(i, n_other, phase)
}
wg.Wait()
fmt.Printf("Schedule: %v done\n", phase)

Part IV: Handling worker failures

在这部分中,你需要让master处理失败的worker。 MapReduce使这相对容易,因为worker没有持久状态。如果工作程序在从master处理RPC时失败,则master的call()最终会因超时而返回false。在这种情况下,master应该将失败worker的任务重新分配给另一个worker。 RPC故障并不一定意味着worker没有执行任务;可能是worker已经执行了但是回复丢失了,或者worker可能仍在执行但master的RPC超时。因此,可能会发生两个worker收到相同的任务,计算它并生成输出。MapReduce框架确保map和reduce函数输出以原子方式显示:输出文件不存在,或者将包含map或reduce函数的单个执行的整个输出。

我们的任务是修改mapreduce包中的schedule.go文件,使其具有简单的容错性。使master节点能处理worker的宕机。当1个worker宕机时,master发送的RPC都会失败,那么久需要重新安排任务,将宕机worker的任务分配给其它worker。
RPC的失败并不是表示worker的宕机,worker可能只是网络不可达,仍然在工作计算。所以如果重新分配任务可能造成2个worker接受相同的任务并计算。但是这没关系,因为相同的任务生成相同的结果。我们只要实现重新分配任务即可。

  • 测试方法
$ cd 6.824/src/mapreduce
$ go test -run Failure
  • 实现

使用无限for循环中,当RPC的call失败时,仅仅就是重新选取1个worker,只有当成功时,才会break。

schedule()实现:

var wg sync.WaitGroup
for i := 0; i < ntasks; i++ {
    wg.Add(1)
    go func(taskNum int, n_other int, phase jobPhase) {
        defer wg.Done()
        for {
            worker := <-registerChan
            var args DoTaskArgs
            args.JobName = jobName
            args.File = mapFiles[taskNum]
            args.Phase = phase
            args.TaskNumber = taskNum
            args.NumOtherPhase = n_other
            ok := call(worker, "Worker.DoTask", &args, new(struct{}))
            if ok {
                go func() {
                    registerChan <- worker
                }()
                break
            }
        }
    }(i, n_other, phase)
}
wg.Wait()
fmt.Printf("Schedule: %v done\n", phase)

Part V: Inverted index generation

在这个部分,你将构建用于生成倒排索引的Map和Reduce函数。 在main包中有一个ii.go文件,与之前任务修改的wc.go非常相似。你应该在main / ii.go中修改mapF和reduceF,以便它们一起生成倒排索引。

  • 测试方法
$ go run ii.go master sequential pg-*.txt
  • 实现

在mapF函数中操作与原先的word count类似,只是生成的中间结果形式变为[word, document]。

values := strings.FieldsFunc(value, func(c rune) bool {
    return !unicode.IsLetter(c)
})
for _, v := range values {
    res = append(res, mapreduce.KeyValue{v, document})
}
return res

在reduceF函数中,此时values为document的字符串切片,需要先去冗余,即实现set,由于go语言不提供set,可以用map来模拟实现,然后根据输出构造结果字符串。

valuesNoRepeat := make([]string, 0)
set := make(map[string]int)
for _, v := range values {
    _, ok := set[v]
    if !ok {
        set[v] = 1
        valuesNoRepeat = append(valuesNoRepeat, v)
    }
}
sort.Strings(valuesNoRepeat)
valuesLen := len(valuesNoRepeat)
res := strconv.Itoa(valuesLen) + " "
for i, v := range valuesNoRepeat {
    if i == valuesLen-1 {
        res += v
    } else {
        res += v + ","
    }
}
return res

运行lab 1所有Part的测试

$ cd src/main
$ bash ./test-mr.sh
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容