Example 2-3. Mapper for the maximum temperature example
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
The Mapper class is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function. For the present example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer). Rather than using built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization. These are found in the org.apache.hadoop.io package. Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer). The map() method is passed a key and a value. We convert the Text value containing the line of input into a Java String, then use its substring() method to extract the columns we are interested in. The map() method also provides an instance of Context to write the output to. In this case, we write the year as a Text object (since we are just using it as a key), and the temperature is wrapped in an IntWritable. We write an output record only if the temperature is present and the quality code indicates the temperature reading is OK. The reduce function is similarly defined using a Reducer, as illustrated in Example 2-4.
Example 2-4. Reducer for the maximum temperature example
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
Again, four formal type parameters are used to specify the input and output types, this time for the reduce function. The input types of the reduce function must match the output types of the map function: Text and IntWritable. And in this case, the output types of the reduce function are Text and IntWritable, for a year and its maximum temperature,
which we find by iterating through the temperatures and comparing each with a record of the highest found so far.
The third piece of code runs the MapReduce job (see Example 2-5).
Example 2-5. Application to find the maximum temperature in the weather dataset
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max temperature")
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
A Job object forms the specification of the job and gives you control over how the job is
run. When we run this job on a Hadoop cluster, we will package the code into a JAR file
(which Hadoop will distribute around the cluster). Rather than explicitly specifying the
name of the JAR file, we can pass a class in the Job’s setJarByClass() method, which
Hadoop will use to locate the relevant JAR file by looking for the JAR file containing this
class.
Having constructed a Job object, we specify the input and output paths. An input path is
specified by calling the static addInputPath() method on FileInputFormat, and it can be
a single file, a directory (in which case, the input forms all the files in that directory), or a
file pattern. As the name suggests, addInputPath() can be called more than once to use
input from multiple paths.
The output path (of which there is only one) is specified by the static setOutputPath()
method on FileOutputFormat. It specifies a directory where the output files from the
reduce function are written. The directory shouldn’t exist before running the job because
Hadoop will complain and not run the job. This precaution is to prevent data loss (it can
be very annoying to accidentally overwrite the output of a long job with that of another).
Next, we specify the map and reduce types to use via the setMapperClass() and
setReducerClass() methods.
The setOutputKeyClass() and setOutputValueClass() methods control the output types
for the reduce function, and must match what the Reduce class produces. The map output
types default to the same types, so they do not need to be set if the mapper produces the
same types as the reducer (as it does in our case). However, if they are different, the map
output types must be set using the setMapOutputKeyClass() and
setMapOutputValueClass() methods.
The input types are controlled via the input format, which we have not explicitly set
because we are using the default TextInputFormat.
After setting the classes that define the map and reduce functions, we are ready to run the
job. The waitForCompletion() method on Job submits the job and waits for it to finish.
The single argument to the method is a flag indicating whether verbose output is
generated. When true, the job writes information about its progress to the console.
The return value of the waitForCompletion() method is a Boolean indicating success
(true) or failure (false), which we translate into the program’s exit code of 0 or 1.