If you read MapReduce: N keys, N files (一) and followed the approach described there to split N keys into N files, you may have found that the output contains more data than the input, and that some of the output files cannot be read.
Reading the output with Presto may fail with an error like this:
Query 20181122_073113_31966_aeiaw failed: Error opening Hive split hdfs://xxx3/dt=20181122/uiappid=300046/20181122150918_100.110.30.239_SkgSHZT8 (offset=62576808, length=62576807): Protocol message tag had invalid wire type.
【Symptom】
The direct symptom is that Presto considers some of the ORC files written by the MR job invalid and cannot read them. But not every run of the job produces such invalid files; sometimes they appear, sometimes they don't.
【Attempt 1】
My first suspicion was that the map holding the RecordWriters of all the per-key output files (Map<String, OrcMapreduceRecordWriter> map = new HashMap<String, OrcMapreduceRecordWriter>()) was being garbage-collected. That seemed unlikely, since OrcReNameMapreduceRecordWriter holds a reference to it the whole time, but I still moved it up into the enclosing class OrcReNameFileOutputFormat, as follows:
package is.split;

import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.orc.OrcFile;
import org.apache.orc.Writer;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
import org.apache.orc.mapreduce.OrcOutputFormat;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class OrcReNameFileOutputFormat extends OrcOutputFormat {
    // Holds the writer of each key's output file (moved up into the enclosing class).
    static Map<String, OrcMapreduceRecordWriter> map = new HashMap<String, OrcMapreduceRecordWriter>();

    @Override
    public RecordWriter<Text, OrcStruct> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
        return new OrcReNameMapreduceRecordWriter(taskAttemptContext);
    }

    private class OrcReNameMapreduceRecordWriter extends RecordWriter<Text, OrcStruct> {
        private TaskAttemptContext taskAttemptContext;

        public OrcReNameMapreduceRecordWriter(TaskAttemptContext taskAttemptContext) {
            this.taskAttemptContext = taskAttemptContext;
        }

        // The key received here is the key emitted by the map phase.
        @Override
        public void write(Text key, OrcStruct value) throws IOException, InterruptedException {
            // The writer that actually writes to the file is still an OrcMapreduceRecordWriter.
            OrcMapreduceRecordWriter realWrite = map.get(key.toString());
            if (realWrite == null) {
                String split_key = key.toString();
                // Output directory: one sub-directory per key.
                String outputDirPath = taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR) + "/" + split_key;
                // Output file name: built from the current timestamp, the local IP and six random characters.
                Path filename = new Path(new Path(outputDirPath),
                        ISTool.getCurTime() + "_" + ISTool.getLocalIp() + "_" + RandomStringUtils.randomAlphanumeric(6));
                Writer writer = OrcFile.createWriter(filename,
                        org.apache.orc.mapred.OrcOutputFormat.buildOptions(taskAttemptContext.getConfiguration()));
                realWrite = new OrcMapreduceRecordWriter<OrcStruct>(writer);
                map.put(key.toString(), realWrite);
            }
            realWrite.write(NullWritable.get(), value);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (Map.Entry<String, OrcMapreduceRecordWriter> entry : map.entrySet()) {
                if (entry.getValue() != null) {
                    entry.getValue().close(context);
                }
            }
        }
    }
}
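For context, this is roughly how the output format is plugged in on the driver side. It is only a sketch: ISSplitDriver, ISSplitMapper and ISSplitReducer are placeholder names (the real map/reduce classes are not shown in this article), and the ORC schema property value is an assumption for illustration:
package is.split;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.orc.mapred.OrcStruct;

public class ISSplitDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Schema consumed by OrcOutputFormat.buildOptions(); the fields here are illustrative.
        conf.set("orc.mapred.output.schema", "struct<uiappid:string,log:string>");

        Job job = Job.getInstance(conf, "n-keys-n-files");
        job.setJarByClass(ISSplitDriver.class);
        job.setMapperClass(ISSplitMapper.class);     // placeholder mapper
        job.setReducerClass(ISSplitReducer.class);   // placeholder reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(OrcStruct.class);
        job.setOutputFormatClass(OrcReNameFileOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}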
The change made no difference... the job still produced incomplete ORC files on some runs and not on others.
【Attempt 2】
Searching online turned up nothing. After adding the machine IP to the file names, I noticed that the files for a single key were being written by two different machines. Shouldn't one key go to exactly one reduce task? Why would one key end up with two files... had the reduce gone off the rails??
While reading https://www.cnblogs.com/YDDMAX/p/6828363.html I saw that OutputCommitter has an abortJob method, and it suddenly clicked: could the invalid ORC files be coming from backup reduce attempts? Looking through the OutputCommitter methods confirmed the guess.
The MR framework launches a backup attempt for a reduce task that is running slowly, and the two attempts run in parallel. With the normal one-output-per-reduce setup this is harmless, because a reduce attempt writes its output to a temporary location, and only after the whole attempt succeeds is that temporary output moved to the final output directory. When an attempt succeeds, the framework calls commitTask. As the code below shows, commitTask renames the attempt's files into place; if algorithmVersion is greater than 1, it instead merges everything from the attempt path directly into the output directory.
@Private
public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
    throws IOException {

  TaskAttemptID attemptId = context.getTaskAttemptID();
  if (hasOutputPath()) {
    context.progress();
    if (taskAttemptPath == null) {
      taskAttemptPath = getTaskAttemptPath(context);
    }
    FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
    FileStatus taskAttemptDirStatus;
    try {
      taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
    } catch (FileNotFoundException e) {
      taskAttemptDirStatus = null;
    }

    if (taskAttemptDirStatus != null) {
      if (algorithmVersion == 1) {
        Path committedTaskPath = getCommittedTaskPath(context);
        if (fs.exists(committedTaskPath)) {
          if (!fs.delete(committedTaskPath, true)) {
            throw new IOException("Could not delete " + committedTaskPath);
          }
        }
        if (!fs.rename(taskAttemptPath, committedTaskPath)) { // move the attempt's files into place
          throw new IOException("Could not rename " + taskAttemptPath + " to "
              + committedTaskPath);
        }
        LOG.info("Saved output of task '" + attemptId + "' to " +
            committedTaskPath);
      } else {
        // directly merge everything from taskAttemptPath to output directory
        mergePaths(fs, taskAttemptDirStatus, outputPath);
        LOG.info("Saved output of task '" + attemptId + "' to " +
            outputPath);
      }
    } else {
      LOG.warn("No Output found for " + attemptId);
    }
  } else {
    LOG.warn("Output Path is null in commitTask()");
  }
}
Whereas if an attempt is killed, the OutputCommitter deletes the temporary directory that attempt produced:
public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
  if (hasOutputPath()) {
    context.progress();
    if (taskAttemptPath == null) {
      taskAttemptPath = getTaskAttemptPath(context);
    }
    FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
    if (!fs.delete(taskAttemptPath, true)) { // delete the attempt's temporary files
      LOG.warn("Could not delete " + taskAttemptPath);
    }
  } else {
    LOG.warn("Output Path is null in abortTask()");
  }
}
Back to my program: I write data straight into the final output directory during the reduce phase. If a reduce attempt is killed, its data has already been written, and the ORC file left behind by the killed attempt is not a complete ORC file, hence the read errors.
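To make the contrast concrete, here is a small sketch (not part of the original code; the class and method names are only for illustration). The standard commit protocol has each attempt write under its own work directory, which abortTask can delete wholesale, while the writer above builds its paths directly from FileOutputFormat.OUTDIR, outside that protected area:
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OutputPathSketch {
    // What the writer above does: build the path directly under the final output
    // directory, so bytes land there as soon as they are written and a killed
    // speculative attempt leaves a truncated ORC file behind.
    static Path directPath(TaskAttemptContext ctx, String key, String fileName) {
        Path finalOutDir = new Path(ctx.getConfiguration().get(FileOutputFormat.OUTDIR));
        return new Path(finalOutDir, key + "/" + fileName);
    }

    // What the commit protocol relies on: each attempt writes under its own work
    // directory (<outdir>/_temporary/.../attempt_xxx); commitTask() moves it into the
    // output directory only for the winning attempt, and abortTask() just deletes it.
    static Path attemptLocalPath(TaskAttemptContext ctx, String key, String fileName) throws IOException {
        Path finalOutDir = new Path(ctx.getConfiguration().get(FileOutputFormat.OUTDIR));
        Path workDir = new FileOutputCommitter(finalOutDir, ctx).getWorkPath();
        return new Path(workDir, key + "/" + fileName);
    }
}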
【Solution】
Simply turn off backup (speculative) execution of the reduce tasks:
conf.set("mapreduce.reduce.speculative","false");
conf.set("mapreduce.map.speculative","false");