MapReduce怎么优雅地实现全局排序

思考

想到全局排序,是否第一想到的是,从map端收集数据,shuffle到reduce来,设置一个reduce,再对reduce中的数据排序,显然这样和单机器并没有什么区别,要知道mapreduce框架默认是对key来排序的,当然也可以将value放到key上面来达到对value排序,最后在reduce时候对调回去,另外排序是针对相同分区,即一个reduce来排序的,这样其实也不能充分运用到集群的并行,那么如何更优雅地实现全局排序呢?

摘要

hadoop中的排序分为部分排序,全局排序,辅助排序,二次排序等,本文主要介绍如何实现key全局排序,共有三种实现方式:

  1. 设置一个reduce
  2. 利用自定义partition 将数据按顺序分批次分流到多个分区
  3. 利用框架自实现TotalOrderPartitioner 分区器来实现

实现

首先准备一些输入数据:https://github.com/hulichao/bigdata-code/tree/master/data/job,如下,

/data/job/file.txt
2
32
654
32
15
756
65223

通过设置一 个reduce来实现全局排序

利用一个reduce来实现全局排序,可以说不需要做什么特别的操作,mapper,reduce,driver实现如下:

package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class JobMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value,
                       Context context) throws IOException, InterruptedException {
        IntWritable intWritable = new IntWritable(Integer.parseInt(value.toString()));
        context.write(intWritable, intWritable);
    }
}
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class JobReducer  extends
        Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    private int index = 0;//全局排序计数器
    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(new IntWritable(++index), value);
    }
}
package com.hoult.mr.job;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JobDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("input-path output-path");
            System.exit(1);
        }

        Job job = Job.getInstance(getConf());
        job.setJarByClass(JobDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(JobMapper.class);
        job.setReducerClass(JobReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        //使用一个reduce来排序
        job.setNumReduceTasks(1);
        job.setJobName("JobDriver");
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args)throws Exception{

//        int exitCode = ToolRunner.run(new JobDriver(), args);
        int exitCode = ToolRunner.run(new JobDriver(), new String[] {"data/job/", "data/job/output"});
        System.exit(exitCode);
    }
}

//加了排序索引,最后输出一个文件,内容如下:
1   2
2   6
3   15
4   22
5   26
6   32
7   32
8   54
9   92
10  650
11  654
12  756
13  5956
14  65223

PS; 以上通过hadoop自带的ToolRunner工具来启动任务,后续代码涉及到重复的不再列出,只针对差异性的代码。

利用自定义partition 将数据按顺序分批次分流到多个分区

通过自定义分区如何保证数据的全局有序呢?我们知道key值分区,会通过默认分区函数HashPartition将不同范围的key发送到不同的reduce,所以利用这一点,这样来实现分区器,例如有数据分布在1-1亿,可以将1-1000万的数据让reduce1来跑,1000万+1-2000万的数据来让reduce2来跑。。。。最后可以对这十个文件,按顺序组合即可得到所有数据按分区有序的全局排序数据,由于数据量较小,采用分11个分区,分别是1-1000,10001-2000,。跟第一种方式实现不同的有下面两个点,

//partitionner实现
package com.hoult.mr.job;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class JobPartitioner extends Partitioner<IntWritable, IntWritable> {
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        int keyValue = Integer.parseInt(key.toString());

        for (int i = 0; i < 10; i++) {
            if (keyValue < 1000 * (i+1) && keyValue >= 1000 * (i-1)) {
                System.out.println("key:" + keyValue + ", part:" + i);
                return i;
            }
        }

        return 10;
    }
}

//driver处需要增加:
        //设置自定义分区器
        job.setPartitionerClass(JobPartitioner.class);
        
//driver处需要修改reduce数量
        job.setNumReduceTasks(10);

执行程序,结果会产生10个文件,文件内有序。

part-r-00000
part-r-00001
part-r-00002
part-r-00003
part-r-00004
part-r-00005
part-r-00006
part-r-00007
part-r-00008
part-r-00009

注意:需要注意一点,partition含有数据的分区要小于等于reduce数,否则会包Illegal partiion错误。另外缺点分区的实现如果对数据知道较少可能会导致数据倾斜和OOM问题。

利用框架自实现TotalOrderPartitioner 分区器来实现

