1 mapReduce的排序
前面讲过,在map的输出数据传入reduce之前,需要先进行排序和分组。排序的实现其实很简单:map的输出都是按照key进行排序的,而要对key进行排序,key就需要实现jdk的Comparable接口。同时key还需要序列化,也就是需要实现hadoop自定义的Writable接口。为了方便,hadoop提供了WritableComparable接口,这个接口同时继承了上面的两个接口。
这里就引申出了一个问题:当用于排序的字段的值相等、但对应的并不是同一条数据时,该如何处理呢?也就是说,我的key需要按照自定义的字段进行排序,但是这个排序字段并不能唯一确定我的这行数据。
是不是需要进行两次mapreduce才行呢?
2 实现
数据是已经经过合并后的:
13480253104 180 180 360
13502468823 7335 110349 117684
13560436666 1116 954 2070
13560439658 2034 5892 7926
13602846565 1938 2910 4848
13660577991 6960 690 7650
13719199419 240 0 240
13726230503 2481 24681 27162
13726238888 2481 24681 27162
13760778710 120 120 240
13826544101 264 0 264
13922314466 3008 3720 6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 3659 3538 7197
15920133257 3156 2936 6092
15989002119 1938 180 2118
18211575961 1527 2106 3633
18320173382 9531 2412 11943
84138413 4116 1432 5548
map:
package com.jiyx.test.mapred.sort;
import com.jiyx.test.mapred.sort.DataBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Mapper that converts each tab-separated input line into a {@link DataBean} key.
 *
 * <p>The output key is a custom bean implementing {@code WritableComparable}, so the
 * shuffle phase sorts the map output with the bean's own {@code compareTo} method.
 * The output value carries no information and is always {@link NullWritable}.
 *
 * @author jiyx
 * @create 2018-10-20-16:04
 */
public class SortMapper extends Mapper<LongWritable, Text, DataBean, NullWritable> {

    /** Output key instance, reused across {@code map} calls instead of allocating one per record. */
    private final DataBean k = new DataBean();

    /**
     * Parses one input line and emits it as a {@link DataBean} key.
     *
     * <p>The first three tab-separated columns are read as phone number, upstream flow and
     * downstream flow; any further columns are ignored because the bean recomputes the total.
     *
     * @param key     input key supplied by the input format (not used)
     * @param value   one line of input text
     * @param context typed task context used to emit the output pair
     * @throws IOException              if writing the output fails
     * @throws InterruptedException     if the task is interrupted while writing
     * @throws NumberFormatException    if one of the first three columns is not a valid long
     * @throws ArrayIndexOutOfBoundsException if a non-blank line has fewer than three columns
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // A blank line (e.g. a trailing newline in the input file) carries no record; skip it
        // rather than failing the whole task while parsing an empty string.
        if (line.trim().isEmpty()) {
            return;
        }

        String[] datas = line.split("\t");
        long phoneNum = Long.parseLong(datas[0]);
        long upFlow = Long.parseLong(datas[1]);
        long downFlow = Long.parseLong(datas[2]);

        context.write(k.set(phoneNum, upFlow, downFlow), NullWritable.get());
    }
}
reduce:
package com.jiyx.test.mapred.sort;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Reducer that writes the already-sorted {@link DataBean} keys straight to the output.
 *
 * @author jiyx
 * @create 2018-10-20-16:19
 */
public class SortReducer extends Reducer<DataBean, NullWritable, DataBean, NullWritable> {

    /**
     * Emits the key once for every value grouped under it.
     *
     * <p>Writing once per value (instead of once per group) guarantees that no input record
     * is dropped when several records compare as equal and are therefore delivered in the
     * same group.
     *
     * @param key     the record to output
     * @param values  one placeholder value per record that was grouped under {@code key}
     * @param context task context used to emit the output pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(DataBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        for (NullWritable ignored : values) {
            context.write(key, NullWritable.get());
        }
    }
}
自定义key:
package com.jiyx.test.mapred.sort;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Flow record for one phone number, usable as a sortable MapReduce key.
 *
 * <p>Holds the phone number, upstream flow, downstream flow and their total. Records are
 * ordered by ascending total flow; see {@link #compareTo(DataBean)} for the tie-breaking rules.
 *
 * @author jiyx
 * @create 2018-10-15-19:22
 */
public class DataBean implements WritableComparable<DataBean> {

    private long phoneNum;
    private long upFlow;
    private long downFlow;
    private long totalFlow;

    /** Creates an empty bean; fields are filled later by {@link #set} or {@link #readFields}. */
    public DataBean() {
    }

    /**
     * Creates a bean for the given record; the total flow is computed from the two flows.
     *
     * @param phoneNum phone number
     * @param upFlow   upstream flow
     * @param downFlow downstream flow
     */
    public DataBean(long phoneNum, long upFlow, long downFlow) {
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }

