1、 自定义排序
可通过实现WritableComparable接口来实现自定义排序,并使用此自定义类作为Mapper的输出key。例如有下面的订单数据:
订单ID 商品ID 价格
0000001 Pdt_01 222.8
0000002 Pdt_06 722.4
0000001 Pdt_05 25.8
0000003 Pdt_01 222.8
0000003 Pdt_01 33.8
0000002 Pdt_03 522.8
0000002 Pdt_04 122.4
我们构造订单对象,并通过订单id排序
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Order implements WritableComparable<Order> {
    private String orderId;
    private String goodsId;
    private Double goodsPrice;

    /**
     * Sorts orders by orderId in ascending lexicographic order.
     * (The previous Javadoc claimed "price descending", which did not match the code.)
     *
     * @param o the order to compare against
     * @return negative / zero / positive per the {@link Comparable} contract
     */
    public int compareTo(Order o) {
        // Compare the two order ids.
        return this.orderId.compareTo(o.getOrderId());
    }

    /**
     * Serialization. Each field is written with a self-describing primitive
     * (writeUTF carries its own length prefix, writeDouble a fixed width), so
     * readFields can reconstruct the record exactly. The original raw
     * getBytes() + readLine() pair could not round-trip: write() never emitted
     * a line terminator and DataInput.readLine() is deprecated and
     * charset-broken.
     */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeUTF(goodsId);
        dataOutput.writeDouble(goodsPrice);
    }

    // Deserialization — must read fields in exactly the order write() emitted them.
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.goodsId = dataInput.readUTF();
        this.goodsPrice = dataInput.readDouble();
    }

    @Override
    public String toString() {
        return this.orderId + "\t" + this.goodsId + "\t" + this.goodsPrice;
    }
}
2、 二次排序
二次排序,即当第一排序字段相同时,再按第二个字段排序,共涉及两个字段。例如上面的例子,我们在订单ID相同的时候,再使用价格倒序排序,代码如下:
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Order implements WritableComparable<Order> {
    private String orderId;
    private String goodsId;
    private Double goodsPrice;

    /**
     * Secondary sort: primary key is orderId ascending; ties are broken by
     * goodsPrice in descending order.
     *
     * @param o the order to compare against
     * @return negative / zero / positive per the {@link Comparable} contract
     */
    public int compareTo(Order o) {
        // Primary comparison: order id, ascending.
        int byOrderId = this.orderId.compareTo(o.getOrderId());
        if (byOrderId != 0) {
            return byOrderId;
        }
        // Same order id: compare prices reversed so higher prices sort first.
        return o.goodsPrice.compareTo(this.goodsPrice);
    }

    /**
     * Serialization. Each field is written with a self-describing primitive
     * (writeUTF carries its own length prefix, writeDouble a fixed width), so
     * readFields can reconstruct the record exactly. The original raw
     * getBytes() + readLine() pair could not round-trip: write() never emitted
     * a line terminator and DataInput.readLine() is deprecated and
     * charset-broken.
     */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeUTF(goodsId);
        dataOutput.writeDouble(goodsPrice);
    }

    // Deserialization — must read fields in exactly the order write() emitted them.
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.goodsId = dataInput.readUTF();
        this.goodsPrice = dataInput.readDouble();
    }

    @Override
    public String toString() {
        return this.orderId + "\t" + this.goodsId + "\t" + this.goodsPrice;
    }
}
3、 分组排序
分组排序,即对数据进行分组,然后在组内排序。通过继承WritableComparator来创建一个自定义分组类,在job中使用setGroupingComparatorClass方法设置该分组类。
例如上面例子,我们需要求出每个订单中价格最贵的商品
package top.gujm.gourpsort;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Order implements WritableComparable<Order> {
    private String orderId;
    private String goodsId;
    private Double goodsPrice;

    /**
     * Sorts by orderId ascending, then by goodsPrice descending.
     * <p>
     * The grouping comparator only compares ADJACENT keys (it checks whether
     * WritableComparator.compare(a, b) == 0 to decide whether two records fall
     * in the same group), so records with the same orderId must already be
     * adjacent — which this primary sort on orderId guarantees.
     *
     * @param o the order to compare against
     * @return negative / zero / positive per the {@link Comparable} contract
     */
    public int compareTo(Order o) {
        // Primary comparison: order id, ascending.
        int byOrderId = this.orderId.compareTo(o.getOrderId());
        if (byOrderId != 0) {
            return byOrderId;
        }
        // Same order id: compare prices reversed so the most expensive item comes first.
        return o.goodsPrice.compareTo(this.goodsPrice);
    }

    /**
     * Serialization. Each field is written with a self-describing primitive
     * (writeUTF carries its own length prefix, writeDouble a fixed width), so
     * readFields can reconstruct the record exactly. The original raw
     * getBytes() + readLine() pair could not round-trip: write() never emitted
     * a line terminator and DataInput.readLine() is deprecated and
     * charset-broken.
     */
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(orderId);
        dataOutput.writeUTF(goodsId);
        dataOutput.writeDouble(goodsPrice);
    }

    // Deserialization — must read fields in exactly the order write() emitted them.
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.goodsId = dataInput.readUTF();
        this.goodsPrice = dataInput.readDouble();
    }

    @Override
    public String toString() {
        return this.orderId + "\t" + this.goodsId + "\t" + this.goodsPrice;
    }
}
package top.gujm.gourpsort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class Group extends WritableComparator {
    public Group() {
        // Register Order as the key class; 'true' asks the parent to
        // instantiate keys so compare(WritableComparable, WritableComparable)
        // receives deserialized Order objects.
        super(Order.class, true);
    }

    /**
     * Decides reduce-group membership: two keys belong to the same group
     * exactly when this method returns 0, i.e. when their orderId fields
     * are equal.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Order left = (Order) a;
        Order right = (Order) b;
        return left.getOrderId().compareTo(right.getOrderId());
    }
}
package top.gujm.gourpsort;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class OrderMapper extends Mapper<LongWritable, Text, Order, NullWritable> {
    /**
     * Parses one tab-separated input line (orderId, goodsId, price) into an
     * Order and emits it as the map output key with an empty value.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line into its three tab-separated columns.
        String[] columns = value.toString().split("\t");
        // Build the composite key: orderId, goodsId, parsed price.
        Order record = new Order(columns[0], columns[1], Double.parseDouble(columns[2]));
        // The Order itself carries all the data, so no value payload is needed.
        context.write(record, NullWritable.get());
    }
}
package top.gujm.gourpsort;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class OrderReduce extends Reducer<Order, NullWritable, Order, NullWritable> {
    /**
     * Emits only the group's key. With the grouping comparator installed,
     * each call covers all records sharing one orderId; the key presented
     * here is the first record of that sorted group, so one line per order
     * is written out.
     */
    @Override
    protected void reduce(Order key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
package top.gujm.utils;
import org.apache.commons.beanutils.converters.DateTimeConverter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class JobUtils {
    // One-shot configuration, staged by the setters below and consumed by
    // createJob(). Reset after each createJob() call so that settings for one
    // job can no longer leak into the next job built through this class
    // (previously these static fields kept their values forever).
    private static Class partitionerClass = null;
    private static int reduceTasks = 1;
    private static long maxSize = 0;
    private static long minSize = 0;
    private static boolean useCombineTextInputFormat = false;
    private static Class groupingComparator = null;

    /**
     * 创建一个job任务
     * @param jarClass 对应的Jar的类型
     * @param mapperClass Mapper的类型
     * @param mapperOutputKeyClass Mapper的输出key的类型
     * @param mapperOutputValueClass Mapper的输出value类型
     * @param reducerClass Reducer类型 (may be null for a map-only job)
     * @param reducerOutputKeyClass Reducer输出key类型
     * @param reducerOutputValueClass Reducer输出value类型
     * @param inputPath 输入文件路径
     * @param outPath 输出路径
     * @return Job任务
     * @throws IOException 异常
     */
    public static Job createJob(
            Class jarClass,
            Class mapperClass, Class mapperOutputKeyClass, Class mapperOutputValueClass,
            Class reducerClass, Class reducerOutputKeyClass, Class reducerOutputValueClass,
            String inputPath, String outPath
    ) throws IOException {
        // Obtain a Job instance from a fresh Configuration.
        Job job = Job.getInstance(new Configuration());
        // Main class used to locate the jar.
        job.setJarByClass(jarClass);
        // Mapper class and its output key/value types.
        job.setMapperClass(mapperClass);
        job.setMapOutputKeyClass(mapperOutputKeyClass);
        job.setMapOutputValueClass(mapperOutputValueClass);
        if (reducerClass != null) {
            // Reducer class; its output types are also the job's final output types.
            job.setReducerClass(reducerClass);
            job.setOutputKeyClass(reducerOutputKeyClass);
            job.setOutputValueClass(reducerOutputValueClass);
        }
        // Optional grouping comparator (group sort).
        if (groupingComparator != null) {
            job.setGroupingComparatorClass(groupingComparator);
        }
        // Optional custom partitioner, with the matching reduce-task count.
        if (partitionerClass != null) {
            job.setPartitionerClass(partitionerClass);
            job.setNumReduceTasks(reduceTasks);
        }
        // Optional CombineTextInputFormat with split-size bounds.
        if (useCombineTextInputFormat) {
            job.setInputFormatClass(CombineTextInputFormat.class);
            if (maxSize > 0) {
                CombineTextInputFormat.setMaxInputSplitSize(job, maxSize);
            }
            if (minSize > 0) {
                CombineTextInputFormat.setMinInputSplitSize(job, minSize);
            }
        }
        // Input and output paths.
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        // Consume the staged configuration so the next createJob() starts clean.
        resetStagedConfig();
        return job;
    }

    // Restores all staged settings to their defaults.
    private static void resetStagedConfig() {
        partitionerClass = null;
        reduceTasks = 1;
        maxSize = 0;
        minSize = 0;
        useCombineTextInputFormat = false;
        groupingComparator = null;
    }

    /**
     * 设置分组排序的类 (applies to the next createJob call only)
     * @param groupingComparator 分组类
     */
    public static void setGroupingComparator(Class groupingComparator) {
        JobUtils.groupingComparator = groupingComparator;
    }

    /**
     * 设置Partitioner类(分区) (applies to the next createJob call only)
     * @param partitionerClass partitioner类
     * @param reduceTasks reduce任务数
     */
    public static void setPartitioner(Class partitionerClass, int reduceTasks) {
        JobUtils.partitionerClass = partitionerClass;
        JobUtils.reduceTasks = reduceTasks;
    }

    /**
     * 使用CombineTextInputFormat (applies to the next createJob call only)
     * @param maxSize 最大split大小(字节); 0 表示不设置
     * @param minSize 最小split大小(字节); 0 表示不设置
     */
    public static void useCombineTextInputFormat(long maxSize, long minSize) {
        JobUtils.useCombineTextInputFormat = true;
        JobUtils.maxSize = maxSize;
        JobUtils.minSize = minSize;
    }
}
package top.gujm.gourpsort;
import org.apache.hadoop.io.NullWritable;
import top.gujm.utils.JobUtils;
import java.io.IOException;
public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Install the grouping comparator so records with the same orderId
        // are reduced together.
        JobUtils.setGroupingComparator(Group.class);
        // args[0] = input path, args[1] = output path.
        boolean completed = JobUtils.createJob(
                OrderDriver.class,
                OrderMapper.class, Order.class, NullWritable.class,
                OrderReduce.class, Order.class, NullWritable.class,
                args[0], args[1]
        ).waitForCompletion(true);
    }
}