MapReduce

论文地址:MapReduce: Simplified Data Processing on Large Clusters

简介

MapReduce是一个编程模型,和处理,产生大数据集的相关实现。用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集。然后再指定一个reduce函数合并所有的具有相同中间key的中间value。

计算利用一个输入key/value对集,来产生一个输出key/value对集。MapReduce库的用户用两个函数表达这个计算:map和reduce。

用户自定义的map函数,接受一个输入对。然后产生一个中间key/value对集。MapReduce库把所有具有相同中间key I的中间value聚合在一起,然后把它们传递给reduce函数。

用户自定义的reduce函数,接受一个中间key I和相关的一个value集。它合并这些value,形成一个比较小的value集。一般的,每次reduce调用只产生0或1个输出value,通过一个迭代器把中间value提供给用户自定义的reduce函数。这样可以使我们根据内存来控制value列表的大小。

实例

计算在一个大的文档集合中每个词出现的次数。用户将写和下面类似的伪代码:

map(String key, String value):
 //key:文档的名字
 //value:文档的内容
 for each word w in value:
    EmitIntermediate(w, "1");
 
reduce(String key, Iterator values):
//key:一个词
//values:一个计数列表
 int result=0;
 for each v in values:
   result+=ParseInt(v);
 Emit(AsString(resut));

map函数产生每个词和这个词的出现次数(在这个简单的例子里就是1)。reduce函数把产生的每一个特定的词的计数加在一起。

另外,用户用输入输出文件的名字和可选的调节参数来填充一个mapreduce规范对象。用户然后调用MapReduce函数,并把规范对象传递给它。用户的代码和MapReduce库链接在一起。

执行过程

执行过程

通过自动分割输入数据成一个有M个split的集,map调用被分布到多台机器上。输入的split能够在不同的机器上被并行处理。通过用分割函数分割中间key,来形成R个片(例如,hash(key) mod R),reduce调用被分布到多台机器上。分割数量(R)和分割函数由用户来指定。

上图显示了我们实现的MapReduce操作的全部流程。当用户的程序调用MapReduce的函数的时候,将发生下面的一系列动作(下面的数字和图1中的数字标签相对应):

  1. 在用户程序里的MapReduce库首先分割输入文件成M个片,每个片的大小一般从 16到64MB(用户可以通过可选的参数来控制)。然后在机群中开始大量的拷贝程序。

  2. 这些程序拷贝中的一个是master,其他的都是由master分配任务的worker。有M 个map任务和R个reduce任务将被分配。管理者分配一个map任务或reduce任务给一个空闲的worker。

  3. 一个被分配了map任务的worker读取相关输入split的内容。它从输入数据中分析出key/value对,然后把key/value对传递给用户自定义的map函数。由map函数产生的中间key/value对被缓存在内存中。

  4. 缓存在内存中的key/value对被周期性的写入到本地磁盘上,通过分割函数把它们写入R个区域。在本地磁盘上的缓存对的位置被传送给master,master负责把这些位置传送给reduce worker。

  5. 当一个reduce worker得到master的位置通知的时候,它使用远程过程调用来从map worker的磁盘上读取缓存的数据。当reduce worker读取了所有的中间数据后,它通过排序使具有相同key的内容聚合在一起。因为许多不同的key映射到相同的reduce任务,所以排序是必须的。如果中间数据比内存还大,那么还需要一个外部排序。

  6. reduce worker迭代排过序的中间数据,对于遇到的每一个唯一的中间key,它把key和相关的中间value集传递给用户自定义的reduce函数。reduce函数的输出被添加到这个reduce分割的最终的输出文件中。

  7. 当所有的map和reduce任务都完成了,管理者唤醒用户程序。在这个时候,在用户程序里的MapReduce调用返回到用户代码。

在成功完成之后,mapreduce执行的输出存放在R个输出文件中(每一个reduce任务产生一个由用户指定名字的文件)。一般,用户不需要合并这R个输出文件成一个文件--他们经常把这些文件当作一个输入传递给其他的MapReduce调用,或者在可以处理多个分割文件的分布式应用中使用他们。

容错

因为MapReduce库被设计用来使用成百上千的机器来帮助处理非常大规模的数据,所以这个库必须要能很好的处理机器故障。

worker故障

master周期性的ping每个worker。如果master在一个确定的时间段内没有收到worker返回的信息,那么它将把这个worker标记成失效。因为每一个由这个失效的worker完成的map任务被重新设置成它初始的空闲状态,所以它可以被安排给其他的worker。同样的,每一个在失败的worker上正在运行的map或reduce任务,也被重新设置成空闲状态,并且将被重新调度。
在一个失败机器上已经完成的map任务将被再次执行,因为它的输出存储在它的磁盘上,所以不可访问。已经完成的reduce任务将不会再次执行,因为它的输出存储在全局文件系统中。
当一个map任务首先被worker A执行之后,又被B执行了(因为A失效了),重新执行这个情况被通知给所有执行reduce任务的worker。任何还没有从A读数据的reduce任务将从worker B读取数据。
MapReduce可以处理大规模worker失败的情况。例如,在一个MapReduce操作期间,在正在运行的机群上进行网络维护引起80台机器在几分钟内不可访问了,MapReduce master只是简单的再次执行已经被不可访问的worker完成的工作,继续执行,最终完成这个MapReduce操作。

