MapReduce应用
二次排序
<div class='mdContent'>
二次排序的需求说明
在mapreduce操作时,shuffle阶段会多次根据key值排序。但是在shuffle分组后,相同key值的values序列的顺序是不确定的(如下图)。如果想要此时value值也是排序好的,这种需求就是二次排序。
测试的文件数据
a 1
a 5
a 7
a 9
b 3
b 8
b 10
未经过二次排序的输出结果
a 9
a 7
a 5
a 1
b 10
b 8
b 3
第一种实现思路
直接在reduce端对分组后的values进行排序。
- reduce关键代码
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
List<Integer> valuesList = new ArrayList<Integer>();
// 取出value
for(IntWritable value : values) {
valuesList.add(value.get());
}
// 进行排序
Collections.sort(valuesList);
for(Integer value : valuesList) {
context.write(key, new IntWritable(value));
}
}
- 输出结果
a 1
a 5
a 7
a 9
b 3
b 8
b 10
很容易发现,这样把排序工作都放到reduce端完成,当values序列长度非常大的时候,会对CPU和内存造成极大的负载。
- 注意的地方(容易被“坑”)
在reduce端对values进行迭代的时候,不要直接直接存储value值或者key值,因为reduce方法会反复执行多次,但key和value相关的对象只有两个,reduce会反复重用这两个对象。需要用相应的数据类型.get()取出后再存储。
第二种实现思路
将map端输出的<key,value>中的key和value组合成一个新的key(称为newKey),value值不变。这里就变成<(key,value),value>,在针对newKey排序的时候,如果key相同,就再对value进行排序。
- 需要自定义的地方
- 自定义数据类型实现组合key
实现方式:继承WritableComparable - 自定义partioner,形成newKey后保持分区规则任然按照key进行。保证不打乱原来的分区。
实现方式:继承partitioner - 自动以分组,保持分组规则任然按照key进行。不打乱原来的分组
实现方式:继承RawComparator
- 自定义数据类型关键代码
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class PairWritable implements WritableComparable<PairWritable> {
// 组合key
private String first;
private int second;
public PairWritable() {
}
public PairWritable(String first, int second) {
this.set(first, second);
}
/**
* 方便设置字段
*/
public void set(String first, int second) {
this.first = first;
this.second = second;
}
/**
* 反序列化
*/
@Override
public void readFields(DataInput arg0) throws IOException {
this.first = arg0.readUTF();
this.second = arg0.readInt();
}
/**
* 序列化
*/
@Override
public void write(DataOutput arg0) throws IOException {
arg0.writeUTF(first);
arg0.writeInt(second);
}
/*
* 重写比较器
*/
public int compareTo(PairWritable o) {
int comp = this.first.compareTo(o.first);
if(comp != 0) {
return comp;
} else { // 若第一个字段相等,则比较第二个字段
return Integer.valueOf(this.second).compareTo(
Integer.valueOf(o.getSecond()));
}
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
public String getFirst() {
return first;
}
public void setFirst(String first) {
this.first = first;
}
- 自定义分区规则
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class SecondPartitioner extends Partitioner<PairWritable, IntWritable> {
@Override
public int getPartition(PairWritable key, IntWritable value, int numPartitions) {
/*
* 默认的实现 (key.hashCode() & Integer.MAX_VALUE) % numPartitions
* 让key中first字段作为分区依据
*/
return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
}
- 自定义分组比较器
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
public class SecondGroupComparator implements RawComparator<PairWritable> {
/*
* 对象比较
*/
public int compare(PairWritable o1, PairWritable o2) {
return o1.getFirst().compareTo(o2.getFirst());
}
/*
* 字节比较
* arg0,arg3为要比较的两个字节数组
* arg1,arg2表示第一个字节数组要进行比较的收尾位置,arg4,arg5表示第二个
* 从第一个字节比到组合key中second的前一个字节,因为second为int型,所以长度为4
*/
public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3, int arg4, int arg5) {
return WritableComparator.compareBytes(arg0, 0, arg2-4, arg3, 0, arg5-4);
}
}
- map关键代码
private PairWritable mapOutKey = new PairWritable();
private IntWritable mapOutValue = new IntWritable();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String lineValue = value.toString();
String[] strs = lineValue.split("\t");
//设置组合key和value ==> <(key,value),value>
mapOutKey.set(strs[0], Integer.valueOf(strs[1]));
mapOutValue.set(Integer.valueOf(strs[1]));
context.write(mapOutKey, mapOutValue);
}
- reduce关键代码
private Text outPutKey = new Text();
public void reduce(PairWritable key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
//迭代输出
for(IntWritable value : values) {
outPutKey.set(key.getFirst());
context.write(outPutKey, value);
}
}
- 输出结果
a 1
a 5
a 7
a 9
b 3
b 8
b 10
</div>
MapReduce Join
<div class='mdContent'>
对两份数据data1和data2进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。
如果数据量比较大,在内存进行连接操会发生OOM。mapreduce join可以用来解决大数据的连接。
1 思路
1.1 reduce join
在map阶段, 把关键字作为key输出,并在value中标记出数据是来自data1还是data2。因为在shuffle阶段已经自然按key分组,reduce阶段,判断每一个value是来自data1还是data2,在内部分成2组,做集合的乘积。
这种方法有2个问题:
1, map阶段没有对数据瘦身,shuffle的网络传输和排序性能很低。
2, reduce端对2个集合做乘积计算,很耗内存,容易导致OOM。
1.2 map join
两份数据中,如果有一份数据比较小,小数据全部加载到内存,按关键字建立索引。大数据文件作为map的输入文件,对map()函数每一对输入,都能够方便地和已加载到内存的小数据进行连接。把连接结果按key输出,经过shuffle阶段,reduce端得到的就是已经按key分组的,并且连接好了的数据。
这种方法,要使用hadoop中的DistributedCache把小数据分布到各个计算节点,每个map节点都要把小数据库加载到内存,按关键字建立索引。
这种方法有明显的局限性:有一份数据比较小,在map端,能够把它加载到内存,并进行join操作。
1.3 使用内存服务器,扩大节点的内存空间
针对map join,可以把一份数据存放到专门的内存服务器,在map()方法中,对每一个<key,value>的输入对,根据key到内存服务器中取出数据,进行连接
1.4 使用BloomFilter过滤空连接的数据
对其中一份数据在内存中建立BloomFilter,另外一份数据在连接之前,用BloomFilter判断它的key是否存在,如果不存在,那这个记录是空连接,可以忽略。
1.5 使用mapreduce专为join设计的包
在mapreduce包里看到有专门为join设计的包,对这些包还没有学习,不知道怎么使用,只是在这里记录下来,作个提醒。
jar: mapreduce-client-core.jar
package: org.apache.hadoop.mapreduce.lib.join
2 实现map join
相对而言,map join更加普遍,下面的代码使用DistributedCache实现map join
2.1 背景
有客户数据customer和订单数据orders。
customer
客户编号 | 姓名 | 地址 | 电话 |
---|---|---|---|
1 | hanmeimei | ShangHai | 110 |
2 | leilei | BeiJing | 112 |
3 | lucy | GuangZhou | 119 |
** order**
订单编号 | 客户编号 | 其它字段被忽略 |
---|---|---|
1 | 1 | 50 |
2 | 1 | 200 |
3 | 3 | 15 |
4 | 3 | 350 |
5 | 3 | 58 |
6 | 1 | 42 |
7 | 1 | 352 |
8 | 2 | 1135 |
9 | 2 | 400 |
10 | 2 | 2000 |
11 | 2 | 300 |
要求对customer和orders按照客户编号进行连接,结果要求对客户编号分组,对订单编号排序,对其它字段不作要求
客户编号 | 订单编号 | 订单金额 | 姓名 | 地址 | 电话 |
---|---|---|---|---|---|
1 | 1 | 50 | hanmeimei | ShangHai | 110 |
1 | 2 | 200 | hanmeimei | ShangHai | 110 |
1 | 6 | 42 | hanmeimei | ShangHai | 110 |
1 | 7 | 352 | hanmeimei | ShangHai | 110 |
2 | 8 | 1135 | leilei | BeiJing | 112 |
2 | 9 | 400 | leilei | BeiJing | 112 |
2 | 10 | 2000 | leilei | BeiJing | 112 |
2 | 11 | 300 | leilei | BeiJing | 112 |
3 | 3 | 15 | lucy | GuangZhou | 119 |
3 | 4 | 350 | lucy | GuangZhou | 119 |
3 | 5 | 58 | lucy | GuangZhou | 119 |
- 在提交job的时候,把小数据通过DistributedCache分发到各个节点。
- map端使用DistributedCache读到数据,在内存中构建映射关系--如果使用专门的内存服务器,就把数据加载到内存服务器,map()节点可以只保留一份小缓存;如果使用BloomFilter来加速,在这里就可以构建;
- map()函数中,对每一对<key,value>,根据key到第2)步构建的映射里面中找出数据,进行连接,输出。
2.2 程序实现
public class Join extends Configured implements Tool {
// customer文件在hdfs上的位置。
// TODO: 改用参数传入
private static final String CUSTOMER_CACHE_URL = "hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt";
private static class CustomerBean {
private int custId;
private String name;
private String address;
private String phone;
public CustomerBean() {}
public CustomerBean(int custId, String name, String address,
String phone) {
super();
this.custId = custId;
this.name = name;
this.address = address;
this.phone = phone;
}
public int getCustId() {
return custId;
}
public String getName() {
return name;
}
public String getAddress() {
return address;
}
public String getPhone() {
return phone;
}
}
private static class CustOrderMapOutKey implements WritableComparable<CustOrderMapOutKey> {
private int custId;
private int orderId;
public void set(int custId, int orderId) {
this.custId = custId;
this.orderId = orderId;
}
public int getCustId() {
return custId;
}
public int getOrderId() {
return orderId;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(custId);
out.writeInt(orderId);
}
@Override
public void readFields(DataInput in) throws IOException {
custId = in.readInt();
orderId = in.readInt();
}
@Override
public int compareTo(CustOrderMapOutKey o) {
int res = Integer.compare(custId, o.custId);
return res == 0 ? Integer.compare(orderId, o.orderId) : res;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof CustOrderMapOutKey) {
CustOrderMapOutKey o = (CustOrderMapOutKey)obj;
return custId == o.custId && orderId == o.orderId;
} else {
return false;
}
}
@Override
public String toString() {
return custId + "\t" + orderId;
}
}
private static class JoinMapper extends Mapper<LongWritable, Text, CustOrderMapOutKey, Text> {
private final CustOrderMapOutKey outputKey = new CustOrderMapOutKey();
private final Text outputValue = new Text();
/**
* 在内存中customer数据
*/
private static final Map<Integer, CustomerBean> CUSTOMER_MAP = new HashMap<Integer, Join.CustomerBean>();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 格式: 订单编号 客户编号 订单金额
String[] cols = value.toString().split("\t");
if (cols.length < 3) {
return;
}
int custId = Integer.parseInt(cols[1]); // 取出客户编号
CustomerBean customerBean = CUSTOMER_MAP.get(custId);
if (customerBean == null) { // 没有对应的customer信息可以连接
return;
}
StringBuffer sb = new StringBuffer();
sb.append(cols[2])
.append("\t")
.append(customerBean.getName())
.append("\t")
.append(customerBean.getAddress())
.append("\t")
.append(customerBean.getPhone());
outputValue.set(sb.toString());
outputKey.set(custId, Integer.parseInt(cols[0]));
context.write(outputKey, outputValue);
}
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
FileSystem fs = FileSystem.get(URI.create(CUSTOMER_CACHE_URL), context.getConfiguration());
FSDataInputStream fdis = fs.open(new Path(CUSTOMER_CACHE_URL));
BufferedReader reader = new BufferedReader(new InputStreamReader(fdis));
String line = null;
String[] cols = null;
// 格式:客户编号 姓名 地址 电话
while ((line = reader.readLine()) != null) {
cols = line.split("\t");
if (cols.length < 4) { // 数据格式不匹配,忽略
continue;
}
CustomerBean bean = new CustomerBean(Integer.parseInt(cols[0]), cols[1], cols[2], cols[3]);
CUSTOMER_MAP.put(bean.getCustId(), bean);
}
}
}
/**
* reduce
* @author Ivan
*
*/
private static class JoinReducer extends Reducer<CustOrderMapOutKey, Text, CustOrderMapOutKey, Text> {
@Override
protected void reduce(CustOrderMapOutKey key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 什么事都不用做,直接输出
for (Text value : values) {
context.write(key, value);
}
}
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
if (args.length < 2) {
new IllegalArgumentException("Usage: <inpath> <outpath>");
return;
}
ToolRunner.run(new Configuration(), new Join(), args);
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = Job.getInstance(conf, Join.class.getSimpleName());
job.setJarByClass(SecondarySortMapReduce.class);
// 添加customer cache文件
job.addCacheFile(URI.create(CUSTOMER_CACHE_URL));
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// map settings
job.setMapperClass(JoinMapper.class);
job.setMapOutputKeyClass(CustOrderMapOutKey.class);
job.setMapOutputValueClass(Text.class);
// reduce settings
job.setReducerClass(JoinReducer.class);
job.setOutputKeyClass(CustOrderMapOutKey.class);
job.setOutputKeyClass(Text.class);
boolean res = job.waitForCompletion(true);
return res ? 0 : 1;
}
}
运行环境
- 操作系统: Centos 6.4
- Hadoop: Apache Hadoop-2.5.0
==客户数据文件在hdfs上的位置硬编码为==
hdfs://hadoop1:9000/user/hadoop/mapreduce/cache/customer.txt, 运行程序之前先把客户数据上传到这个位置。
- 程序运行结果
</div>
MapReduce自定义分组Group
<div class='mdContent'>
一:背景
在上一篇文章中我们可以对两列数据进行排序,即完成了当第一列相同时第二列数据升序排列的功能,现在我们需要进一步完善一个功能,那就是当第一列相同时求出第二列的最小值或最大值,Hadoop提供了自定义分组的功能,可以满足我们的需求。
二:技术实现
我们先来看看需求
当第一列不相等时,第一列按升序排列,当第一列相等时,求出对应第二列的最小值
3 3
3 2
3 1
2 2
2 1
1 1
输出结果应该是:
1 1
2 1
3 1
实现:
(1).自定义分组比较器继承RawComparator,实现compare()方法。
(2).在设置作业是设置job.setGroupingComparatorClass()。
代码如下:
public class MyGroupTest {
// 定义输入路径
private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";
// 定义输出路径
private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
public static void main(String[] args) {
try {
// 创建配置信息
Configuration conf = new Configuration();
// 创建文件系统
FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
// 如果输出目录存在,我们就删除
if (fileSystem.exists(new Path(OUT_PATH))) {
fileSystem.delete(new Path(OUT_PATH), true);
}
// 创建任务
Job job = new Job(conf, MyGroupTest.class.getName());
// 天龙八部1.1 设置输入目录和设置输入数据格式化的类
FileInputFormat.setInputPaths(job, INPUT_PATH);
job.setInputFormatClass(TextInputFormat.class);
//1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
job.setMapperClass(MyGroupMapper.class);
job.setMapOutputKeyClass(CombineKey.class);
job.setMapOutputValueClass(LongWritable.class);
//一定不要忘记设置自定义分组比较器的类(这一步是关键)
job.setGroupingComparatorClass(MyGroupComparator.class);
//1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
//1.4 排序、分组
//1.5 归约
//2.1 Shuffle把数据从Map端拷贝到Reduce端。
//2.2 指定Reducer类和输出key和value的类型
job.setReducerClass(MyGroupReducer.class);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);
//2.3 指定输出的路径和设置输出的格式化类
FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
job.setOutputFormatClass(TextOutputFormat.class);
// 提交作业 退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
public static class MyGroupMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable> {
// 创建联合的key
private CombineKey combineKey = new CombineKey();
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,
InterruptedException {
// 对输入value进行分割
String[] splits = value.toString().split("\t");
// 设置联合的Key
combineKey.setComKey(Long.parseLong(splits[0]));
combineKey.setComVal(Long.parseLong(splits[1]));
// 写出去
context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));
}
}
public static class MyGroupReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable> {
@Override
protected void reduce(CombineKey combineKey, Iterable<LongWritable> values,
Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
long min = Long.MAX_VALUE;
// 遍历比较求出每个组中的最小值
for (LongWritable val : values) {
if (val.get() < min) {
min = val.get();
}
}
// 把原始数据中的第一列中的元素分组后的组号作为key,所求的最小值为value将结果写出去
context.write(new LongWritable(combineKey.getComKey()), new LongWritable(min));
}
}
}
/**
- 二次排序构造一个新的Key
*/
class CombineKey implements WritableComparable<CombineKey> {
private Long comKey;
private Long comVal;
// 无参构造函数必须提供,否则Hadoop的反射机制会报错
public CombineKey() {
}
// 有参构造函数
public CombineKey(Long comKey, Long comVal) {
this.comKey = comKey;
this.comVal = comVal;
}
public Long getComKey() {
return comKey;
}
public void setComKey(Long comKey) {
this.comKey = comKey;
}
public Long getComVal() {
return comVal;
}
public void setComVal(Long comVal) {
this.comVal = comVal;
}
public void write(DataOutput out) throws IOException {
out.writeLong(this.comKey);
out.writeLong(this.comVal);
}
public void readFields(DataInput in) throws IOException {
this.comKey = in.readLong();
this.comVal = in.readLong();
}
/**
* 第一列按升序排列,第一列相同时,第二列也按升序排列
*/
public int compareTo(CombineKey o) {
long minus = this.comKey - o.comVal;
if (minus != 0) {
return (int) minus;
}
return (int) (this.comVal - o.comVal);
}
}
/**
-
自定义分组比较器
*/
class MyGroupComparator implements RawComparator<CombineKey> {// 分组策略中,这个方法不是重点
public int compare(CombineKey o1, CombineKey o2) {
// TODO Auto-generated method stub
return 0;
}/**
b1 表示第一个参与比较的字节数组
s1 表示第一个字节数组中开始比较的位置
l1 表示第一个字节数组中参与比较的字节长度
b2 表示第二个参与比较的字节数组
s2 表示第二个字节数组中开始比较的位置
-
l2 表示第二个字节数组参与比较的字节长度
*/
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {// 这里是按第CombineKey中的第一个元素进行分组,因为是long类型,所以是8个字节
return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
}
}
</div>
MapReduce自定义输入输出
自定义输入:
其中 :reader.readLine(tmp); 是读取下一行到tmp中
map的默认输入key是行的偏移值 value是每一行的数据
相对map的输入key value 以及读哪些文件我们都可以灵活控制 :
输入的格式是有FileInputFormat控制的 而对格式的控制是有RecordReader做到的 所以 要想控制输入格式 首先重写FileInputFormat的RecordReader 方法,在重写的RecordReader 中new一个新类(继承FileInputFormat 实现五个方法),达到控制
上代码:
//1.继承FileInputFormat 重写RecordReader 输入输出为map输入输出
public class AuthReader extends FileInputFormat<Text,Text>{
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
return new InputFormat(); //new的新类
}
}
//2.创建新类 继承RecordReader 输入输出为map输入输出
public class InputFormat extends RecordReader<Text,Text>{
private FileSplit fs ;
private Text key;
private Text value;
private LineReader reader;
private String fileName;
//初始化方法
@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
fs = (FileSplit) split;
fileName = fs.getPath().getName();
Path path = fs.getPath();
Configuration conf = new Configuration();
//获取文件系统
FileSystem system = path.getFileSystem(conf);
FSDataInputStream in = system.open(path);
reader = new LineReader(in);
}
//知识点1:这个方法会被调用多次 这个方法的返回值如果是true就会被调用一次
// 知识点2:每当nextKeyValue被调用一次 ,getCurrentKey,getCurrentValue也会被跟着调用一次
//知识点3:getCurrentKey,getCurrentValue给Map传key,value
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
//可以定义哪些文件不处理
if(!fileName.startsWith("wo"))return false;
Text tmp = new Text();
int length = reader.readLine(tmp);
if(length==0){
return false;
}else{
value=new Text(tmp+"何睿");
key = new Text("我是雷神托尔");
return true;
}
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return 0;
}
@Override
public void close() throws IOException {
if(reader!=null){
reader.close();
}
}
}
最后 在Driver中
//自定义输入
job.setInputFormatClass(AuthReader.class);
自定义输出:
//writer
public class AuthWriter<K,V> extends FileOutputFormat<K,V>{
@Override
public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
Path path=super.getDefaultWorkFile(job, "");
Configuration conf=job.getConfiguration();
FileSystem fs=path.getFileSystem(conf);
FSDataOutputStream out=fs.create(path);
//新类 的键值分割符 行分割符
return new NOutputFormat<K,V>(out,"#|#","\r\n");
}
//实现类
public class NOutputFormat<K,V> extends RecordWriter<K,V>{
private FSDataOutputStream out;
private String keyValueSeparator;//键值分隔符
private String lineSeparator; //行与行分隔符
public NOutputFormat(FSDataOutputStream out,String keyValueSeparator,String lineSeparator){
this.out=out;
this.keyValueSeparator=keyValueSeparator;
this.lineSeparator=lineSeparator;
}
@Override
public void write(K key, V value) throws IOException, InterruptedException {
out.write(key.toString().getBytes());//key
out.write(keyValueSeparator.getBytes());//键值对分隔符
out.write(value.toString().getBytes());//vale
out.write(lineSeparator.getBytes());//行与行分隔符
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if(out!=null)out.close();
}
}
//在Driver中
//自定义输出
job.setOutputFormatClass(AuthWriter.class);
多输入源 一个job执行
在Driver中
//对A目录 用A Mapper A Reduce 执行
MultipleInputs.addInputPath(job, new Path("hdfs://xxx:9000/formatscore/format
score.txt"),AuthInputFormat.class,ScoreMapper.class);
//对B目录 用B Mapper B Reduce 执行
MultipleInputs.addInputPath(job, new Path("hdfs://xxx:9000/formatscore/format
score-1.txt"),TextInputFormat.class,ScoreMapper2.class);
</div>