MapReduce:N keys,N files(二)

如果你看了MapReduce:N keys,N files(一)这篇文章,并按其介绍的方法尝试去将N个key映射到N的文件中,你会发现分割后数据量比分割前的要多,并且有些文件不能正常读取。
用presto读取的话,可能会报这种错:

Query 20181122_073113_31966_aeiaw failed: Error opening Hive split hdfs://xxx3/dt=20181122/uiappid=300046/20181122150918_100.110.30.239_SkgSHZT8 (offset=62576808, length=62576807): Protocol message tag had invalid wire type.

【问题现象】

问题的直观现象是MR输出的orc文件,presto认为是无效的,无法读取。但并不是每一次MR的输出都会产生这种无效文件,有时有,有时没有。

【尝试一】

初步怀疑是reduce保存当前所有reduce文件RecordWriter的map被回收了(Map<String,OrcMapreduceRecordWriter> map = new HashMap<String, OrcMapreduceRecordWriter>()),但又觉得不应该,因为OrcReNameMapreduceRecordWriter一直保持着引用关系。但还是将其调到了父类OrcReNameFileOutputFormat中。如下:

package is.split;

import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.orc.OrcFile;
import org.apache.orc.Writer;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcMapreduceRecordWriter;
import org.apache.orc.mapreduce.OrcOutputFormat;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class OrcReNameFileOutputFormat extends OrcOutputFormat {
    //保存各个key对应文件的writer
    static Map<String, OrcMapreduceRecordWriter> map = new HashMap<String, OrcMapreduceRecordWriter>(); //移到了父类中
        //private OrcMapreduceRecordWriter realWrite ;
    @Override
    public RecordWriter<Text, OrcStruct> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException {
        return new OrcReNameMapreduceRecordWriter(taskAttemptContext);
    }


    private class OrcReNameMapreduceRecordWriter extends RecordWriter<Text, OrcStruct>{

        private TaskAttemptContext taskAttemptContext;

        public OrcReNameMapreduceRecordWriter(TaskAttemptContext taskAttemptContext){
            this.taskAttemptContext = taskAttemptContext;
        }

        //该函数接收的key是map阶段输出的key
        public void write(Text key, OrcStruct value) throws IOException, InterruptedException {
          //真正向文件中写数据的Writer,还是OrcMapreduceRecordWriter
            OrcMapreduceRecordWriter realWrite = map.get(key.toString());
            if (realWrite == null){
                //String outputFileName = taskAttemptContext.getConfiguration().get("ouputfile.prefix.col.name", "uiappid");
            
                String split_key =  key.toString();
                //输出路径文件夹,以该key为文件夹
                String outputDirPath = taskAttemptContext.getConfiguration().get(FileOutputFormat.OUTDIR ) + "/" + split_key ;
                //输出路径文件名,文件名是根据当前时间戳和六位随机数生成的
                Path filename = new Path(new Path(outputDirPath), ISTool.getCurTime() + "_" + ISTool.getLocalIp() + "_" + RandomStringUtils.randomAlphanumeric(6) );
                Writer writer = OrcFile.createWriter(filename, org.apache.orc.mapred.OrcOutputFormat.buildOptions(taskAttemptContext.getConfiguration()));
                realWrite = new OrcMapreduceRecordWriter<OrcStruct>(writer);
                map.put(key.toString(), realWrite);
            }
            realWrite.write(NullWritable.get(), value);
        }

        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            for (Map.Entry<String, OrcMapreduceRecordWriter> entry : map.entrySet()) {
                if (entry.getValue() != null){
                    entry.getValue().close(context);
                }

            }
        }
    }
}

尝试修改后没有效果。。。依旧是有时候生成不完整的orc文件,有时没有。

【尝试二】

网上没有找到答案。文件名中加上机器IP后发现单个key的文件会由两台机器分别生成。一个key不应该对应一个reduce吗?为啥一个key会生成两个文件。。难道reduce跑偏了??
在浏览 https://www.cnblogs.com/YDDMAX/p/6828363.html 这篇文章的时候,看到OutputCommitter 有个abortJob方法,突然灵光一闪,无效的orc文件是不是备用的reduce任务生成的?后面查看OutputCommitter的方法验证了猜想。
MR框架会对跑的慢的reduce任务起一个备份任务,两个同时跑。如果一个reduce一个输出的话不会出现这种问题,因为reduce的输出会写到一个临时文件,只有整个reduce跑成功之后,才会将该临时文件移动到指定的输出目录。任务跑成功后掉的是commitTask方法。可以看到,任务跑成功后,OutputCommitter会调用rename方法移动文件,如果algorithmVersion大于1,还会对生成的文件做合并。

  @Private
  public void commitTask(TaskAttemptContext context, Path taskAttemptPath) 
      throws IOException {

    TaskAttemptID attemptId = context.getTaskAttemptID();
    if (hasOutputPath()) {
      context.progress();
      if(taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      FileStatus taskAttemptDirStatus;
      try {
        taskAttemptDirStatus = fs.getFileStatus(taskAttemptPath);
      } catch (FileNotFoundException e) {
        taskAttemptDirStatus = null;
      }

      if (taskAttemptDirStatus != null) {
        if (algorithmVersion == 1) {
          Path committedTaskPath = getCommittedTaskPath(context);
          if (fs.exists(committedTaskPath)) {
             if (!fs.delete(committedTaskPath, true)) {
               throw new IOException("Could not delete " + committedTaskPath);
             }
          }
          if (!fs.rename(taskAttemptPath, committedTaskPath)) { //移动文件
            throw new IOException("Could not rename " + taskAttemptPath + " to "
                + committedTaskPath);
          }
          LOG.info("Saved output of task '" + attemptId + "' to " +
              committedTaskPath);
        } else {
          // directly merge everything from taskAttemptPath to output directory
          mergePaths(fs, taskAttemptDirStatus, outputPath);
          LOG.info("Saved output of task '" + attemptId + "' to " +
              outputPath);
        }
      } else {
        LOG.warn("No Output found for " + attemptId);
      }
    } else {
      LOG.warn("Output Path is null in commitTask()");
    }
  }

而如果kill掉任务,OutputCommitter会删除该任务产生的临时目录:

 public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
    if (hasOutputPath()) { 
      context.progress();
      if(taskAttemptPath == null) {
        taskAttemptPath = getTaskAttemptPath(context);
      }
      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
      if(!fs.delete(taskAttemptPath, true)) {//删除临时文件
        LOG.warn("Could not delete "+taskAttemptPath);
      }
    } else {
      LOG.warn("Output Path is null in abortTask()");
    }
  }

回到我们这个程序,我是在reduce的过程中直接向指定的输出目录中写数据,如果reduce任务被kill了,其实数据已经写进去了,被kill的reduce产生的orc文件不是一个完整的orc文件,所以存在读数据格式问题。

【解决方案】:

将reduce任务备份执行的功能取消即可。

conf.set("mapreduce.reduce.speculative","false");
conf.set("mapreduce.map.speculative","false");
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349