master失败

可以很容易的让管理者周期的写入上面描述的数据结构的checkpoints。如果这个master任务失效了,可以从上次最后一个checkpoint开始启动另一个master进程。然而,因为只有一个master,所以它的失败是比较麻烦的,因此我们现在的实现是,如果master失败,就中止MapReduce计算。客户可以检查这个状态,并且可以根据需要重新执行MapReduce操作。

在错误面前的处理机制

当用户提供的map和reduce操作对它的输出值是确定的函数时,我们的分布式实现产生,和全部程序没有错误的顺序执行一样,相同的输出。
我们依赖对map和reduce任务的输出进行原子提交来完成这个性质。每个工作中的任务把它的输出写到私有临时文件中。一个reduce任务产生一个这样的文件,而一个map任务产生R个这样的文件(一个reduce任务对应一个文件)。当一个map任务完成的时候,worker发送一个消息给master,在这个消息中包含这R个临时文件的名字。如果master从一个已经完成的map任务再次收到一个完成的消息,它将忽略这个消息。否则,它在master的数据结构里记录这R个文件的名字。
当一个reduce任务完成的时候,这个reduce worker原子的把临时文件重命名成最终的输出文件。如果相同的reduce任务在多个机器上执行,多个重命名调用将被执行,并产生相同的输出文件。我们依赖由底层文件系统提供的原子重命名操作来保证,最终的文件系统状态仅仅包含一个reduce任务产生的数据。
我们的map和reduce操作大部分都是确定的,并且我们的处理机制等价于一个顺序的执行的这个事实,使得程序员可以很容易的理解程序的行为。当map或/和reduce操作是不确定的时候,我们提供虽然比较弱但是合理的处理机制。当在一个非确定操作的前面,一个reduce任务R1的输出等价于一个非确定顺序程序执行产生的输出。然而,一个不同的reduce任务R2的输出也许符合一个不同的非确定顺序程序执行产生的输出。
考虑map任务M和reduce任务R1,R2的情况。我们设定e(Ri)为已经提交的Ri的执行(有且仅有一个这样的执行)。这个比较弱的语义出现,因为e(R1)也许已经读取了由M的执行产生的输出,而e(R2)也许已经读取了由M的不同执行产生的输出。

关键优化

存储位置

在我们的计算机环境里,网络带宽是一个相当缺乏的资源。我们利用把输入数据(由GFS管理)存储在机器的本地磁盘上来保存网络带宽。GFS把每个文件分成64MB的一些块,然后每个块的几个拷贝存储在不同的机器上(一般是3个拷贝)。MapReduce的master考虑输入文件的位置信息,并且努力在一个包含相关输入数据的机器上安排一个map任务。如果这样做失败了,它尝试在那个任务的输入数据的附近安排一个map任务(例如,分配到一个和包含输入数据块在一个switch里的worker机器上执行)。当运行巨大的MapReduce操作在一个机群中的一部分机器上的时候,大部分输入数据在本地被读取,从而不消耗网络带宽。

任务粒度

像上面描述的那样,我们细分map阶段成M个片,reduce阶段成R个片。M和R应当比worker机器的数量大许多。每个worker执行许多不同的工作来提高动态负载均衡,也可以加速从一个worker失效中的恢复,这个机器上的许多已经完成的map任务可以被分配到所有其他的worker机器上。
在我们的实现里,M和R的范围是有大小限制的,因为master必须做O(M+R)次调度,并且保存O(MR)个状态在内存中。(这个因素使用的内存是很少的,在O(MR)个状态片里,大约每个map任务/reduce任务对使用一个字节的数据)。
此外,R经常被用户限制,因为每一个reduce任务最终都是一个独立的输出文件。实际上,我们倾向于选择M,以便每一个单独的任务大概都是16到64MB的输入数据(以便上面描述的位置优化是最有效的),我们把R设置成我们希望使用的worker机器数量的小倍数。我们经常执行MapReduce计算,在M=200000,R=5000,使用2000台工作者机器的情况下。

备用任务

一个落后者是延长MapReduce操作时间的原因之一:一个机器花费一个异乎寻常地的长时间来完成最后的一些map或reduce任务中的一个。有很多原因可能产生落后者。例如,一个有坏磁盘的机器经常发生可以纠正的错误,这样就使读性能从30MB/s降低到3MB/s。机群调度系统也许已经安排其他的任务在这个机器上,由于计算要使用CPU,内存,本地磁盘,网络带宽的原因,引起它执行MapReduce代码很慢。我们最近遇到的一个问题是,一个在机器初始化时的Bug引起处理器缓存的失效:在一个被影响的机器上的计算性能有上百倍的影响。
我们有一个一般的机制来减轻这个落后者的问题。当一个MapReduce操作将要完成的时候,master调度备用进程来执行那些剩下的还在执行的任务。无论是原来的还是备用的执行完成了,工作都被标记成完成。我们已经调整了这个机制,通常只会占用多几个百分点的机器资源。我们发现这可以显著的减少完成大规模MapReduce操作的时间。作为一个例子,将要在5。3描述的排序程序,在关闭掉备用任务的情况下,要比有备用任务的情况下多花44%的时间。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容