1. Map side (Mapper)
package Task7.productsSales;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// k2: year, v2: product record
public class ProductsMapper extends Mapper<LongWritable, Text, Text, Products> {

    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Read one line of input
        String data = value1.toString();
        // Split the line on commas
        String[] words = data.split(",");
        // Create a product object
        Products p = new Products();
        // Date: keep only the year; the date is the third field (words[2])
        String date = words[2];
        Date d = null;
        try {
            // "MM" is month; lowercase "mm" would mean minutes
            d = new SimpleDateFormat("yyyy-MM-dd").parse(date);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        SimpleDateFormat syear = new SimpleDateFormat("yyyy");
        String year = syear.format(d);
        p.setPdate(year);
        // Quantity sold
        p.setPnum(Integer.parseInt(words[5]));
        // Total sales amount
        p.setPtotal(Double.valueOf(words[6]));
        // Output: k2 = year, v2 = the product record
        context.write(new Text(p.getPdate()), p);
    }
}
2. Reduce side (Reducer)
package Task7.productsSales;

import java.io.IOException;
import java.text.DecimalFormat;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ProductsReducer extends Reducer<Text, Products, Text, Text> {

    @Override
    protected void reduce(Text k3, Iterable<Products> v3, Context context)
            throws IOException, InterruptedException {
        // k3: year, v3: all product records for that year
        int totalNumber = 0;
        double totalMoney = 0;
        for (Products p : v3) {
            // Total quantity sold
            totalNumber = totalNumber + p.getPnum();
            // Total sales amount
            totalMoney = totalMoney + p.getPtotal();
        }
        DecimalFormat df = new DecimalFormat("#.00");
        String show = "Total quantity sold: " + totalNumber
                + " --------- Total sales amount: " + df.format(totalMoney);
        context.write(k3, new Text(show));
    }
}
3. Main driver class
package Task7.productsSales;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ProductsMain {
    public static void main(String[] args) throws Exception {
        // Create a job
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(ProductsMain.class);

        // Mapper and its output types (k2, v2)
        job.setMapperClass(ProductsMapper.class);
        job.setMapOutputKeyClass(Text.class);        // year
        job.setMapOutputValueClass(Products.class);  // product record

        // Reducer and its output types (k4, v4)
        job.setReducerClass(ProductsReducer.class);
        job.setOutputKeyClass(Text.class);    // year
        job.setOutputValueClass(Text.class);  // sales summary string

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
4. Products serialization class (Writable)
package Task7.productsSales;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// One CSV record contains, in order: pno, bno, pdate, pmethod, psales, pnum, ptotal
public class Products implements Writable {
    private int pno;
    private int bno;
    private String pdate;   // sale date (the mapper keeps only the year)
    private int pmethod;
    private int psales;
    private int pnum;       // quantity sold
    private Double ptotal;  // total sales amount

    // Deserialization: fields must be read in exactly the order they are written
    public void readFields(DataInput input) throws IOException {
        this.pno = input.readInt();
        this.bno = input.readInt();
        this.pdate = input.readUTF();
        this.pmethod = input.readInt();
        this.psales = input.readInt();
        this.pnum = input.readInt();
        this.ptotal = input.readDouble();
    }

    // Serialization
    public void write(DataOutput output) throws IOException {
        output.writeInt(this.pno);
        output.writeInt(this.bno);
        output.writeUTF(this.pdate);
        output.writeInt(this.pmethod);
        output.writeInt(this.psales);
        output.writeInt(this.pnum);
        output.writeDouble(this.ptotal);
    }

    public int getPno() {
        return pno;
    }
    public void setPno(int pno) {
        this.pno = pno;
    }
    public int getBno() {
        return bno;
    }
    public void setBno(int bno) {
        this.bno = bno;
    }
    public String getPdate() {
        return pdate;
    }
    public void setPdate(String pdate) {
        this.pdate = pdate;
    }
    public int getPmethod() {
        return pmethod;
    }
    public void setPmethod(int pmethod) {
        this.pmethod = pmethod;
    }
    public int getPsales() {
        return psales;
    }
    public void setPsales(int psales) {
        this.psales = psales;
    }
    public int getPnum() {
        return pnum;
    }
    public void setPnum(int pnum) {
        this.pnum = pnum;
    }
    public Double getPtotal() {
        return ptotal;
    }
    public void setPtotal(Double ptotal) {
        this.ptotal = ptotal;
    }
}
5. In a Linux terminal, upload the txt file to be analyzed to HDFS:
hdfs dfs -put <local file path> <target path in HDFS>
6. In the terminal, change into the directory of the Java project above and package it into a jar:
mvn clean package
7. Then run cd target to enter the target directory that contains the built jar.
8. Run the program. The command has the general form below; the jar file name and the two paths are placeholders, so substitute your own:
hadoop jar <jar-file> Task7.productsSales.ProductsMain <hdfs-input-path> <hdfs-output-path>
Note: the output directory given as the last argument must not already exist; do not create it in advance, otherwise Hadoop will refuse to run the job. A sketch for removing it automatically from the driver follows below.
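If you would rather have the driver delete an old output directory automatically instead of removing it by hand before each run, a minimal sketch (not part of the original program) is shown below. It assumes args[1] is the HDFS output path, as in ProductsMain above, is placed inside main() before FileOutputFormat.setOutputPath, and needs one extra import, org.apache.hadoop.fs.FileSystem:

// Optional sketch: inside main(), before FileOutputFormat.setOutputPath(job, new Path(args[1]))
// Assumes args[1] is the HDFS output path
FileSystem fs = FileSystem.get(job.getConfiguration());
Path outputPath = new Path(args[1]);
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true); // true = delete the directory recursively
}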
9. View the result:
hdfs dfs -cat <hdfs-output-path>/part-r-00000
(With a single reducer, part-r-00000 is the default name of the result file inside the output directory you chose.)