Hadoop from Beginner to Expert 30: Advanced MapReduce Operations - Sorting Objects

1. Sorting in SQL

  • ORDER BY can sort on a single column or on multiple columns;
  • The sort objects after ORDER BY can be column names, expressions, aliases, or column positions;
  • ORDER BY sorts in ascending order by default and applies to every sort object that follows it;
  • To sort in descending order, append desc to the sort object;
  • desc applies only to the single sort object immediately before it;
  • If several sort objects should all be descending, each one needs its own desc;

(1) Single-column sorting

SQL> select * from emp order by sal desc;
SQL> select empno,ename,sal,sal*12 from emp order by sal*12 desc;
SQL> select empno,ename,sal,sal*12 annsal from emp order by annsal desc;
SQL> select empno,ename,sal,sal*12 annsal from emp order by 4 desc;

(2) Multi-column sorting

SQL> select * from emp order by deptno,sal;
SQL> select * from emp order by deptno,sal desc;
SQL> select * from emp order by deptno desc,sal desc;

2. Sorting in MapReduce

During the shuffle, MapReduce sorts map output by key, so sorting objects means making the object class the map output key. Object sorting in MapReduce must satisfy the following conditions:

  • The object class implements the WritableComparable interface;
  • The object class implements the write() and readFields() methods;
  • The object class implements the compareTo() method;
  • The job has only a Mapper and no custom Reducer (this keeps records with equal keys from being merged and deduplicated);

Example 1: single-column sort: list employee information, ordered by salary in descending order.

// Object class: Employee.java; implements the WritableComparable interface and overrides compareTo()
package demo.sort.mr.object;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
// Implements the interface WritableComparable
public class Employee implements WritableComparable<Employee> {
  private int empno;
  private String ename;
  private String job;
  private int mgr;
  private String hiredate;
  private int sal;
  private int comm;
  private int deptno;
  @Override
  public String toString() {
    return "[" + this.ename + "\t" + this.sal + "]";
  }
  @Override
  public int compareTo(Employee other) {
    // Sort by salary in descending order.
    // Note: this never returns 0, so no two keys compare as equal
    // and no record is merged away during the shuffle.
    if (this.sal >= other.getSal()) {
      return -1;
    } else {
      return 1;
    }
  }
  @Override
  public void readFields(DataInput input) throws IOException {
    // Deserialization: read the fields back in the order they were written
    this.empno = input.readInt();
    this.ename = input.readUTF();
    this.job = input.readUTF();
    this.mgr = input.readInt();
    this.hiredate = input.readUTF();
    this.sal = input.readInt();
    this.comm = input.readInt();
    this.deptno = input.readInt();
  }
  @Override
  public void write(DataOutput output) throws IOException {
    // Serialization
    output.writeInt(this.empno);
    output.writeUTF(this.ename);
    output.writeUTF(this.job);
    output.writeInt(this.mgr);
    output.writeUTF(this.hiredate);
    output.writeInt(this.sal);
    output.writeInt(this.comm);
    output.writeInt(this.deptno);
  }
  public int getEmpno() {
    return empno;
  }
  public void setEmpno(int empno) {
    this.empno = empno;
  }
  public String getEname() {
    return ename;
  }
  public void setEname(String ename) {
    this.ename = ename;
  }
  public String getJob() {
    return job;
  }
  public void setJob(String job) {
    this.job = job;
  }
  public int getMgr() {
    return mgr;
  }
  public void setMgr(int mgr) {
    this.mgr = mgr;
  }
  public String getHiredate() {
    return hiredate;
  }
  public void setHiredate(String hiredate) {
    this.hiredate = hiredate;
  }
  public int getSal() {
    return sal;
  }
  public void setSal(int sal) {
    this.sal = sal;
  }
  public int getComm() {
    return comm;
  }
  public void setComm(int comm) {
    this.comm = comm;
  }
  public int getDeptno() {
    return deptno;
  }
  public void setDeptno(int deptno) {
    this.deptno = deptno;
  }
}
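
The only contract write() and readFields() must honor is symmetry: readFields() reads the fields back in exactly the order write() wrote them. A quick local round-trip check can verify this (a minimal sketch; WritableRoundTrip is a hypothetical helper class, not part of the job):

//WritableRoundTrip.java (hypothetical helper, illustration only)
package demo.sort.mr.object;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
public class WritableRoundTrip {
  public static void main(String[] args) throws IOException {
    Employee in = new Employee();
    in.setEmpno(7839);
    in.setEname("KING");
    in.setJob("PRESIDENT");
    in.setMgr(0);
    in.setHiredate("1981/11/17");
    in.setSal(5000);
    in.setComm(0);
    in.setDeptno(10);
    // Serialize into a byte buffer ...
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    in.write(new DataOutputStream(buf));
    // ... then deserialize; the fields come back in the order they were written
    Employee out = new Employee();
    out.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
    System.out.println(out); // prints [KING	5000]
  }
}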
// Mapper class: EmployeeSortMapper.java
package demo.sort.mr.object;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class EmployeeSortMapper extends Mapper<LongWritable, Text, Employee, NullWritable> {
  @Override
  protected void map(LongWritable key1, Text value1, Context context) 
      throws IOException, InterruptedException {
    // Wrap each input line in an Employee object
    // Input format: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
    String str = value1.toString();
    // Split the line on commas
    String[] words = str.split(",");
    // Create an Employee object
    Employee e = new Employee();
    // Populate the employee's fields
    e.setEmpno(Integer.parseInt(words[0]));
    e.setEname(words[1]);
    e.setJob(words[2]);
    // mgr may be empty (e.g. for KING), so fall back to 0
    try {
      e.setMgr(Integer.parseInt(words[3]));
    } catch (Exception ex) {
      e.setMgr(0);
    }
    e.setHiredate(words[4]);
    e.setSal(Integer.parseInt(words[5]));
    // comm may be empty, so fall back to 0
    try {
      e.setComm(Integer.parseInt(words[6]));
    } catch (Exception ex) {
      e.setComm(0);
    }
    e.setDeptno(Integer.parseInt(words[7]));
    // Emit: the Employee object as the key, with a null value
    context.write(e, NullWritable.get());
  }
}
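
The try/catch blocks above exist because mgr and comm can be empty in emp.csv; String.split(",") then yields an empty string for those fields, and Integer.parseInt() throws on it. A minimal illustration (SplitCheck is a hypothetical snippet, not part of the job):

//SplitCheck.java (hypothetical, illustration only)
package demo.sort.mr.object;
public class SplitCheck {
  public static void main(String[] args) {
    // KING's row: both mgr and comm are empty
    String[] w = "7839,KING,PRESIDENT,,1981/11/17,5000,,10".split(",");
    System.out.println(w.length);          // 8
    System.out.println("[" + w[3] + "]");  // []  -> Integer.parseInt("") would throw
  }
}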
// Job (driver) class: EmployeeSortMain.java
package demo.sort.mr.object;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class EmployeeSortMain {
  public static void main(String[] args) throws Exception {
    // Create the job: Mapper only, no custom Reducer
    Job job = Job.getInstance(new Configuration());
    // Specify the job's entry point
    job.setJarByClass(EmployeeSortMain.class);
    // Specify the Mapper and its output types
    job.setMapperClass(EmployeeSortMapper.class);
    job.setMapOutputKeyClass(Employee.class);
    job.setMapOutputValueClass(NullWritable.class);
    // No custom Reducer; set the job's output types directly
    job.setOutputKeyClass(Employee.class);
    job.setOutputValueClass(NullWritable.class);
    // Specify the input and output paths (HDFS directories)
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Run the job
    job.waitForCompletion(true);
  }
}
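
Before packaging, the sort order can be sanity-checked locally with plain Java (a minimal sketch; CompareToCheck is a hypothetical helper class):

//CompareToCheck.java (hypothetical helper, illustration only)
package demo.sort.mr.object;
public class CompareToCheck {
  public static void main(String[] args) {
    Employee a = new Employee();
    a.setSal(5000);
    Employee b = new Employee();
    b.setSal(800);
    System.out.println(a.compareTo(b)); // -1: the higher salary sorts first
    System.out.println(b.compareTo(a)); //  1: the lower salary sorts last
  }
}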

Package and run the program:
1. Package the demo.sort.mr.object directory into employeesort.jar, specifying EmployeeSortMain as the main class (see the jar sketch after this list)
2. Upload employeesort.jar to the server at /root/input/employeesort.jar
3. Prepare the test data on HDFS: /input/emp.csv
4. Run the program: # hadoop jar /root/input/employeesort.jar /input/emp.csv /output/employeesort
5. List the output directory: # hdfs dfs -ls /output/employeesort
6. View the result: # hdfs dfs -cat /output/employeesort/part-r-00000
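
For step 1, one way to build the jar from already-compiled classes is the jar tool's entry-point flag: c creates the archive, f names the output file, e sets the main class, and -C bin . picks up the .class files from a bin directory (the bin path is an assumption):

# jar cfe employeesort.jar demo.sort.mr.object.EmployeeSortMain -C bin .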

[root@bigdata input]# hdfs dfs -cat /input/emp.csv
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10

[root@bigdata input]# hadoop jar EmployeeSort.jar /input/emp.csv /output/employeesort
......
18/11/05 21:49:13 INFO mapreduce.Job: map 0% reduce 0%
18/11/05 21:49:17 INFO mapreduce.Job: map 100% reduce 0%
18/11/05 21:49:21 INFO mapreduce.Job: map 100% reduce 100%
18/11/05 21:49:22 INFO mapreduce.Job: Job job_1541425571272_0001 completed successfully
......

[root@bigdata input]# hdfs dfs -ls /output/employeesort
Found 2 items
-rw-r--r-- 1 root supergroup 0 2018-11-05 21:49 /output/employeesort/_SUCCESS
-rw-r--r-- 1 root supergroup 180 2018-11-05 21:49 /output/employeesort/part-r-00000

[root@bigdata input]# hdfs dfs -cat /output/employeesort/part-r-00000
[KING 5000]
[FORD 3000]
[SCOTT 3000]
[JONES 2975]
[BLAKE 2850]
[CLARK 2450]
[ALLEN 1600]
[TURNER 1500]
[MILLER 1300]
[MARTIN 1250]
[WARD 1250]
[ADAMS 1100]
[JAMES 950]
[SMITH 800]

Example 2: multi-column sort: list employee information, ordered by department number ascending and then by salary descending.

// Object class: Employee.java; implements the WritableComparable interface and overrides compareTo()
package demo.sort.mr.object2;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
// Implements the interface WritableComparable
public class Employee implements WritableComparable<Employee> {
  private int empno;
  private String ename;
  private String job;
  private int mgr;
  private String hiredate;
  private int sal;
  private int comm;
  private int deptno;
  @Override
  public String toString() {
    return "[" + this.ename + "\t" + this.deptno + "\t" + this.sal + "]";
  }
  @Override
  public int compareTo(Employee other) {
    // First compare by department number, in ascending order
    if (this.deptno > other.getDeptno()) {
      return 1;
    } else if (this.deptno < other.getDeptno()) {
      return -1;
    }
    // Department numbers are equal: compare by salary in descending order
    // (again never returning 0, so equal keys are not merged)
    if (this.sal >= other.getSal()) {
      return -1;
    } else {
      return 1;
    }
  }
  @Override
  public void readFields(DataInput input) throws IOException {
    // Deserialization: read the fields back in the order they were written
    this.empno = input.readInt();
    this.ename = input.readUTF();
    this.job = input.readUTF();
    this.mgr = input.readInt();
    this.hiredate = input.readUTF();
    this.sal = input.readInt();
    this.comm = input.readInt();
    this.deptno = input.readInt();
  }
  @Override
  public void write(DataOutput output) throws IOException {
    // Serialization
    output.writeInt(this.empno);
    output.writeUTF(this.ename);
    output.writeUTF(this.job);
    output.writeInt(this.mgr);
    output.writeUTF(this.hiredate);
    output.writeInt(this.sal);
    output.writeInt(this.comm);
    output.writeInt(this.deptno);
  }
  public int getEmpno() {
    return empno;
  }
  public void setEmpno(int empno) {
    this.empno = empno;
  }
  public String getEname() {
    return ename;
  }
  public void setEname(String ename) {
    this.ename = ename;
  }
  public String getJob() {
    return job;
  }
  public void setJob(String job) {
    this.job = job;
  }
  public int getMgr() {
    return mgr;
  }
  public void setMgr(int mgr) {
    this.mgr = mgr;
  }
  public String getHiredate() {
    return hiredate;
  }
  public void setHiredate(String hiredate) {
    this.hiredate = hiredate;
  }
  public int getSal() {
    return sal;
  }
  public void setSal(int sal) {
    this.sal = sal;
  }
  public int getComm() {
    return comm;
  }
  public void setComm(int comm) {
    this.comm = comm;
  }
  public int getDeptno() {
    return deptno;
  }
  public void setDeptno(int deptno) {
    this.deptno = deptno;
  }
}
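
As an aside, the same two-level ordering can be written with the JDK's comparator combinators. This is shown only for comparison (ComparatorSketch is hypothetical): the MapReduce shuffle uses the key's own compareTo(), and unlike the version above, the combinator form returns 0 for fully equal pairs:

//ComparatorSketch.java (hypothetical, illustration only)
package demo.sort.mr.object2;
import java.util.Comparator;
public class ComparatorSketch {
  // deptno ascending, then sal descending
  static final Comparator<Employee> BY_DEPT_THEN_SAL_DESC =
      Comparator.comparingInt(Employee::getDeptno)
          .thenComparing(Comparator.comparingInt(Employee::getSal).reversed());
}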
// Mapper class: EmployeeSortMapper.java
package demo.sort.mr.object2;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class EmployeeSortMapper extends Mapper<LongWritable, Text, Employee, NullWritable> {
  @Override
  protected void map(LongWritable key1, Text value1, Context context) 
      throws IOException, InterruptedException {
    // Wrap each input line in an Employee object
    // Input format: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
    String str = value1.toString();
    // Split the line on commas
    String[] words = str.split(",");
    // Create an Employee object
    Employee e = new Employee();
    // Populate the employee's fields
    e.setEmpno(Integer.parseInt(words[0]));
    e.setEname(words[1]);
    e.setJob(words[2]);
    // mgr may be empty (e.g. for KING), so fall back to 0
    try {
      e.setMgr(Integer.parseInt(words[3]));
    } catch (Exception ex) {
      e.setMgr(0);
    }
    e.setHiredate(words[4]);
    e.setSal(Integer.parseInt(words[5]));
    // comm may be empty, so fall back to 0
    try {
      e.setComm(Integer.parseInt(words[6]));
    } catch (Exception ex) {
      e.setComm(0);
    }
    e.setDeptno(Integer.parseInt(words[7]));
    // Emit: the Employee object as the key, with a null value
    context.write(e, NullWritable.get());
  }
}
// Job (driver) class: EmployeeSortMain.java
package demo.sort.mr.object2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class EmployeeSortMain {
  public static void main(String[] args) throws Exception {
    // Create the job: Mapper only, no custom Reducer
    Job job = Job.getInstance(new Configuration());
    // Specify the job's entry point
    job.setJarByClass(EmployeeSortMain.class);
    // Specify the Mapper and its output types
    job.setMapperClass(EmployeeSortMapper.class);
    job.setMapOutputKeyClass(Employee.class);
    job.setMapOutputValueClass(NullWritable.class);
    // No custom Reducer; set the job's output types directly
    job.setOutputKeyClass(Employee.class);
    job.setOutputValueClass(NullWritable.class);
    // Specify the input and output paths (HDFS directories)
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Run the job
    job.waitForCompletion(true);
  }
}
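
One detail worth making explicit: the driver sets no reducer count, so the job runs with the default of one reduce task (mapreduce.job.reduces defaults to 1), and the single part-r-00000 file is globally sorted. With more reducers, each part file would only be sorted internally. A hedged one-line addition to main(), not present in the original listing, pins this down:

    // Pin one reduce task so the output stays in a single, globally sorted file
    job.setNumReduceTasks(1);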

Package and run the program:

  1. Package the demo.sort.mr.object2 directory into employeesort2.jar, specifying EmployeeSortMain as the main class
  2. Upload employeesort2.jar to the server at /root/input/employeesort2.jar
  3. Prepare the test data on HDFS: /input/emp.csv
  4. Run the program: # hadoop jar /root/input/employeesort2.jar /input/emp.csv /output/employeesort2
  5. List the output directory: # hdfs dfs -ls /output/employeesort2
  6. View the result: # hdfs dfs -cat /output/employeesort2/part-r-00000

[root@bigdata input]# hdfs dfs -cat /input/emp.csv
7369,SMITH,CLERK,7902,1980/12/17,800,,20
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30
7566,JONES,MANAGER,7839,1981/4/2,2975,,20
7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
7782,CLARK,MANAGER,7839,1981/6/9,2450,,10
7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20
7839,KING,PRESIDENT,,1981/11/17,5000,,10
7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30
7876,ADAMS,CLERK,7788,1987/5/23,1100,,20
7900,JAMES,CLERK,7698,1981/12/3,950,,30
7902,FORD,ANALYST,7566,1981/12/3,3000,,20
7934,MILLER,CLERK,7782,1982/1/23,1300,,10

[root@bigdata input]# hadoop jar EmployeeSort2.jar /input/emp.csv /output/employeesort2
......
18/11/05 22:12:38 INFO mapreduce.Job: map 0% reduce 0%
18/11/05 22:12:41 INFO mapreduce.Job: map 100% reduce 0%
18/11/05 22:12:45 INFO mapreduce.Job: map 100% reduce 100%
18/11/05 22:12:47 INFO mapreduce.Job: Job job_1541425571272_0003 completed successfully
......

[root@bigdata input]# hdfs dfs -ls /output/employeesort2
Found 2 items
-rw-r--r-- 1 root supergroup 0 2018-11-05 22:12 /output/employeesort2/_SUCCESS
-rw-r--r-- 1 root supergroup 222 2018-11-05 22:12 /output/employeesort2/part-r-00000
[root@bigdata input]# hdfs dfs -cat /output/employeesort2/part-r-00000
[KING 10 5000]
[CLARK 10 2450]
[MILLER 10 1300]
[SCOTT 20 3000]
[FORD 20 3000]
[JONES 20 2975]
[ADAMS 20 1100]
[SMITH 20 800]
[BLAKE 30 2850]
[ALLEN 30 1600]
[TURNER 30 1500]
[MARTIN 30 1250]
[WARD 30 1250]
[JAMES 30 950]
