MapReduce 练手
Part I
common_map.go
package mapreduce
import (
"encoding/json"
"hash/fnv"
"log"
"os"
)
// doMap runs one map task. It reads the whole of inFile, calls the
// application's mapF with the file name and the file's entire contents, and
// partitions the returned key/value pairs into nReduce intermediate files:
// the pair for key k goes to the file reduceName(jobName, mapTask, r) with
// r = ihash(k) % nReduce. Each intermediate file is a stream of JSON-encoded
// KeyValue values, which doReduce decodes. Any I/O or encoding failure is
// fatal (log.Fatal).
func doMap(
	jobName string, // the name of the MapReduce job
	mapTask int, // which map task this is
	inFile string,
	nReduce int, // the number of reduce task that will be run ("R" in the paper)
	mapF func(filename string, contents string) []KeyValue,
) {
	inputFile, err := os.Open(inFile)
	if err != nil {
		log.Fatal("doMap: open input file: ", inFile, " error: ", err)
	}
	defer inputFile.Close() // read-only: a close error cannot lose data
	fileInfo, err := inputFile.Stat()
	if err != nil {
		log.Fatal("doMap: getstat input file: ", inFile, " error: ", err)
	}
	// A single Read may return fewer bytes than requested (large files are
	// served in chunks), so keep reading until the buffer is full.
	inputFileContent := make([]byte, fileInfo.Size())
	for off := 0; off < len(inputFileContent); {
		n, err := inputFile.Read(inputFileContent[off:])
		off += n
		if err != nil && off < len(inputFileContent) {
			log.Fatal("doMap: read input file: ", inFile, " error: ", err)
		}
	}
	keyValues := mapF(inFile, string(inputFileContent))
	if nReduce <= 0 {
		// No reduce tasks: there are no intermediate files to write.
		return
	}

	// Create every intermediate file up front so the pairs can be
	// partitioned in a single pass instead of one pass per reduce task.
	reduceFiles := make([]*os.File, nReduce)
	encoders := make([]*json.Encoder, nReduce)
	for r := 0; r < nReduce; r++ {
		fileName := reduceName(jobName, mapTask, r)
		reduceFile, err := os.Create(fileName)
		if err != nil {
			log.Fatal("doMap: create intermediate file: ", fileName, " error: ", err)
		}
		reduceFiles[r] = reduceFile
		encoders[r] = json.NewEncoder(reduceFile)
	}
	for i := range keyValues {
		kv := &keyValues[i]
		r := ihash(kv.Key) % nReduce // ihash is non-negative, so r is in [0, nReduce)
		if err := encoders[r].Encode(kv); err != nil {
			log.Fatal("doMap: json encode error: ", err)
		}
	}
	// Close explicitly and check the result: for a file being written, Close
	// can report a failed write that would otherwise be silently lost.
	for _, f := range reduceFiles {
		if err := f.Close(); err != nil {
			log.Fatal("doMap: close intermediate file: ", f.Name(), " error: ", err)
		}
	}
}
// ihash maps a key to a non-negative int using 32-bit FNV-1a; doMap reduces
// the result modulo nReduce to choose the key's reduce task.
func ihash(s string) int {
	hasher := fnv.New32a()
	// hash.Hash's Write never returns an error, so its result is ignored.
	hasher.Write([]byte(s))
	sum := hasher.Sum32()
	// Clear the top bit so the value is non-negative even where int is 32 bits.
	return int(sum & 0x7fffffff)
}
common_reduce.go
package mapreduce
import (
	"encoding/json"
	"io"
	"log"
	"os"
	"sort"
)
// doReduce runs one reduce task. It decodes the JSON KeyValue stream in each
// intermediate file reduceName(jobName, m, reduceTask) for m in [0, nMap),
// groups the values by key, calls reduceF once per distinct key in ascending
// key order, and writes each result to outFile as a JSON-encoded KeyValue.
// Any I/O, decoding, or encoding failure is fatal (log.Fatal).
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTask int, // which reduce task this is
	outFile string, // write the output here
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// Collect every value for each key across the outputs of all map tasks.
	keyValues := make(map[string][]string)
	for m := 0; m < nMap; m++ {
		fileName := reduceName(jobName, m, reduceTask)
		file, err := os.Open(fileName)
		if err != nil {
			log.Fatal("doReduce: open intermediate file ", fileName, " error: ", err)
		}
		decoder := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := decoder.Decode(&kv); err != nil {
				// io.EOF marks a clean end of the stream; anything else
				// (truncated or malformed data) must not be silently dropped.
				if err == io.EOF {
					break
				}
				log.Fatal("doReduce: decode intermediate file ", fileName, " error: ", err)
			}
			// append on a missing key starts from a nil slice.
			keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
		}
		// Close now rather than deferring: a defer inside this loop would keep
		// all nMap files open until doReduce returns. The file is read-only,
		// so a close error cannot lose data.
		file.Close()
	}

	keys := make([]string, 0, len(keyValues))
	for k := range keyValues {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	resultFile, err := os.Create(outFile)
	if err != nil {
		log.Fatal("doReduce: create reduced file ", outFile, " error: ", err)
	}
	enc := json.NewEncoder(resultFile)
	for _, k := range keys {
		reducedValue := reduceF(k, keyValues[k])
		if err := enc.Encode(&KeyValue{k, reducedValue}); err != nil {
			log.Fatal("doReduce: encode error: ", err)
		}
	}
	// Check Close on the written file: it can report a failed write.
	if err := resultFile.Close(); err != nil {
		log.Fatal("doReduce: close reduced file ", outFile, " error: ", err)
	}
}
Part II
word-count
// mapF is the word-count map function. It splits contents into words
// (maximal runs of Unicode letters) and emits one {word, "1"} pair per
// occurrence. filename is not used.
func mapF(filename string, contents string) []mapreduce.KeyValue {
	isSeparator := func(c rune) bool { return !unicode.IsLetter(c) }
	words := strings.FieldsFunc(contents, isSeparator)
	// One pair per word, so the final length is known up front.
	keyValues := make([]mapreduce.KeyValue, 0, len(words))
	for _, word := range words {
		// Keyed fields: go vet's composites check flags unkeyed literals of
		// struct types imported from another package.
		keyValues = append(keyValues, mapreduce.KeyValue{Key: word, Value: "1"})
	}
	return keyValues
}
// reduceF is the word-count reduce function: it parses every value as a
// decimal integer and returns their sum in decimal form. key is not used.
// A value that is not an integer is fatal.
func reduceF(key string, values []string) string {
	total := 0
	for idx := range values {
		n, err := strconv.Atoi(values[idx])
		if err != nil {
			log.Fatal("reduceF: strconv string to int error: ", err)
		}
		total += n
	}
	return strconv.Itoa(total)
}
Part III
package mapreduce
import (
"context"
"fmt"
"sync"
)
// schedule starts every task of the given phase (mapPhase or reducePhase) on
// a worker taken from registerChan, and returns only after all of them have
// completed successfully.
//
// mapFiles lists the map-phase input files, one per map task; nReduce is the
// number of reduce tasks. Each value read from registerChan is a worker's RPC
// address suitable for call(); the channel yields already-registered workers
// as well as new ones as they register.
//
// If the RPC for a task fails, the task is retried on the next worker from
// registerChan and the failed worker is not handed back. After a successful
// RPC the worker is offered back on registerChan from a separate goroutine;
// that offer is abandoned once every task has finished, so the goroutine does
// not stay blocked after schedule returns.
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
	var ntasks int
	var nOther int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks, nOther = len(mapFiles), nReduce
	case reducePhase:
		ntasks, nOther = nReduce, len(mapFiles)
	}
	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nOther)

	// ctx is cancelled once all tasks are done; it releases any goroutine
	// still waiting to hand a worker back on registerChan.
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup
	for task := 0; task < ntasks; task++ {
		wg.Add(1)
		go func(task int, phase jobPhase) {
			defer wg.Done()
			// Only map tasks have an input file; reduce tasks pass "".
			file := ""
			if phase == mapPhase {
				file = mapFiles[task]
			}
			for {
				worker := <-registerChan
				args := DoTaskArgs{jobName, file, phase, task, nOther}
				if !call(worker, "Worker.DoTask", &args, nil) {
					// The worker failed: drop it and retry on the next one.
					continue
				}
				// Offer the worker back for other tasks without delaying this
				// task's completion.
				go func() {
					select {
					case registerChan <- worker:
						fmt.Println("worker has gone phase: ", phase, " i:", task)
					case <-ctx.Done():
						fmt.Println("exit phase: ", phase, " i:", task)
					}
				}()
				return
			}
		}(task, phase)
	}
	wg.Wait()
	cancel()
	fmt.Printf("Schedule: %v done\n", phase)
}
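利用 context 防止阻塞：所有任务完成（wg.Wait() 返回）后调用 cancel()，那些仍在等待把 worker 送回 registerChan 的 goroutine 会走 ctx.Done() 分支退出，而不会永远阻塞在一个可能已无人接收的 channel 上。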