mapreduce基础知识

MapReduce Demo 个人github上一些简单的mapreduce demo,个人学习代码笔记。

  • demo0 : reduce output 选择 MultipleOutput
  • demo1 : 自定义序列化对象实现 Writable 接口
  • demo2 : 自定义Partitioner分区对象
  • demo3 : 自定义Partitioner和Reduce端的GroupingComparator对象
  • demo4 : 小文件合并,自定义InputFormat对象和RecordReader对象
  • demo5 : join实现

InputFormat 详解

一般默认对于MapReduce任务来说,默认的InputFormatTextInputFormat

下面是关于任务启动的读取配置的一些class,在org.apache.hadoop.mapreduce包内

  • TextInputFormat -> FileInputFormat -> InputFormat
  • FileSplit -> InputSplit
  • LineRecordReader -> RecordReader

下面关于如何启动任务的一些class,在org.apache.hadoop.mapred包内

  • JvmTask
  • MapTask 很重要的一个类,如果想要详细理解Map过程,可以精读这部分代码。
  • ReduceTask也是一个十分重要的类,如果想详细理解Reduce过程,可以精读这部分代码。


/** 
 * <code>InputFormat</code> describes the input-specification for a 
 * Map-Reduce job. 
 * 
 * <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the
 * job to:<p>
 * <ol>
 *   <li>
 *   Validate the input-specification of the job. 
 *   <li>
 *   Split-up the input file(s) into logical {@link InputSplit}s, each of 
 *   which is then assigned to an individual {@link Mapper}.
 *   </li>
 *   <li>
 *   Provide the {@link RecordReader} implementation to be used to glean
 *   input records from the logical <code>InputSplit</code> for processing by 
 *   the {@link Mapper}.
 *   </li>
 * </ol>
 * 
 * <p>The default behavior of file-based {@link InputFormat}s, typically 
 * sub-classes of {@link FileInputFormat}, is to split the 
 * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
 * bytes, of the input files. However, the {@link FileSystem} blocksize of  
 * the input files is treated as an upper bound for input splits. 
 
 // 切分文件也是具有一个上下界的 
    1. 上界(HDFS BlockSize)
    2. 下届可以通过参数配置
 
 A lower bound 
 * on the split size can be set via 
 * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
 * mapreduce.input.fileinputformat.split.minsize</a>.</p>
 * 
 * <p>Clearly, logical splits based on input-size is insufficient for many 
 * applications since record boundaries are to respected. In such cases, the
 * application has to also implement a {@link RecordReader} on whom lies the
 * responsibility to respect record-boundaries and present a record-oriented
 * view of the logical <code>InputSplit</code> to the individual task.
 
 上面注释很有意义,getInputSplit返回splits交给每一个Mapper,这都是逻辑层面上的,但是RecordReader可以读取每一个InputSplit的
 数据交给Mapper的map函数进行处理。
 
 */




// InputFormat<K,V> 是一个抽象类,只有两个方法
public abstract class InputFormat<K, V> {

  /** 
   * Logically split the set of input files for the job.  
   * 
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
   * also creates the {@link RecordReader} to read the {@link InputSplit}.
   * 
   * @param context job configuration.
   * @return an array of {@link InputSplit}s for the job.
   */
  public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;
  
  /**
   * Create a record reader for a given split. The framework will call
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} before
   * the split is used.   首先调用initialize方法,接收 InputSplit 和 MapTask 上下文Context
   * @param split the split to be read
   * @param context the information about the task
   * @return a new record reader
   * @throws IOException
   * @throws InterruptedException
   */
  public abstract 
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException, 
                                                 InterruptedException;

}

如何构建一个Mapper的呢?

首先来看JvmTask,这其实就是一个Java虚拟机上的Task,实现序列化,实现Writable接口,根据任务类型来生成一个MapTask或者ReduceTask内部对象,这些对象包含了Task该有的信息了。然后根据Job Conf构建Mapper对象,获取对应的InputFormat获取对应的RecordReader,初始化RecordReader,不停读取InputSplit对应的文件内容,然后再交给Mapper.map方法去处理,InputSplit对应的就是FileSplit,保存的是该Block的基本信息,包括以下基本属性。

  private Path file;       // 文件路径
  private long start;      // offest 起始位置
  private long length;     // block 块大小长度
  private String[] hosts;     // 该block在哪些主机上
  private SplitLocationInfo[] hostInfos;     // 主机的信息

针对RecordReader,我们来谈谈LineRecordReader。

public class LineRecordReader extends RecordReader<LongWritable, Text> {

  private long start;
  private long pos;
  private long end;
  ......
  
  
  public void initialize(InputSplit genericSplit,
                         TaskAttemptContext context) throws IOException {
    ......   // 根据压缩方式去构建 decompressor
    
    // If this is not the first split, we always throw away first record
    // because we always (except the last split) read one extra line in
    // next() method.
    // 很重要的代码,会直接忽略开始的那一行
    if (start != 0) {
      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
    }
    this.pos = start;
  }
  
  
  // 如果最后一行是被截断的,会顺利读完剩下的内容进来。
  public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) {
      if (pos == 0) {
        newSize = skipUtfByteOrderMark();
      } else {
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize;
      }

      if ((newSize == 0) || (newSize < maxLineLength)) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }
}

分析下JobSubmitter对象

class JobSubmitter {

  // submitFs 连接HDFS对象
  // ClientProtocol  连接RM对象
  JobSubmitter(FileSystem submitFs, ClientProtocol submitClient) 
  throws IOException {
    this.submitClient = submitClient;
    this.jtFs = submitFs;
  }
  
  /*
   * @param job the configuration to submit
   * @param cluster the handle to the Cluster
  */
  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {
    ......
    // job .staging 目录
    Path jobStagingArea JobSubmissionFiles.getStagingDir(cluster, conf);
    ......
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;   // 获取 MapReduce Job信息
    
    ......
    copyAndConfigureFiles(job, submitJobDir);  // 写Job信息到HDFS里面
    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);  // 根据InputFormat划分split
    conf.setInt(MRJobConfig.NUM_MAPS, maps);   // 写进conf
    LOG.info("number of splits:" + maps);
    
    ......
    // Write job file to submit dir
    writeConf(conf, submitJobFile);
    
    ......
    status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());  // 获取job状态
  }
}

mapreduce过程笔记


mapreduce.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,928评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,192评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,468评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,186评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,295评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,374评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,403评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,186评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,610评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,906评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,075评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,755评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,393评论 3 320
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,079评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,313评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,934评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,963评论 2 351

推荐阅读更多精彩内容

  • MapReduce工作流程 流程图如下 解释上面的流程是整个mapreduce最全工作流程,但是shuffle过程...
    ZFH__ZJ阅读 559评论 0 3
  • 数据切片和MapTask并行度决定机制 1)一个Job的Map阶段并行度由客户端在提交Job时的切片数决定 2)每...
    bullion阅读 788评论 0 1
  • MapReduce介绍 在Hadoop中计算模型使用的是MapReduce。Hadoop MapReduce计算编...
    spraysss阅读 305评论 0 1
  • 武功山,这个名字,对于从小就喜欢看武侠的人来说,仅名字就有足够的吸引力了;而对于既喜欢武侠又喜欢爬山的我而言...
    笑淡风云77阅读 331评论 0 2
  • 曦光 寻找我心底 最阴暗的一隅 跌跌撞撞 一身残伤犹在 似青春画下 祭奠无邪的淡妆 无意间 无意中 面对不安的遥远...
    和芷雪阅读 279评论 12 46