摘要
MapReduce 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个 Map 函数处理一个基于 key/value pair 的数据集合,输出中间的基于 key/value pair 的数据集合;然后再创建一个 Reduce 函数用来合并所有的具有相同中间 key 值的中间 value 值。
需要解决的问题:
- 如何分割输入数据
- 在大量计算机组成的集群上的调度
- 集群中计算机的错误处理
- 管理集群中计算机之间必要的通信。
1. 介绍
设计MapReduce模型的起因:
由于数据量巨大,为了效率,就必须用多台机器进行跑任务,就会出现如何处理并行计算、如何分发数据、如何处理错误等需要解决的问题。就需要一个框架能隐藏并行计算、容错、数据分布、负载均衡等复杂的细节。
2. 编程模型
MapReduce 编程模型的原理是:利用一个输入 key/value pair 集合来产生一个输出的 key/value pair 集合。MapReduce 库的用户用两个函数表达这个计算:Map 和 Reduce。用户自定义的 Map 函数接受一个输入的 key/value pair 值,然后产生一个中间 key/value pair 值的集合。MapReduce 库把所有具有相同中间 key 值 I 的中间 value 值集合在一起后传递给 reduce 函数。用户自定义的 Reduce 函数接受一个中间 key 的值 I 和相关的一个 value 值的集合。Reduce 函数合并这些value 值,形成一个较小的 value 值的集合。一般的,每次 Reduce 函数调用只产生 0 或 1 个输出 value 值。通常我们通过一个迭代器把中间 value 值提供给 Reduce 函数,这样我们就可以处理无法全部放入内存中的大量的 value 值的集合。
3. 实现
3.1 执行概括
通过将 Map 调用的输入数据自动分割为 M 个数据片段的集合,Map 调用被分布到多台机器上执行。输入的数据片段能够在不同的机器上并行处理。使用分区函数将 Map 调用产生的中间 key 值分成 R 个不同分区(例如,hash(key) mod R),Reduce 调用也被分布到多台机器上执行。分区数量(R)和分区函数由用户来指定。
图 1 展示了我们的 MapReduce 实现中操作的全部流程。当用户调用 MapReduce 函数时,将发生下面的一
系列动作(下面的序号和图 1 中的序号一一对应):
- 用户程序首先调用的 MapReduce 库将输入文件分成 M 个数据片度,每个数据片段的大小一般从16MB 到 64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。
- 这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是 worker 程序,由 master 分配任务。有 M 个 Map 任务和 R 个 Reduce 任务将被分配,master 将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker。
- 被分配了 map 任务的 worker 程序读取相关的输入数据片段,从输入的数据片段中解析出 key/value pair,然后把 key/value pair 传递给用户自定义的 Map 函数,由 Map 函数生成并输出的中间 key/value pair,并缓存在内存中。
- 缓存中的 key/value pair 通过分区函数分成 R 个区域,之后周期性的写入到本地磁盘上。缓存的key/value pair 在本地磁盘上的存储位置将被回传给 master,由 master 负责把这些存储位置再传送给Reduce worker。
- 当 Reduce worker 程序接收到 master 程序发来的数据存储位置信息后,使用 RPC 从 Map worker 所在主机的磁盘上读取这些缓存数据。当 Reduce worker 读取了所有的中间数据后,通过对 key 进行排序后使得具有相同 key 值的数据聚合在一起。由于许多不同的 key 值会映射到相同的 Reduce 任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。
- Reduce worker 程序遍历排序后的中间数据,对于每一个唯一的中间 key 值,Reduce worker 程序将这个 key 值和它相关的中间 value 值的集合传递给用户自定义的 Reduce 函数。Reduce 函数的输出被追加到所属分区的输出文件。
- 当所有的 Map 和 Reduce 任务都完成之后,master 唤醒用户程序。在这个时候,在用户程序里的对MapReduce 调用才返回。在成功完成任务之后,MapReduce 的输出存放在 R 个输出文件中(对应每个 Reduce 任务产生一个输出文件,文件名由用户指定)。一般情况下,用户不需要将这 R 个输出文件合并成一个文件–他们经常把这些文件作为另外一个 MapReduce 的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。
3.2master数据结构
Master 持有一些数据结构,它存储每一个 Map 和 Reduce 任务的状态(空闲、工作中或完成),以及 Worker机器(非空闲任务的机器)的标识。
Master 就像一个数据管道,中间文件存储区域的位置信息通过这个管道从 Map 传递到 Reduce。因此,对于每个已经完成的 Map 任务,master 存储了 Map 任务产生的 R 个中间文件存储区域的大小和位置。当 Map
任务完成时,Master 接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的 Reduce 任务。
3.3容错
3.3.1worker故障
master 周期性的 ping 每个 worker。如果在一个约定的时间范围内没有收到 worker 返回的信息,master 将把这个 worker 标记为失效。所有由这个失效的 worker 完成的 Map 任务被重设为初始的空闲状态,之后这些
任务就可以被安排给其他的 worker。同样的,worker 失效时正在运行的 Map 或 Reduce 任务也将被重新置为空闲状态,等待重新调度。
当 worker 故障时,由于已经完成的 Map 任务的输出存储在这台机器上,Map 任务的输出已不可访问了,因此必须重新执行。而已经完成的 Reduce 任务的输出存储在全局文件系统上,因此不需要再次执行。
当一个 Map 任务首先被 worker A 执行,之后由于 worker A 失效了又被调度到 worker B 执行,这个“重新执行”的动作会被通知给所有执行 Reduce 任务的 worker。任何还没有从 worker A 读取数据的 Reduce 任务将从 worker B 读取数据。
3.3.2master失败
一个简单的解决办法是让 master 周期性的将上面描述的数据结构写入磁盘,即检查点(checkpoint)。如果这个 master 任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master 进程。然而,由于只有一个 master 进程,master 失效后再恢复是比较麻烦的,因此我们现在的实现是如果 master 失效,就中止 MapReduce 运算。客户可以检查到这个状态,并且可以根据需要重新执行 MapReduce操作。
3.4存储位置
我们通过尽量把输入数据(由 GFS 管理)存储在集群中机器的本地磁盘上来节省网络带宽。GFS 把每个文件按 64MB 一个 Block 分隔,每个 Block 保存在多台机器上,环境中就存放了多份拷贝(一般是 3 个拷贝)。MapReduce 的 master 在调度 Map 任务时会考虑输入文件的位置信息,尽量将一个 Map 任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了,master 将尝试在保存有输入数据拷贝的机器附近的机器上执行 Map 任务(例如,分配到一个和包含输入数据的机器在一个 switch 里的 worker 机器上执行)。
3.6备用任务
影响一个 MapReduce 的总执行时间最通常的因素是“落伍者”:在运算过程中,如果有一台机器花了很长的时间才完成最后几个 Map 或 Reduce 任务,导致 MapReduce 操作总的执行时间超过预期。我们有一个通用的机制来减少“落伍者”出现的情况。当一个 MapReduce 操作接近完成的时候,master调度备用(backup)任务进程来执行剩下的、处于处理中状态(in-progress)的任务。无论是最初的执行进程、还是备用(backup)任务进程完成了任务,我们都把这个任务标记成为已经完成。我们调优了这个机制,通常只会占用比正常操作多几个百分点的计算资源。我们发现采用这样的机制对于减少超大 MapReduce 操作的总处理时间效果显著。
4. 技巧
4.1分区函数
MapReduce 的使用者通常会指定 Reduce 任务和 Reduce 任务输出文件的数量(R)。我们在中间 key 上使用分区函数来对数据进行分区,之后再输入到后续任务执行进程。一个缺省的分区函数是使用 hash 方法(比如,hash(key) mod R)进行分区。hash 方法能产生非常平衡的分区。
4.2顺序保证
我们确保在给定的分区中,中间 key/value pair 数据的处理顺序是按照 key 值增量顺序处理的。这样的顺序保证对每个分成生成一个有序的输出文件,这对于需要对输出文件按 key 值随机存取的应用非常有意义,对在排序输出的数据集也很有帮助。
4.3combiner函数
在某些情况下,Map 函数产生的中间 key 值的重复数据会占很大的比重,我们允许用户指定一个可选的 combiner 函数,combiner 函数首先在本地将这些记录进行一次合并,然后将合并的结果再通过网络发送给reduce。Combiner 函数在每台执行 Map 任务的机器上都会被执行一次。一般情况下,Combiner 和 Reduce 函数是一样的。Combiner 函数和 Reduce 函数之间唯一的区别是 MapReduce 库怎样控制函数的输出。Reduce 函数的输出被保存在最终的输出文件里,而 Combiner 函数的输出被写到中间文件里,然后被发送给 Reduce 任务。部分的合并中间结果可以显著的提高一些 MapReduce 操作的速度