Getting Started with Hadoop: MapReduce Example (2)

This time we try a custom output type.
Mobile traffic splits into upload traffic and download traffic, and the statistics we want take the form (phone number, upload traffic, download traffic, total traffic), e.g. (13333333333 200 400 600). The data set, however, only contains (phone number, upload traffic, download traffic), so we need to define a custom output type. As before, create a new Maven project; the pom.xml is the same as in the WordCount example.
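For reference, a few made-up input lines in that format (tab-separated: phone number, upload traffic, download traffic):

13333333333	100	150
13333333333	100	250
13444444444	300	500

Repeated phone numbers are expected; the job aggregates them per number.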
1. First, create a FlowBean class

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Holds the upload and download traffic for one phone number
 */
public class FlowBean implements Writable{
    private long upFlow;
    private long dFlow;
    private long sumFlow;

    // The no-arg constructor is required: Hadoop creates the bean via
    // reflection during deserialization, then calls readFields
    public FlowBean(){

    }

    public FlowBean(long upFlow, long dFlow){
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialization order must match readFields exactly
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // Deserialize the fields in the same order they were written
        upFlow = dataInput.readLong();
        dFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getdFlow() {
        return dFlow;
    }

    public void setdFlow(long dFlow) {
        this.dFlow = dFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }
}
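
Before wiring the bean into a job, the Writable contract is easy to sanity-check locally. A minimal round-trip sketch (FlowBeanRoundTrip is a throwaway test class for illustration, not part of the job):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(200, 400);

        // Serialize into an in-memory buffer, as Hadoop would when shuffling
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh bean; this is why the no-arg constructor matters
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy.getSumFlow()); // prints 600
    }
}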

2. Create the traffic-statistics class (containing both the map and the reduce)

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;


public class FlowCount {

    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Convert the input line to a String
            String line = value.toString();
            // Split into tab-separated fields
            String[] fields = line.split("\t");
            // Field 0 is the phone number
            String phoneNum = fields[0];
            // Fields 1 and 2 are the upload and download traffic
            long upFlow = Long.parseLong(fields[1]);
            long dFlow = Long.parseLong(fields[2]);
            context.write(new Text(phoneNum), new FlowBean(upFlow, dFlow));
        }
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_dFlow = 0;
            // Sum upload and download traffic across all beans for this phone number
            for(FlowBean bean:values){
                sum_upFlow += bean.getUpFlow();
                sum_dFlow += bean.getdFlow();
            }
            FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
            context.write(key, resultBean);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Tell the framework which jar contains this job's classes
        job.setJarByClass(FlowCount.class);
        // Set the mapper
        job.setMapperClass(FlowCountMapper.class);
        // Set the reducer
        job.setReducerClass(FlowCountReducer.class);
        // Declare the map output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // Declare the reduce (final) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // Input directory containing the raw data
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Output directory for the results (must not exist yet)
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait for completion
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);

    }
}

Making the map and reduce inner classes keeps the code structure simple.
Next, package the project into a jar and upload it to the Hadoop server.
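A sample invocation (the jar name and HDFS paths are placeholders for this walkthrough; FlowCount is assumed to be in the default package):

hadoop jar flowcount.jar FlowCount /flow/input /flow/output

The job runs successfully; check the output: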


[Figure: viewing the job output]

The statistics worked, but what's displayed is the object's address rather than its values: TextOutputFormat renders each value by calling toString(), and FlowBean doesn't override it. Fix the code by overriding toString() in FlowBean and changing the reducer to emit Text.
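A minimal toString() sketch (the tab delimiter is an assumption, chosen to match the input format):

    @Override
    public String toString() {
        // TextOutputFormat writes exactly this string as the value column
        return upFlow + "\t" + dFlow + "\t" + sumFlow;
    }

And the updated reducer: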

static class FlowCountReducer extends Reducer<Text, FlowBean, Text, Text>{
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long sum_upFlow = 0;
        long sum_dFlow = 0;
        // Sum upload and download traffic across all beans for this phone number
        for(FlowBean bean:values){
            sum_upFlow += bean.getUpFlow();
            sum_dFlow += bean.getdFlow();
        }
        FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
        context.write(key, new Text(resultBean.toString()));
    }
}
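
One detail worth keeping consistent: the reducer's output value type is now Text, so the driver's declaration should match it (only these two lines are affected; the map output declaration stays FlowBean):

job.setMapOutputValueClass(FlowBean.class); // map still emits FlowBean
job.setOutputValueClass(Text.class);        // reduce now emits Text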

Then repackage, upload, and run again; the result is now what we want:


[Figure: the new result]
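
With the example numbers from the introduction, an output line now reads:

13333333333	200	400	600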