    /**
     * Serializes the four fields as longs, in the order phone number, upstream flow,
     * downstream flow, total flow.
     *
     * @param dataOutput destination to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(phoneNum);
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(totalFlow);
    }

    /**
     * Overwrites all fields of this bean and recomputes the total flow.
     *
     * @param phoneNum phone number
     * @param upFlow   upstream flow
     * @param downFlow downstream flow
     * @return this bean, to allow call chaining
     */
    public DataBean set(long phoneNum, long upFlow, long downFlow) {
        this.phoneNum = phoneNum;
        this.downFlow = downFlow;
        this.upFlow = upFlow;
        this.totalFlow = upFlow + downFlow;
        return this;
    }

    /**
     * Deserializes the four fields in the same order in which {@link #write} emits them.
     *
     * @param dataInput source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        phoneNum = dataInput.readLong();
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        totalFlow = dataInput.readLong();
    }

    /**
     * Orders records by ascending total flow.
     *
     * <p>Records with the same total flow are further ordered by phone number, then upstream
     * flow, then downstream flow. The tie-break keeps distinct records distinct (the result is
     * 0 only when all four fields match) while still honoring the {@link Comparable} contract:
     * {@code a.compareTo(b)} and {@code b.compareTo(a)} always have opposite signs.
     *
     * @param o the record to compare with
     * @return a negative value, zero, or a positive value if this record sorts before,
     *         equal to, or after {@code o}
     */
    @Override
    public int compareTo(DataBean o) {
        int result = Long.compare(this.totalFlow, o.totalFlow);
        if (result != 0) {
            return result;
        }
        result = Long.compare(this.phoneNum, o.phoneNum);
        if (result != 0) {
            return result;
        }
        result = Long.compare(this.upFlow, o.upFlow);
        if (result != 0) {
            return result;
        }
        return Long.compare(this.downFlow, o.downFlow);
    }

    /**
     * Two beans are equal when all four fields match, consistent with {@link #compareTo}.
     *
     * @param obj the object to compare with
     * @return {@code true} if {@code obj} is a {@code DataBean} with identical field values
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof DataBean)) {
            return false;
        }
        DataBean other = (DataBean) obj;
        return phoneNum == other.phoneNum
                && upFlow == other.upFlow
                && downFlow == other.downFlow
                && totalFlow == other.totalFlow;
    }

    /**
     * Value-based hash code, consistent with {@link #equals}. A value-based hash (rather than
     * the identity hash) matters because Hadoop's default hash partitioner routes keys by
     * their hash code, which must be the same in every JVM.
     *
     * @return hash code derived from the four fields
     */
    @Override
    public int hashCode() {
        int result = hashOf(phoneNum);
        result = 31 * result + hashOf(upFlow);
        result = 31 * result + hashOf(downFlow);
        result = 31 * result + hashOf(totalFlow);
        return result;
    }

    /** Folds a long into an int the same way {@code Long#hashCode()} does. */
    private static int hashOf(long v) {
        return (int) (v ^ (v >>> 32));
    }

    /**
     * Formats the record as four tab-separated columns; this is the text that ends up in
     * the job's output file.
     *
     * @return phone number, upstream flow, downstream flow and total flow joined by tabs
     */
    @Override
    public String toString() {
        return this.phoneNum + "\t" + this.upFlow + "\t" + this.downFlow + "\t" + this.totalFlow;
    }

    public long getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(long phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getTotalFlow() {
        return totalFlow;
    }

    public void setTotalFlow(long totalFlow) {
        this.totalFlow = totalFlow;
    }
}
job:
package com.jiyx.test.mapred.sort;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Driver that configures and submits the sort job.
 *
 * <p>Usage: {@code SortJob <input path> <output path>}
 *
 * @author jiyx
 * @create 2018-10-20-16:35
 */
public class SortJob {

    /**
     * Configures the mapper, reducer, key/value classes and paths, then runs the job and
     * waits for it to finish.
     *
     * <p>The process exits with status 2 when the arguments are missing and with status 1
     * when the job fails; on success the method returns normally.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: SortJob <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance();
        job.setJarByClass(SortJob.class);

        job.setMapperClass(SortMapper.class);
        // Declare the map output types explicitly so they do not silently depend on the
        // final output types configured below.
        job.setMapOutputKeyClass(DataBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(SortReducer.class);
        // Pitfall: do not swap the key and value classes here. Doing so produced
        // "java.io.IOException: Initialization of all the collectors failed. Error in last
        // collector was: java.lang.ClassCastException: class ...DataBean".
        job.setOutputKeyClass(DataBean.class);
        job.setOutputValueClass(NullWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Propagate job failure to the caller instead of always exiting with status 0.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}
排序后的结果展示:
13760778710 120 120 240
13719199419 240 0 240
13926251106 240 0 240
13826544101 264 0 264
13480253104 180 180 360
13926435656 132 1512 1644
13560436666 1116 954 2070
15989002119 1938 180 2118
18211575961 1527 2106 3633
13602846565 1938 2910 4848
84138413 4116 1432 5548
15920133257 3156 2936 6092
13922314466 3008 3720 6728
15013685858 3659 3538 7197
13660577991 6960 690 7650
13560439658 2034 5892 7926
18320173382 9531 2412 11943
13726230503 2481 24681 27162
13726238888 2481 24681 27162
13925057413 11058 48243 59301
13502468823 7335 110349 117684