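This time we try a custom output type.
Mobile traffic is split into upload traffic and download traffic. The result we want has the form (phone number, upload traffic, download traffic, total traffic), for example (13333333333 200 400 600), while the dataset only contains (phone number, upload traffic, download traffic). We therefore need a custom output type that carries all three values. As before, start by creating a new Maven project; the pom.xml is the same as the one used for wordcount.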
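To make the target concrete before bringing in Hadoop, the aggregation can be sketched in plain Java. This is only an illustration: the class name `FlowSummarySketch`, the sample lines, and the tab separator are assumptions, not part of the actual job.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the result the MapReduce job should produce (no Hadoop involved).
class FlowSummarySketch {

    // Returns phone number -> {upload total, download total, overall total}.
    static Map<String, long[]> summarize(List<String> lines) {
        Map<String, long[]> totals = new TreeMap<>();
        for (String line : lines) {
            // Assumed input layout: phone \t upload \t download
            String[] fields = line.split("\t");
            long up = Long.parseLong(fields[1]);
            long down = Long.parseLong(fields[2]);
            long[] t = totals.computeIfAbsent(fields[0], k -> new long[3]);
            t[0] += up;
            t[1] += down;
            t[2] += up + down;
        }
        return totals;
    }

    public static void main(String[] args) {
        // Hypothetical sample records for a single phone number
        List<String> lines = Arrays.asList(
                "13333333333\t120\t300",
                "13333333333\t80\t100");
        summarize(lines).forEach((phone, t) ->
                System.out.println(phone + " " + t[0] + " " + t[1] + " " + t[2]));
        // prints: 13333333333 200 400 600
    }
}
```

In the real job the grouping by phone number is done by the shuffle phase, and the three totals travel together inside one value object, which is why a custom type is needed.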
1. First, create a FlowBean class
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Holds the upload and download traffic for one phone number.
 */
public class FlowBean implements Writable {
    private long upFlow;
    private long dFlow;
    private long sumFlow;

    // Hadoop creates the bean by reflection during deserialization,
    // so a no-argument constructor is required.
    public FlowBean() {
    }

    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    // Serialize the fields.
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    // Deserialize the fields in exactly the order they were written.
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        dFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getdFlow() {
        return dFlow;
    }

    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}
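The write/readFields pair only works if both methods handle the fields in the same order. A minimal way to check this locally, without a cluster, is a round trip through an in-memory byte buffer. The sketch below uses a stand-in class `FlowRecord` (an assumption, mirroring FlowBean but without `implements Writable`) so that it compiles with the JDK alone; `RoundTripDemo` is likewise a hypothetical helper.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-in for FlowBean: same fields and same serialization order, no Hadoop dependency.
class FlowRecord {
    private long upFlow;
    private long dFlow;
    private long sumFlow;

    FlowRecord() {
    }

    FlowRecord(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dFlow = in.readLong();
        sumFlow = in.readLong();
    }

    long getUpFlow() { return upFlow; }
    long getdFlow() { return dFlow; }
    long getSumFlow() { return sumFlow; }
}

class RoundTripDemo {
    // Serializes the record to bytes and reads it back into a fresh instance.
    static FlowRecord roundTrip(FlowRecord original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                original.write(out);
            }
            FlowRecord copy = new FlowRecord();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FlowRecord copy = roundTrip(new FlowRecord(200, 400));
        System.out.println(copy.getUpFlow() + " " + copy.getdFlow() + " " + copy.getSumFlow());
        // prints: 200 400 600
    }
}
```

If readFields read the fields in a different order than write wrote them, the copy would silently carry swapped values, so a check like this is a cheap safeguard when fields are added later.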
2. Create the flow-count class (containing the map and reduce logic)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Convert the line to a String
            String line = value.toString();
            // Split into fields (tab-separated)
            String[] fields = line.split("\t");
            // Phone number
            String phoneNum = fields[0];
            // Upload and download traffic
            long upFlow = Long.parseLong(fields[1]);
            long dFlow = Long.parseLong(fields[2]);
            context.write(new Text(phoneNum), new FlowBean(upFlow, dFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_dFlow = 0;
            // Sum the traffic of all beans for this phone number
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_dFlow += bean.getdFlow();
            }
            FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
            context.write(key, resultBean);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Locate the jar that contains this program
        job.setJarByClass(FlowCount.class);
        // Mapper class
        job.setMapperClass(FlowCountMapper.class);
        // Reducer class
        job.setReducerClass(FlowCountReducer.class);
        // Map output types (the original called setOutputValueClass here by mistake)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // Reduce (final) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Directory containing the job's input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Directory for the job's output files
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait for it to finish
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
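Writing the mapper and reducer as static nested classes keeps the code structure simpler.
Then package the jar, upload it to the Hadoop server, and run it. The job succeeds; the output looks like this:
The aggregation itself evidently worked, but the values are printed as object references (class name plus hash code) rather than the numbers, because FlowBean does not override toString. To fix this, override toString in FlowBean and change the reducer to emit Text (the driver should then declare job.setOutputValueClass(Text.class) for the reduce output, while the map output value class stays FlowBean):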
static class FlowCountReducer extends Reducer<Text, FlowBean, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long sum_upFlow = 0;
        long sum_dFlow = 0;
        // Sum the traffic of all beans for this phone number
        for (FlowBean bean : values) {
            sum_upFlow += bean.getUpFlow();
            sum_dFlow += bean.getdFlow();
        }
        FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
        // Emit the bean's string form as the output value
        context.write(key, new Text(resultBean.toString()));
    }
}
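The toString override itself is not shown in the post. A minimal sketch of what it could look like follows, assuming tab-separated fields in the order upload, download, total. The class `PrintableFlowBean` is a hypothetical stand-in so the example compiles without Hadoop; in the project the method would simply be added to FlowBean.

```java
// Stand-in for FlowBean, reduced to the parts relevant for printing.
class PrintableFlowBean {
    private final long upFlow;
    private final long dFlow;
    private final long sumFlow;

    PrintableFlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    // Assumed output layout: upload \t download \t total
    @Override
    public String toString() {
        return upFlow + "\t" + dFlow + "\t" + sumFlow;
    }
}

class ToStringDemo {
    public static void main(String[] args) {
        // With the override, the values are printed
        System.out.println(new PrintableFlowBean(200, 400));
        // Without an override, Object.toString yields ClassName@hexHashCode,
        // which is the "address" seen in the first run
        System.out.println(new Object());
    }
}
```

The separator is a free choice; a tab keeps the output consistent with the tab that the default text output places between key and value.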
Then repackage, upload, and run the job again; the result is now what we wanted: