MapReduce应用

MapReduce应用

二次排序

<div class='mdContent'>
二次排序的需求说明

在mapreduce操作时,shuffle阶段会多次根据key值排序。但是在shuffle分组后,相同key值的values序列的顺序是不确定的(如下图)。如果想要此时value值也是排序好的,这种需求就是二次排序。

1.jpg

测试的文件数据

a 1
a 5
a 7
a 9
b 3
b 8
b 10

未经过二次排序的输出结果

a   9
a   7
a   5
a   1
b   10
b   8
b   3

第一种实现思路

直接在reduce端对分组后的values进行排序。

  • reduce关键代码
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {

             List<Integer> valuesList = new ArrayList<Integer>();

             // 取出value
             for(IntWritable value : values) {
                 valuesList.add(value.get());
             }
             // 进行排序
             Collections.sort(valuesList);

             for(Integer value : valuesList) {
                context.write(key, new IntWritable(value));
             }

        }

  • 输出结果
a   1
a   5
a   7
a   9
b   3
b   8
b   10

很容易发现,这样把排序工作都放到reduce端完成,当values序列长度非常大的时候,会对CPU和内存造成极大的负载。

  • 注意的地方(容易被“坑”)
    在reduce端对values进行迭代的时候,不要直接直接存储value值或者key值,因为reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。需要用相应的数据类型.get()取出后再存储。

第二种实现思路

将map端输出的<key,value>中的key和value组合成一个新的key(称为newKey),value值不变。这里就变成<(key,value),value>,在针对newKey排序的时候,如果key相同,就再对value进行排序。

  • 需要自定义的地方
  1. 自定义数据类型实现组合key
    实现方式:继承WritableComparable
  2. 自定义partioner,形成newKey后保持分区规则任然按照key进行。保证不打乱原来的分区。
    实现方式:继承partitioner
  3. 自动以分组,保持分组规则任然按照key进行。不打乱原来的分组
    实现方式:继承RawComparator
  • 自定义数据类型关键代码
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

 public class PairWritable implements WritableComparable<PairWritable> {
    // 组合key
      private String first;
      private int second;

    public PairWritable() {
    }

    public PairWritable(String first, int second) {
        this.set(first, second);
    }

    /**
     * 方便设置字段
     */
    public void set(String first, int second) {
        this.first = first;
        this.second = second;
    }

    /**
     * 反序列化
     */
    @Override
    public void readFields(DataInput arg0) throws IOException {
        this.first = arg0.readUTF();
        this.second = arg0.readInt();
    }
    /**
     * 序列化
     */
    @Override
    public void write(DataOutput arg0) throws IOException {
        arg0.writeUTF(first);
        arg0.writeInt(second);
    }

    /*
     * 重写比较器
     */
    public int compareTo(PairWritable o) {
        int comp = this.first.compareTo(o.first);

        if(comp != 0) {
            return comp;
        } else { // 若第一个字段相等,则比较第二个字段
            return Integer.valueOf(this.second).compareTo(
                    Integer.valueOf(o.getSecond()));
        }
    }

    public int getSecond() {
        return second;
    }
    public void setSecond(int second) {
        this.second = second;
    }
    public String getFirst() {
        return first;
    }
    public void setFirst(String first) {
        this.first = first;
    }

  • 自定义分区规则
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondPartitioner extends Partitioner<PairWritable, IntWritable> {

    @Override
    public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
        /* 
         * 默认的实现 (key.hashCode() & Integer.MAX_VALUE) % numPartitions
         * 让key中first字段作为分区依据
         */
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; 
    }
}

  • 自定义分组比较器
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class SecondGroupComparator implements RawComparator<PairWritable> {

    /*
     * 对象比较
     */
    public int compare(PairWritable o1, PairWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

    /*
     * 字节比较
     * arg0,arg3为要比较的两个字节数组
     * arg1,arg2表示第一个字节数组要进行比较的收尾位置,arg4,arg5表示第二个
     * 从第一个字节比到组合key中second的前一个字节,因为second为int型,所以长度为4
     */
    public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
        return WritableComparator.compareBytes(arg0, 0, arg2-4, arg3, 0, arg5-4);
    }
}

  • map关键代码
        private PairWritable mapOutKey = new PairWritable();
        private IntWritable mapOutValue = new IntWritable();
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String lineValue = value.toString();
            String[] strs = lineValue.split("\t");

            //设置组合key和value ==> <(key,value),value>
            mapOutKey.set(strs[0], Integer.valueOf(strs[1]));
            mapOutValue.set(Integer.valueOf(strs[1]));

            context.write(mapOutKey, mapOutValue);
        }

  • reduce关键代码
        private Text outPutKey = new Text(); 
        public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            //迭代输出
            for(IntWritable value : values) {
                outPutKey.set(key.getFirst());
                context.write(outPutKey, value);
            }

        }

  • 输出结果
a   1
a   5
a   7
a   9
b   3
b   8
b   10

</div>

MapReduce Join

<div class='mdContent'>
对两份数据data1和data2进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。

如果数据量比较大,在内存进行连接操会发生OOM。mapreduce join可以用来解决大数据的连接。

1 思路

1.1 reduce join

在map阶段, 把关键字作为key输出,并在value中标记出数据是来自data1还是data2。因为在shuffle阶段已经自然按key分组,reduce阶段,判断每一个value是来自data1还是data2,在内部分成2组,做集合的乘积。

这种方法有2个问题:

1, map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。

2, reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。

1.2 map join

两份数据中,如果有一份数据比较小,小数据全部加载到内存,按关键字建立索引。大数据文件作为map的输入文件,对map()函数每一对输入,都能够方便地和已加载到内存的小数据进行连接。把连接结果按key输出,经过shuffle阶段,reduce端得到的就是已经按key分组的,并且连接好了的数据。

这种方法,要使用hadoop中的DistributedCache把小数据分布到各个计算节点,每个map节点都要把小数据库加载到内存,按关键字建立索引。

这种方法有明显的局限性:有一份数据比较小,在map端,能够把它加载到内存,并进行join操作。

1.3 使用内存服务器,扩大节点的内存空间

针对map join,可以把一份数据存放到专门的内存服务器,在map()方法中,对每一个<key,value>的输入对,根据key到内存服务器中取出数据,进行连接

1.4 使用BloomFilter过滤空连接的数据

对其中一份数据在内存中建立BloomFilter,另外一份数据在连接之前,用BloomFilter判断它的key是否存在,如果不存在,那这个记录是空连接,可以忽略。

1.5 使用mapreduce专为join设计的包

在mapreduce包里看到有专门为join设计的包,对这些包还没有学习,不知道怎么使用,只是在这里记录下来,作个提醒。

jar: mapreduce-client-core.jar

package: org.apache.hadoop.mapreduce.lib.join

2 实现map join

相对而言,map join更加普遍,下面的代码使用DistributedCache实现map join

2.1 背景

有客户数据customer和订单数据orders。

customer

客户编号 姓名 地址 电话
1 hanmeimei ShangHai 110
2 leilei BeiJing 112
3 lucy GuangZhou 119

** order**

订单编号 客户编号 其它字段被忽略
1 1 50
2 1 200
3 3 15
4 3 350
5 3 58
6 1 42
7 1 352
8 2 1135
9 2 400
10 2 2000
11 2 300

要求对customer和orders按照客户编号进行连接,结果要求对客户编号分组,对订单编号排序,对其它字段不作要求

客户编号 订单编号 订单金额 姓名 地址 电话
1 1 50 hanmeimei ShangHai 110
1 2 200 hanmeimei ShangHai 110
1 6 42 hanmeimei ShangHai 110
1 7 352 hanmeimei ShangHai 110
2 8 1135 leilei BeiJing 112
2 9 400 leilei BeiJing 112
2 10 2000 leilei BeiJing 112
2 11 300 leilei BeiJing 112
3 3 15 lucy GuangZhou 119
3 4 350 lucy GuangZhou 119
3 5 58 lucy GuangZhou 119
  1. 在提交job的时候,把小数据通过DistributedCache分发到各个节点。
  2. map端使用DistributedCache读到数据,在内存中构建映射关系--如果使用专门的内存服务器,就把数据加载到内存服务器,map()节点可以只保留一份小缓存;如果使用BloomFilter来加速,在这里就可以构建;
  3. map()函数中,对每一对<key,value>,根据key到第2)步构建的映射里面中找出数据,进行连接,输出。

2.2 程序实现

public class Join extends Configured implements Tool {
    // customer文件在hdfs上的位置。
    // TODO: 改用参数传入
    private static final String CUSTOMER_CACHE_URL = "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt";
    private static class CustomerBean {     
        private int custId;
        private String name;
        private String address;
        private String phone;

        public CustomerBean() {}

        public CustomerBean(int custId, String name, String address,
                String phone) {
            super();
            this.custId = custId;
            this.name = name;
            this.address = address;
            this.phone = phone;
        }

        public int getCustId() {
            return custId;
        }

        public String getName() {
            return name;
        }

        public String getAddress() {
            return address;
        }

        public String getPhone() {
            return phone;
        }
    }

    private static class CustOrderMapOutKey implements WritableComparable<CustOrderMapOutKey> {
        private int custId;
        private int orderId;

        public void set(int custId, int orderId) {
            this.custId = custId;
            this.orderId = orderId;
        }

        public int getCustId() {
            return custId;
        }

        public int getOrderId() {
            return orderId;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(custId);
            out.writeInt(orderId);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            custId = in.readInt();
            orderId = in.readInt();
        }

        @Override
        public int compareTo(CustOrderMapOutKey o) {
            int res = Integer.compare(custId, o.custId);
            return res == 0 ? Integer.compare(orderId, o.orderId) : res;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof CustOrderMapOutKey) {
                CustOrderMapOutKey o = (CustOrderMapOutKey)obj;
                return custId == o.custId && orderId == o.orderId;
            } else {
                return false;
            }
        }

        @Override
        public String toString() {
            return custId + "\t" + orderId;
        }
    }

    private static class JoinMapper extends Mapper<LongWritable, Text, CustOrderMapOutKey, Text> {
        private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey();
        private final Text outputValue = new Text();

        /**
         * 在内存中customer数据
         */
        private static final Map<Integer, CustomerBean> CUSTOMER_MAP = new HashMap<Integer, Join.CustomerBean>();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // 格式: 订单编号 客户编号    订单金额
            String[] cols = value.toString().split("\t");           
            if (cols.length < 3) {
                return;
            }

            int custId = Integer.parseInt(cols[1]);     // 取出客户编号
            CustomerBean customerBean = CUSTOMER_MAP.get(custId);

            if (customerBean == null) {         // 没有对应的customer信息可以连接
                return;
            }

            StringBuffer sb = new StringBuffer();
            sb.append(cols[2])
                .append("\t")
                .append(customerBean.getName())
                .append("\t")
                .append(customerBean.getAddress())
                .append("\t")
                .append(customerBean.getPhone());

            outputValue.set(sb.toString());
            outputKey.set(custId, Integer.parseInt(cols[0]));

            context.write(outputKey, outputValue);
        }

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            FileSystem fs = FileSystem.get(URI.create(CUSTOMER_CACHE_URL), context.getConfiguration());
            FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL));

            BufferedReader reader = new BufferedReader(new InputStreamReader(fdis));
            String line = null;
            String[] cols = null;

            // 格式:客户编号  姓名  地址  电话
            while ((line = reader.readLine()) != null) {
                cols = line.split("\t");
                if (cols.length < 4) {              // 数据格式不匹配,忽略
                    continue;
                }

                CustomerBean bean = new CustomerBean(Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]);
                CUSTOMER_MAP.put(bean.getCustId(), bean);
            }
        }
    }

    /**
     * reduce
     * @author Ivan
     *
     */
    private static class JoinReducer extends Reducer<CustOrderMapOutKey, Text, CustOrderMapOutKey, Text> {
        @Override
        protected void reduce(CustOrderMapOutKey key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // 什么事都不用做,直接输出
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }
    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            new IllegalArgumentException("Usage: <inpath> <outpath>");
            return;
        }

        ToolRunner.run(new Configuration(), new Join(), args);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, Join.class.getSimpleName());
        job.setJarByClass(SecondarySortMapReduce.class);

        // 添加customer cache文件
        job.addCacheFile(URI.create(CUSTOMER_CACHE_URL));

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // map settings
        job.setMapperClass(JoinMapper.class);
        job.setMapOutputKeyClass(CustOrderMapOutKey.class);
        job.setMapOutputValueClass(Text.class);

        // reduce settings
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(CustOrderMapOutKey.class);
        job.setOutputKeyClass(Text.class);

        boolean res = job.waitForCompletion(true);

        return res ? 0 : 1;
    }
}

运行环境

  • 操作系统: Centos 6.4
  • Hadoop: Apache Hadoop-2.5.0

==客户数据文件在hdfs上的位置硬编码为==
hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt, 运行程序之前先把客户数据上传到这个位置。

  • 程序运行结果
##

</div>

MapReduce自定义分组Group

<div class='mdContent'>
一:背景

在上一篇文章中我们可以对两列数据进行排序,即完成了当第一列相同时第二列数据升序排列的功能,现在我们需要进一步完善一个功能,那就是当第一列相同时求出第二列的最小值或最大值,Hadoop提供了自定义分组的功能,可以满足我们的需求。

二:技术实现

我们先来看看需求

当第一列不相等时,第一列按升序排列,当第一列相等时,求出对应第二列的最小值

3 3
3 2
3 1
2 2
2 1
1 1

输出结果应该是:
1 1
2 1
3 1

实现:

(1).自定义分组比较器继承RawComparator,实现compare()方法。

(2).在设置作业是设置job.setGroupingComparatorClass()。

代码如下:

public class MyGroupTest {
// 定义输入路径
private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";
// 定义输出路径
private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";

public static void main(String[] args) {

    try {
        // 创建配置信息
        Configuration conf = new Configuration();


        // 创建文件系统
        FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
        // 如果输出目录存在,我们就删除
        if (fileSystem.exists(new Path(OUT_PATH))) {
            fileSystem.delete(new Path(OUT_PATH), true);
        }

        // 创建任务
        Job job = new Job(conf, MyGroupTest.class.getName());

        // 天龙八部1.1 设置输入目录和设置输入数据格式化的类
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
        job.setMapperClass(MyGroupMapper.class);
        job.setMapOutputKeyClass(CombineKey.class);
        job.setMapOutputValueClass(LongWritable.class);
        
        //一定不要忘记设置自定义分组比较器的类(这一步是关键)
        job.setGroupingComparatorClass(MyGroupComparator.class);
        
        //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        //1.4 排序、分组
        //1.5 归约
        //2.1 Shuffle把数据从Map端拷贝到Reduce端。
        //2.2 指定Reducer类和输出key和value的类型
        job.setReducerClass(MyGroupReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        //2.3 指定输出的路径和设置输出的格式化类
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.setOutputFormatClass(TextOutputFormat.class);

        // 提交作业 退出
        System.exit(job.waitForCompletion(true) ? 0 : 1);

    } catch (Exception e) {
        e.printStackTrace();
    }
}

public static class MyGroupMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable> {
    // 创建联合的key
    private CombineKey combineKey = new CombineKey();

    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,
            InterruptedException {
        // 对输入value进行分割
        String[] splits = value.toString().split("\t");
        // 设置联合的Key
        combineKey.setComKey(Long.parseLong(splits[0]));
        combineKey.setComVal(Long.parseLong(splits[1]));

        // 写出去
        context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));
    }
}

public static class MyGroupReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable> {
    @Override
    protected void reduce(CombineKey combineKey, Iterable<LongWritable> values,
            Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {

        long min = Long.MAX_VALUE;
        // 遍历比较求出每个组中的最小值
        for (LongWritable val : values) {

            if (val.get() < min) {
                min = val.get();
            }
        }

        // 把原始数据中的第一列中的元素分组后的组号作为key,所求的最小值为value将结果写出去
        context.write(new LongWritable(combineKey.getComKey()), new LongWritable(min));
    }
}

}

/**

  • 二次排序构造一个新的Key

*/
class CombineKey implements WritableComparable<CombineKey> {

private Long comKey;
private Long comVal;

// 无参构造函数必须提供,否则Hadoop的反射机制会报错
public CombineKey() {
}

// 有参构造函数
public CombineKey(Long comKey, Long comVal) {
    this.comKey = comKey;
    this.comVal = comVal;
}

public Long getComKey() {
    return comKey;
}

public void setComKey(Long comKey) {
    this.comKey = comKey;
}

public Long getComVal() {
    return comVal;
}

public void setComVal(Long comVal) {
    this.comVal = comVal;
}

public void write(DataOutput out) throws IOException {
    out.writeLong(this.comKey);
    out.writeLong(this.comVal);
}

public void readFields(DataInput in) throws IOException {
    this.comKey = in.readLong();
    this.comVal = in.readLong();
}

/**
 * 第一列按升序排列,第一列相同时,第二列也按升序排列
 */
public int compareTo(CombineKey o) {
    long minus = this.comKey - o.comVal;
    if (minus != 0) {
        return (int) minus;
    }

    return (int) (this.comVal - o.comVal);
}

}

/**

  • 自定义分组比较器
    */
    class MyGroupComparator implements RawComparator<CombineKey> {

    // 分组策略中,这个方法不是重点
    public int compare(CombineKey o1, CombineKey o2) {
    // TODO Auto-generated method stub
    return 0;
    }

    /**

    • b1 表示第一个参与比较的字节数组

    • s1 表示第一个字节数组中开始比较的位置

    • l1 表示第一个字节数组中参与比较的字节长度

    • b2 表示第二个参与比较的字节数组

    • s2 表示第二个字节数组中开始比较的位置

    • l2 表示第二个字节数组参与比较的字节长度
      */
      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

      // 这里是按第CombineKey中的第一个元素进行分组,因为是long类型,所以是8个字节
      return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
      }

}

</div>

MapReduce自定义输入输出

自定义输入:

其中 :reader.readLine(tmp); 是读取下一行到tmp中
map的默认输入key是行的偏移值 value是每一行的数据
相对map的输入key value 以及读哪些文件我们都可以灵活控制 :