既然想到了第二种自定义方式,其实可以解决多数倾斜问题,但是实际上,在数据分布不了解之前,对数据的分布评估,只能去试,看结果值有哪些,进而自定义分区器,这不就是取样吗,针对取样然后实现分区器这种方式,hadoop已经帮我们实现好了,并且解决了数据倾斜和OOM 问题,那就是TotalOrderPartitioner类,其类提供了数据采样器,对key值进行部分采样,然后按照采样结果寻找key值的最佳分割点,从而将key均匀分布在不同分区中。

TotalOrderPartitioner提供了三个采样器如下:

  • SplitSampler 分片采样器,从数据分片中采样数据,该采样器不适合已经排好序的数据
  • RandomSampler随机采样器,按照设置好的采样率从一个数据集中采样
  • IntervalSampler间隔采样机,以固定的间隔从分片中采样数据,对于已经排好序的数据效果非常好

采样器实现了K[] getSample(InputFormat<K,V> info, Job job) 方法,返回的是采样数组,其中InputFormat是map输入端前面的输入辅助类,根据返回的K[]的长度进而生成数组长度-1个partition,最后按照分割点范围将对应数据发送到相应分区中。

代码实现:

//mapper和driver的类型略有不同
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalMapper extends Mapper<Text, Text, Text, IntWritable> {
    @Override
    protected void map(Text key, Text value,
                       Context context) throws IOException, InterruptedException {
        System.out.println("key:" + key.toString() + ", value:" + value.toString());
        context.write(key, new IntWritable(Integer.parseInt(key.toString())));
    }
}
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalReducer extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        for (IntWritable value : values)
            context.write(value, NullWritable.get());
    }
}
//比较器
package com.hoult.mr.job.totalsort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 自定义比较器来比较key的顺序
 * @author hulichao
 * @date 20-9-20
 **/
public class KeyComparator extends WritableComparator {
    protected KeyComparator() {
        super(Text.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        int num1 = Integer.valueOf(w1.toString());
        int num2 = Integer.valueOf(w2.toString());
        return num1 - num2;
    }
}
package com.hoult.mr.job.totalsort;

//driver 实现
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author hulichao
 * @date 20-9-20
 **/
public class TotalDriver extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //设置非分区排序
        conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
        Job job = Job.getInstance(conf, "Total Driver");
        job.setJarByClass(TotalDriver.class);

        //设置读取文件的路径,都是从HDFS中读取。读取文件路径从脚本文件中传进来
        FileInputFormat.addInputPath(job,new Path(args[0]));
        //设置mapreduce程序的输出路径,MapReduce的结果都是输入到文件中
        FileOutputFormat.setOutputPath(job,new Path(args[1]));
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        //设置比较器,用于比较数据的大小,然后按顺序排序,该例子主要用于比较两个key的大小
        job.setSortComparatorClass(KeyComparator.class);
        job.setNumReduceTasks(10);//设置reduce数量

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        //设置保存partitions文件的路径
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
        //key值采样,0.01是采样率,
        InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.1, 3, 100);
        //将采样数据写入到分区文件中
        InputSampler.writePartitionFile(job, sampler);

        job.setMapperClass(TotalMapper.class);
        job.setReducerClass(TotalReducer.class);
        //设置分区类。
        job.setPartitionerClass(TotalOrderPartitioner.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    public static void main(String[] args)throws Exception{
//        int exitCode = ToolRunner.run(new TotalDriver(), new String[] {"data/job/input", "data/job/output", "data/job/partition","data/job/partitio2"});
        int exitCode = ToolRunner.run(new TotalDriver(), args);
        System.exit(exitCode);
    }
}

结果和第二种实现类似,需要注意只在集群测试时候才有效,本地测试可能会报错

2020-09-20 16:36:10,664 WARN [org.apache.hadoop.util.NativeCodeLoader] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 0
    at com.hoult.mr.job.totalsort.TotalDriver.run(TotalDriver.java:32)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
    at com.hoult.mr.job.totalsort.TotalDriver.main(TotalDriver.java:60)

吴邪,小三爷,混迹于后台,大数据,人工智能领域的小菜鸟。
更多请关注
<center> <img src="https://raw.githubusercontent.com/hulichao/myblog_pic/master/blog/wechat.png" width="40%" /> </center>

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342