MapReduce编程模型

本篇文章介绍一下MapReduce[1]分布式计算

先回顾一下Hadoop架构:
Hadoop由HDFS分布式存储、MR分布式计算、Yarn资源调度三部分组成

Hadoop

MR
  • MR是采用一种分而治之[2]的思想设计出来的分布式计算框架
  • MR由两个阶段组成:
    • Map阶段(切分成一个个小的任务)
    • Reduce阶段(汇总小任务的结果)
MR分治
Map阶段
  • map阶段有一个关键的map()函数。
  • 此函数的输入是键值对
  • 输出是一系列的键值对,输出写到本地磁盘
Reduce阶段
  • reduce阶段有一个关键的函数reduce函数。
  • 此函数的输入也是键值对(即map的输出kv对)。
  • 输出也是一系列的键值对,结果最终写入HDFS。

Map&Reduce工作流程图如下:

Map和Reduce工作流程

下面以MR的词频统计为例,详细介绍MR工作流程。
需求:统计一批英文文章中,每个单词出现的总次数。
假设:现在有一个输入文件"Gone With The Wind",这个文件有三个block:block1, block2, block3。三个block的内容依次如下图。

MR原理图
  • Map阶段
    • 每一个block对应一个分片split[3] (默认split与block一一对应)。
    • 每一个split对应一个map任务(map task)。所以这里三个block将会对应三个map task(map1, map2, map3),这3个任务的逻辑完全一样。
    • 以map1为例。map1会读取block1的数据,一次读取block1的一行数据,然后会产生一个kv对(其中,key是当前所读行的行首相对于当前block开始处的字节偏移量;value是当前行的内容;如假设当前所读行是第一行,那么当前行的内容是"Dear Bear River",则kv对是(0, "Dear Bear River")),作为map()的参数传入,调用map()。
    • map()方法。将value当前行内容按空格切分,得到三个单词Dear|Bear|River,然后将每个单词变成键值对(Dear, 1)|(Bear, 1)|(River, 1),最终结果输出为文件,写入map任务所在节点的本地磁盘中(其中还有一个Shuffle的过程,下文会详细讲解)。
    • block的第一行数据被处理完后,接着处理第二行,当map任务将当前block所有的数据全部处理完后,此map任务即运行结束。
  • Reduce阶段
    • reduce任务(reduce task)的个数由用户程序指定,main()内的job.setNumReduceTask(4)指定reduce任务是4个(reduce1, reduce2, reduce3, reduce4)。
    • 以reduce1为例。reduce1通过网络,连接到map1,将map1输出结果中属于reduce1的分区的数据通过网络获取到reduce1端(拷贝阶段)。同样地,也会连接到map2,map3获取数据。最终reduce1端获得4个(Dear, 1)键值对;由于key键相同,它们分到同一个组[4]。4个(Dear, 1)键值对,转换成[Dear, Iterable(1, 1, 1, )],作为两个参数传入(其中还有一个Shuffle的过程,下文会详细讲解),调用reduce()。
    • reduce()方法。计算Dear的总数为4,并将(Dear, 4)作为键值对输出,最终结果输出成文件,写入HDFS。
MR中key的作用

MR编程中,key有特殊的作用:
数据中,若要针对某个值进行分组、聚合时,需将此值作为MR中的reduce的输入的key。 如上边词频统计例子,按单词进行分组,每组中对出现次数做聚合(计算总和);所以需要将每个单词作为reduce输入的key,MapReduce框架自动按照单词分组,进而求出每组即每个单词的总次数。

聚合

另外,key还具有可排序的特性,因为MR中的key类需要实现WritableComparable接口;而此接口又继承Comparable接口(可查看源码)。

排序

MR编程时,要充分利用以上两点;结合实际业务需求,设置合适的key。

Shuffle

前面在讲map和reduce的工作原理的时候,对于map的处理结果只是简单地说保存在磁盘,而对于reduce,也只是简单地说了从map端获取处理结果作为其输入。这两个过程其实并不是那么那么简单,当中还有一个shuffle的过程。

Shuffle简图
Shuffle细节图

其中,分区用到了分区器,默认分区器是HashPartitioner,源码:

public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
  • map端

    • 每个map任务都有一个对应的环形内存缓冲区(如上图),对于map()输出的kv对并不是立马就写入磁盘,而是先写入到一个环形缓冲区(默认大小是100M),当内容占据80%的缓冲区空间后,由一个后台线程将缓冲区中的数据溢出写到一个磁盘文件。
    • 在溢写的过程中,map任务可以继续向环形缓冲区写入数据;但是若写入速度大于溢出写的速度,最终造成100m占满后,map任务会暂停向环形缓冲区中写数据的过程;只执行溢出写的过程;直到环形缓冲区的数据全部溢出写到磁盘,才恢复向缓冲区写入。
    • 后台线程溢写磁盘过程,有以下几个步骤:
      • 先对每个溢写的kv对做分区;分区的个数由MR程序的reduce任务数决定; 默认使用HashPartitioner计算当前kv对属于哪个分区;计算公式:(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks(见上面源码)
      • 每个分区中,根据kv对的key做内存中排序。
      • 若设置了map端本地聚合combiner,则对每个分区中,排好序的数据做combine[5]操作。
      • 若设置了对map输出压缩的功能,会对溢写数据压缩。
    • 随着不断的向环形缓冲区中写入数据,会多次触发溢写(每当环形缓冲区写满100m),本地磁盘最终会生成多个溢出文件。
    • 合并溢写文件:在map task完成之前,所有溢出文件会被合并成一个大的溢出文件;且是已分区、已排序的输出文件(如上图,每个计算节点上保存的合并后的文件都有4个分区,每个分区内的kv对都是已经排好了序)。
  • 在合并溢写文件时,如果至少有3个溢写文件,并且设置了map端combine的话,会在合并的过程中触发combine操作;
  • 但是若只有2个或1个溢写文件,则不触发combine操作(因为combine操作,本质上是一个reduce,需要启动JVM虚拟机,有一定的开销)
  • reduce端
  • reduce task会在每个map task运行完成后,通过HTTP获得map task输出中,属于自己的分区数据(许多kv对),如果map输出数据比较小,先保存在reduce的jvm内存中,否则直接写入reduce磁盘。一旦内存缓冲区达到阈值(默认0.66)或map输出数的阈值(默认1000),则触发归并merge,结果写到本地磁盘。
  • 若MR编程指定了combine,在归并过程中会执行combine操作。
  • 随着溢出写的文件的增多,后台线程会将它们合并大的、排好序的文件。
  • reduce task将所有map task复制完后,将合并磁盘上所有的溢出文件,默认一次合并10个,最后一批合并,部分数据来自内存,部分来自磁盘上的文件。
  • 合并完成之后,进入“归并、排序、分组阶段”。
  • 最后每组数据调用一次reduce方法。
小结:

shuffle主要指的是map端的输出作为reduce端输入的过程。

  • map端
    • map()输出结果先写入环形缓冲区。
    • 缓冲区100M;写满80M后,开始溢出写磁盘文件。
    • 此过程中,会进行分区、排序、combine(可选)、压缩(可选)
    • map任务完成前,会将多个小的溢出文件,合并成一个大的溢出文件(已分区、排序)。
  • reduce端
    • 拷贝阶段:reduce任务通过http将map任务属于自己的分区数据拉取过来。
    • 开始merge及溢出写磁盘文件。
    • 所有map任务的分区全部拷贝过来后,进行阶段合并、排序、分组阶段。
    • 每组数据调用一次reduce()。
    • 结果写入HDFS。

拓展阅读:


  1. MapReduce可简称为MR。

  2. 如果有一组大任务(复杂,计算量大,耗时较长的任务),使用单台服务器无法计算或者叫段时间内计算出结果时,可将此大任务切分成一个个小的任务,小任务分别在不同服务器上并行的执行,最终再汇总每个小任务的结果。

  3. split 是一个逻辑概念,它只包含一些元数据信息,比如 、数据起始位置、数据长度、数据所在节点等。它的划分方法完全由用户自己决定。

  4. 实际情况是存在多个不同的键,然后会根据键分组,相同的键分到一个组。

  5. 将如kv对("poem", 3)和("poem", 5)键值对合并的过程,叫combine操作,将map()结果写入磁盘之前进行combine可以减少带宽消耗。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容