输入的格式是有FileInputFormat控制的 而对格式的控制是有RecordReader做到的 所以 要想控制输入格式 首先重写FileInputFormat的RecordReader 方法,在重写的RecordReader 中new一个新类(继承FileInputFormat 实现五个方法),达到控制

上代码:

//1.继承FileInputFormat 重写RecordReader  输入输出为map输入输出
public class AuthReader extends FileInputFormat<Text,Text>{
    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        return new InputFormat(); //new的新类
    }
}

//2.创建新类 继承RecordReader  输入输出为map输入输出
public class InputFormat extends RecordReader<Text,Text>{
    private FileSplit fs ;
    private Text key;
    private Text value;
    private LineReader reader;

    private String fileName;

    //初始化方法
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        fs = (FileSplit) split;
        fileName = fs.getPath().getName();
        Path path = fs.getPath();
        Configuration conf = new Configuration();
        //获取文件系统
        FileSystem system = path.getFileSystem(conf);
        FSDataInputStream in = system.open(path);
        reader = new LineReader(in);
    }

     //知识点1:这个方法会被调用多次   这个方法的返回值如果是true就会被调用一次
     // 知识点2:每当nextKeyValue被调用一次 ,getCurrentKey,getCurrentValue也会被跟着调用一次
     //知识点3:getCurrentKey,getCurrentValue给Map传key,value
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        //可以定义哪些文件不处理
        if(!fileName.startsWith("wo"))return false;
        Text tmp = new Text();
        int length = reader.readLine(tmp);
        if(length==0){
            return false;
        }else{
            value=new Text(tmp+"何睿");
            key = new Text("我是雷神托尔");
            return true;
        }

    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
        if(reader!=null){
            reader.close();
        }
    }
}

最后 在Driver中

          //自定义输入
        job.setInputFormatClass(AuthReader.class);

自定义输出:

//writer
public class AuthWriter<K,V> extends FileOutputFormat<K,V>{
    @Override
    public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        Path path=super.getDefaultWorkFile(job, "");
        Configuration conf=job.getConfiguration();
        FileSystem fs=path.getFileSystem(conf);
        FSDataOutputStream out=fs.create(path);
        //新类 的键值分割符      行分割符
        return new NOutputFormat<K,V>(out,"#|#","\r\n");
    }

//实现类
public class NOutputFormat<K,V> extends RecordWriter<K,V>{
    private FSDataOutputStream out;
    private String keyValueSeparator;//键值分隔符
    private String lineSeparator; //行与行分隔符

    public NOutputFormat(FSDataOutputStream out,String keyValueSeparator,String lineSeparator){
        this.out=out;
        this.keyValueSeparator=keyValueSeparator;
        this.lineSeparator=lineSeparator;
    }

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
        out.write(key.toString().getBytes());//key
        out.write(keyValueSeparator.getBytes());//键值对分隔符
        out.write(value.toString().getBytes());//vale
        out.write(lineSeparator.getBytes());//行与行分隔符
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        if(out!=null)out.close();
    }
}

//在Driver中 
        //自定义输出
        job.setOutputFormatClass(AuthWriter.class);

多输入源 一个job执行

在Driver中

//对A目录 用A Mapper  A Reduce 执行
MultipleInputs.addInputPath(job, new Path("hdfs://xxx:9000/formatscore/format
score.txt"),AuthInputFormat.class,ScoreMapper.class);

//对B目录 用B Mapper  B Reduce 执行
MultipleInputs.addInputPath(job, new Path("hdfs://xxx:9000/formatscore/format
score-1.txt"),TextInputFormat.class,ScoreMapper2.class);

</div>

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容

  • 一.简述如何安装配置apache 的一个开源的hadoop 1.使用root账户登陆 2.修改ip 3.修改hos...
    栀子花_ef39阅读 4,944评论 0 52
  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 31,928评论 2 89
  • 三月,初春时节。花儿们还没有开始争奇斗艳,作为哺乳动物的人们也裹着冬衣在大地上行走。在罗江只可以看到一抹抹盎然...
    嘻哈Ray阅读 141评论 0 1
  • 乾卦第一 坤卦第二 乾下乾上 坤下坤上 乾:元、亨、利、贞。 坤:元、享,利牝马之贞。 君子有攸往,先迷,后得,主...
    香香纯妹子浅浅阅读 118评论 0 0
  • 今天和你相见我面带微笑我的手和你的手相握我用我的温暖接近你请你收下我的关怀和敬意 各自的手刚才还在自己的袖管里沉默...
    王春涞阅读 220评论 1 7