MapReduce 练手
Part I
package mapreduce
import (
func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
// doMap manages one map task: it should read one of the input files
// (inFile), call the user-defined map function (mapF) for that file's
// contents, and partition mapF's output into nReduce intermediate files.
// There is one intermediate file per reduce task. The file name
// includes both the map task number and the reduce task number. Use
// the filename generated by reduceName(jobName, mapTask, r)
// as the intermediate file for reduce task r. Call ihash() (see
// below) on each key, mod nReduce, to pick r for a key/value pair.
// mapF() is the map function provided by the application. The first
// argument should be the input file name, though the map function
// typically ignores it. The second argument should be the entire
// input file contents. mapF() returns a slice containing the
// key/value pairs for reduce; see common.go for the definition of
// KeyValue.
// Look at Go's ioutil and os packages for functions to read
// and write files.
// Coming up with a scheme for how to format the key/value pairs on
// disk can be tricky, especially when taking into account that both
// keys and values could contain newlines, quotes, and any other
// character you can think of.
// One format often used for serializing data to a byte stream that the
// other end can correctly reconstruct is JSON. You are not required to
// use JSON, but as the output of the reduce tasks *must* be JSON,
// familiarizing yourself with it here may prove useful. You can write
// out a data structure as a JSON string to a file using the commented
// code below. The corresponding decoding functions can be found in
// common_reduce.go.
// enc := json.NewEncoder(file)
// for _, kv := ... {
// err := enc.Encode(&kv)
// Remember to close the file after you have written all the values!
// Your code here (Part I).
inputFile, err := os.Open(inFile)
if err != nil {
log.Fatal("doMap: open input file: ", inFile, " error: ", err)
defer inputFile.Close()
fileInfo, err := inputFile.Stat()
if err != nil {
log.Fatal("doMap: getstat input file: ", inFile, " error: ", err)
inputFileContent := make([]byte, fileInfo.Size())
_, err = inputFile.Read(inputFileContent)
if err != nil {
log.Fatal("doMap: read input file: ", inFile, " error: ", err)
keyValues := mapF(inFile, string(inputFileContent))
for i := 0; i < nReduce; i++ {
fileName := reduceName(jobName, mapTask, i)
reduceFile, err := os.Create(fileName)
if err != nil {
log.Fatal("doMap: create input file: ", inFile, " error: ", err)
defer reduceFile.Close()
enc := json.NewEncoder(reduceFile)
for _, kv := range keyValues {
if ihash(kv.Key)%nReduce == i {
err := enc.Encode(&kv)
if err != nil {
log.Fatal("doMap: json encode error: ", err)
// ihash returns a non-negative hash of s: the 32-bit FNV-1a hash with the
// sign bit cleared. Map tasks take it modulo nReduce to choose the reduce
// task for a key.
func ihash(s string) int {
	h := fnv.New32a()
	// The key must be fed into the hasher; without this every key would
	// hash to the FNV offset basis and land in the same partition.
	h.Write([]byte(s)) // hash.Hash.Write never returns an error
	return int(h.Sum32() & 0x7fffffff)
}
package mapreduce
import (
	"encoding/json"
	"io"
	"log"
	"os"
	"sort"
)
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
// doReduce manages one reduce task: it should read the intermediate
// files for the task, sort the intermediate key/value pairs by key,
// call the user-defined reduce function (reduceF) for each key, and
// write reduceF's output to disk.
// You'll need to read one intermediate file from each map task;
// reduceName(jobName, m, reduceTask) yields the file
// name from map task m.
// Your doMap() encoded the key/value pairs in the intermediate
// files, so you will need to decode them. If you used JSON, you can
// read and decode by creating a decoder and repeatedly calling
// .Decode(&kv) on it until it returns an error.
// You may find the first example in the golang sort package
// documentation useful.
// reduceF() is the application's reduce function. You should
// call it once per distinct key, with a slice of all the values
// for that key. reduceF() returns the reduced value for that key.
// You should write the reduce output as JSON encoded KeyValue
// objects to the file named outFile. We require you to use JSON
// because that is what the merger than combines the output
// from all the reduce tasks expects. There is nothing special about
// JSON -- it is just the marshalling format we chose to use. Your
// output code will look something like this:
// enc := json.NewEncoder(file)
// for key := ... {
// enc.Encode(KeyValue{key, reduceF(...)})
// }
// file.Close()
// Your code here (Part I).
keyValues := make(map[string][]string, 0)
//find the keys in all map tasks for reduceTask i
for i := 0; i < nMap; i++ {
fileName := reduceName(jobName, i, reduceTask)
file, err := os.Open(fileName)
if err != nil {
log.Fatal("doReduce: open intermediate file ", fileName, " error: ", err)
defer file.Close()
decoder := json.NewDecoder(file)
for {
var kv KeyValue
err := decoder.Decode(&kv)
if err != nil {
_, ok := keyValues[kv.Key]
if !ok {
keyValues[kv.Key] = make([]string, 0)
keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
var keys []string
for k := range keyValues {
keys = append(keys, k)
resultFile, err := os.Create(outFile)
if err != nil {
log.Fatal("doReduce: create reduced file ", outFile, " error: ", err)
defer resultFile.Close()
enc := json.NewEncoder(resultFile)
for _, k := range keys {
reducedValue := reduceF(k, keyValues[k])
err := enc.Encode(&KeyValue{k, reducedValue})
if err != nil {
log.Fatal("doReduce: encode error: ", err)
Part II
func mapF(filename string, contents string) []mapreduce.KeyValue {
// Your code here (Part II).
words := strings.FieldsFunc(contents, func(c rune) bool {
return !unicode.IsLetter(c)
keyValues := make([]mapreduce.KeyValue, 0)
for _, word := range words {
keyValues = append(keyValues, mapreduce.KeyValue{word, "1"})
return keyValues
// reduceF is the word-count reduce function. It parses each value as a
// decimal integer and returns their sum as a decimal string. The key is not
// used, and a value that fails to parse is fatal (log.Fatal).
func reduceF(key string, values []string) string {
	total := 0
	for idx := range values {
		n, err := strconv.Atoi(values[idx])
		if err != nil {
			log.Fatal("reduceF: strconv string to int error: ", err)
		}
		total += n
	}
	return strconv.Itoa(total)
}
Part III
package mapreduce
import (
// schedule() starts and waits for all tasks in the given phase (mapPhase
// or reducePhase). the mapFiles argument holds the names of the files that
// are the inputs to the map phase, one per map task. nReduce is the
// number of reduce tasks. the registerChan argument yields a stream
// of registered workers; each item is the worker's RPC address,
// suitable for passing to call(). registerChan will yield all
// existing registered workers (if any) and new ones as they register.
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
// All ntasks tasks have to be scheduled on workers. Once all tasks
// have completed successfully, schedule() should return.
// Your code here (Part III, Part IV).
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < ntasks; i++ {
go func(i int, phase jobPhase) {
defer wg.Done()
for {
worker := <-registerChan
var doTaskArgs DoTaskArgs
switch phase {
case mapPhase:
doTaskArgs = DoTaskArgs{
case reducePhase:
doTaskArgs = DoTaskArgs{
ok := call(worker, "Worker.DoTask", &doTaskArgs, nil)
if ok {
go func() {
select {
case registerChan <- worker:
fmt.Println("worker has gone phase: ", phase, " i:", i)
//prevent block with context
case <-ctx.Done():
fmt.Println("exit phase: ", phase, " i:", i)
}(i, phase)
fmt.Printf("Schedule: %v done\n", phase)