Using MapReduce to compute each year's sales volume and sales revenue

1. Map side

package Task7.productsSales;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
// k2 = year (Text), v2 = a Products record
public class ProductsMapper extends Mapper<LongWritable, Text, Text, Products> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Read one input record
        String data = value1.toString();
        // Split the CSV line into fields
        String[] words = data.split(",");
        // Create a product object
        Products p = new Products();

        // Date --- keep only the year. The date is the third field (words[2]);
        // note the pattern must be yyyy-MM-dd (MM = month; mm would mean minutes)
        String date = words[2];
        Date d;
        try {
            d = new SimpleDateFormat("yyyy-MM-dd").parse(date);
        } catch (ParseException e) {
            // Skip records with an unparsable date rather than fail later on a null Date
            return;
        }
        String year = new SimpleDateFormat("yyyy").format(d);
        p.setPdate(year);

        // Quantity sold
        p.setPnum(Integer.parseInt(words[5]));

        // Sales amount
        p.setPtotal(Double.parseDouble(words[6]));

        // Emit: k2 = year, v2 = the Products record
        context.write(new Text(p.getPdate()), p);
    }
}
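As a worked example (the two records below are hypothetical, following the field layout pno,bno,pdate,pmethod,psales,pnum,ptotal used by the Products class), the input lines

1,101,2019-05-20,1,1,3,899.00
2,101,2019-08-02,1,1,2,599.00

are each emitted with key 2019; the first carries pnum = 3 and ptotal = 899.00, the second pnum = 2 and ptotal = 599.00.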

2. Reduce side

package Task7.productsSales;

import java.io.IOException;
import java.text.DecimalFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; 
public class ProductsReducer extends Reducer<Text, Products, Text, Text> {
    @Override
    protected void reduce(Text k3, Iterable<Products> v3, Context context)
            throws IOException, InterruptedException {
        // k3 = year, v3 = all Products records for that year
        int total_number = 0;
        double total_money = 0;

        for (Products p : v3) {
            // Accumulate the total quantity sold
            total_number = total_number + p.getPnum();
            // Accumulate the total sales amount
            total_money = total_money + p.getPtotal();
        }
        DecimalFormat df = new DecimalFormat("#.00");
        String show = "Quantity sold: " + total_number + "---------" + "Total sales amount: " + df.format(total_money);
        context.write(k3, new Text(show));
    }
}
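Continuing the hypothetical records from the Map side, both arrive at the reducer under key 2019, so it sums pnum (3 + 2 = 5) and ptotal (899.00 + 599.00 = 1498.00) and writes a single output line:

2019	Quantity sold: 5---------Total sales amount: 1498.00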

3. Main class

package Task7.productsSales;

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
public class ProductsMain {
    public static void main(String[] args) throws Exception {
        // Create a job
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(ProductsMain.class);
        // Set the job's mapper and its output types (k2, v2)
        job.setMapperClass(ProductsMapper.class);
        job.setMapOutputKeyClass(Text.class);        // year
        job.setMapOutputValueClass(Products.class);  // Products record

        // Set the job's reducer and its output types (k4, v4)
        job.setReducerClass(ProductsReducer.class);
        job.setOutputKeyClass(Text.class);   // year
        job.setOutputValueClass(Text.class); // summary line (quantity and amount)

        // Set the job's input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Run the job and exit with a status reflecting success or failure
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

4. Products serialization (Writable)

package Task7.productsSales;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// One input record per line, comma-separated: pno,bno,pdate,pmethod,psales,pnum,ptotal
public class Products implements Writable {
    private int pno;
    private int bno;
    private String pdate;
    private int pmethod;
    private int psales;
    private int pnum;
    private double ptotal;

    @Override
    public void readFields(DataInput input) throws IOException {
        // Deserialize; fields must be read in exactly the order write() writes them
        this.pno = input.readInt();
        this.bno = input.readInt();
        this.pdate = input.readUTF();
        this.pmethod = input.readInt();
        this.psales = input.readInt();
        this.pnum = input.readInt();
        this.ptotal = input.readDouble();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // Serialize; the field order here defines the wire format readFields() must mirror
        output.writeInt(this.pno);
        output.writeInt(this.bno);
        output.writeUTF(this.pdate);
        output.writeInt(this.pmethod);
        output.writeInt(this.psales);
        output.writeInt(this.pnum);
        output.writeDouble(this.ptotal);
    }

    public int getPno() {
        return pno;
    }

    public void setPno(int pno) {
        this.pno = pno;
    }

    public int getBno() {
        return bno;
    }

    public void setBno(int bno) {
        this.bno = bno;
    }

    public String getPdate() {
        return pdate;
    }

    public void setPdate(String pdate) {
        this.pdate = pdate;
    }

    public int getPmethod() {
        return pmethod;
    }

    public void setPmethod(int pmethod) {
        this.pmethod = pmethod;
    }
    
    public int getPsales() {
        return psales;
    }

    public void setPsales(int psales) {
        this.psales = psales;
    }


    public int getPnum() {
        return pnum;
    }

    public void setPnum(int pnum) {
        this.pnum = pnum;
    }
    
    public double getPtotal() {
        return ptotal;
    }

    public void setPtotal(double ptotal) {
        this.ptotal = ptotal;
    }

}
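A quick way to sanity-check the Writable is to round-trip an instance through an in-memory stream. This is a minimal standalone sketch, not part of the original job; the class name ProductsRoundTrip is mine, and it assumes only the Products class above:

package Task7.productsSales;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ProductsRoundTrip {
    public static void main(String[] args) throws IOException {
        Products in = new Products();
        in.setPdate("2019");
        in.setPnum(3);
        in.setPtotal(899.00);

        // Serialize into an in-memory buffer, as Hadoop does between map and reduce
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        in.write(new DataOutputStream(bytes));

        // Deserialize from the same bytes; this only works if the field orders match
        Products out = new Products();
        out.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(out.getPdate() + " " + out.getPnum() + " " + out.getPtotal()); // 2019 3 899.0
    }
}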

5. In a Linux terminal, upload the .txt file to be analyzed to HDFS

hdfs dfs -put <local file path> <destination path in HDFS>
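For example (the paths here are hypothetical; substitute your own):

hdfs dfs -mkdir -p /input
hdfs dfs -put /home/hadoop/products.txt /input/products.txt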

6. In the terminal, change into the directory containing the Java project above and package it into a jar

mvn clean package

7. Then run cd target to enter the target directory, where Maven puts the built jar
8. Run the program

hadoop jar <jar file> <fully qualified main class> <HDFS input path> <HDFS output path>
Note: the output path given here must not already exist; do not create it in advance, or the job will refuse to run.
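For example, with the hypothetical paths from step 5 (the jar name depends on your pom.xml; the main class comes from the code above):

hadoop jar productsSales-1.0.jar Task7.productsSales.ProductsMain /input/products.txt /output/sales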

9. Fetch and view the results

hdfs dfs -cat <the result file under the output path you set>
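With a single reducer (the default), the results land in a file named part-r-00000 inside the job's output directory. Using the hypothetical paths above:

hdfs dfs -cat /output/sales/part-r-00000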