MapReduce
一、什么是MapReduce
1.1 定义:
MapReduce是Google提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算。概念“Map(映射)”和“Reduce(归纳)”,及他们的主要思想,都是从函数式编程语言借来的,还有从矢量编程语言借来的特性。主要功能是运用n多台计算机处理同一堆海量数据以此得到最终结果。
MapReduce包括两个步骤:一是Map,二是Reduce。
- Map:Map就是把一个输入映射为一组(多个)全新的数据,而不去改变原始数据。
- Reduce:简化,就是把通过Map得到的一组数据经过某些方法归一成输出值。
Google文档原文:
MapReduce是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个Map函数处理一个基于key/value pair的数据集合,输出中间的基于key/value pair的数据集合;然后再创建一个Reduce函数用来合并所有的具有相同中间key值的中间value值。现实世界中有很多满足上述处理模型的例子,本论文将详细描述这个模型。
MapReduce架构的程序能够在大量的普通配置的计算机上实现并行化处理。这个系统在运行时只关心:如何分割输入数据,在大量计算机组成的集群上的调度,集群中计算机的错误处理,管理集群中计算机之间必要的通信。采用MapReduce架构可以使那些没有并行计算和分布式处理系统开发经验的程序员有效利用分布式系统的丰富资源。
从另一个角度来看:
- Map:负责把海量的输入数据分解成一个一个小的block(一般为16 - 64mb),然后集群上每台计算机对每以个分割的数据进行处理获取一组中间值。
- Reduce:把这些中间结果通过一定的函数进行处理来获取最终的答案。
1.2 Example:
- 有一组数据[1, 2, 3, ..., 1000]需要求它的的平方和:
- Map:首先把数据划分成1000份,每台机器只负责把自己接受的数据进行一次平方运算,[1,2,3,…,1000]就被映射成了[1,4,9,16,…,1000000]
- Reduce:把映射后得到的这1000个新的数据累加
- 计算一个大的文档集合中每个单词出现的次数,假设有3篇文章,分别为:
Paper1: We study algorithm.
Paper2: We share our thinking.
Paper3: This team shares thinking of algorithm.
根据MapReduce的模型,将这3篇文章交由3台处理器单独处理:
Map函数输出文档中的每个词、以及这个词的出现次数(在这个简单的例子里就是1)。Reduce函数把Map函数产生的每一个特定的词的计数累加起来。
Map:
处理之后对中间结果进行一定处理:(注意:此步骤仍然是用多台处理器分别完成)
Reduce:
将Map操作生成的中间值进行汇总,然后输出去最终结果
下面用伪代码描述上面的流程
Map函数输出文档中的每个词、以及这个词的出现次数(在这个简单的例子里就是1)。
Reduce函数把Map函数产生的每一个特定的词的计数累加起来
// key: document name
// value: document contents
map(String key, String value)
{
for each word w in value
EmitIntermediate(w, “1″);
}
// key: a word
// values: a list of counts
reduce(String key, Iterator values)
{
int result = 0;
for each v in values:
result += ParseInt(v);
}
二、实现流程(粗略概括,具体可参考Google-MapReduce文档)
2.1 执行概括:
通过将Map调用的输入数据自动分割为M个数据片段的集合,Map调用被分布到多台机器上执行。输入的数据片段能够在不同的机器上并行处理。使用分区函数将Map调用产生的中间key值分成R个不同分区。例如,(hash(key) mod R),Reduce调用也被分布到多台机器上执行。分区数量(R)和分区函数由用户来指定。
上图展示了MapReduce实现中操作的全部流程,可以归纳为以下步骤:
用户程序首先调用的MapReduce库将输入文件分成M个数据片度,每个数据片段的大小一般从 16MB-64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。
这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是worker程序,由master分配任务。有M个Map任务和R个Reduce任务将被分配,master将一个Map任务或Reduce任务分配给一个空闲的worker。
被分配了map任务的worker程序读取相关的输入数据片段,从输入的数据片段中解析出key/value pair,然后把key/value pair传递给用户自定义的Map函数,由Map函数生成并输出的中间key/value pair,并缓存在内存中。
缓存中的key/value pair通过分区函数分成R个区域,之后周期性的写入到本地磁盘上。缓存的key/value pair在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。
当Reduce worker程序接收到master程序发来的数据存储位置信息后,使用RPC从Map worker所在主机的磁盘上读取这些缓存数据。当Reduce worker读取了所有的中间数据后,通过对key进行排序后使得具有相同key值的数据聚合在一起。由于许多不同的key值会映射到相同的Reduce任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。
Reduce worker程序遍历排序后的中间数据,对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。Reduce函数的输出被追加到所属分区的输出文件。
当所有的Map和Reduce任务都完成之后,master唤醒用户程序。在这个时候,在用户程序里的对MapReduce调用才返回。
在成功完成任务之后,MapReduce的输出存放在R个输出文件中(对应每个Reduce任务产生一个输出文件,文件名由用户指定)。一般情况下,用户不需要将这R个输出文件合并成一个文件–他们经常把这些文件作为另外一个MapReduce的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。
2.2 Master数据结构:
Master持有的数据包括:
- 每一个Map任务状态(空闲、工作中或完)
- 每一个Reduce任务状态(空闲、工作中或完)
- 对于每个已经完成的Map任务,master存储了Map任务产生的R个中间文件存储区域的大小和位置
- 当Map任务完成时,Master接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的Reduce任务。
2.3 容错
因为MapReduce库的设计初衷是使用由成百上千的机器组成的集群来处理超大规模的数据,所以,这个库必须要能很好的处理机器故障。
2.3.1 worker故障
主要步骤:
- master周期性的ping每个worker。如果在一个约定的时间范围内没有收到worker返回的信息,master将把这个worker标记为失效。
- 将失效worker(A)完成的任务重新初始化为空闲状态,然后交给其他worker(B)执行,如果是Map worker失败,则会通知所有执行Reduce任务的work,还没有从失败worker A读取数据的Reduce任务将从B读取数据。
- 当worker故障时,由于已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可访问了,因此必须重新执行。而已经完成的Reduce任务的输出存储在全局文件系统上,因此不需要再次执行。
2.3.2 master失败
master周期性的将上面描述的数据结构的写入磁盘,即检查点(checkpoint)。如果这个master任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master进程。
2.3.3 在失效方面的处理机制
重点是分布式实现在任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。
2.3.4 存储位置
尽量把输入数据(由GFS管理)存储在集群中机器的本地磁盘上来节省网络带宽。GFS把每个文件按64MB一个Block分隔,每个Block保存在多台机器上,环境中就存放了多份拷贝(一般是3个拷贝)。
MapReduce的master在调度Map任务时会考虑输入文件的位置信息,尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行Map任务。
2.3.5 任务粒度
如前所述,我们把Map拆分成了M个片段、把Reduce拆分成R个片段执行。理想情况下,M和R应当比集群中worker的机器数量要多得多。在每台worker机器都执行大量的不同任务能够提高集群的动态的负载均衡能力,并且能够加快故障恢复的速度:失效机器上执行的大量Map任务都可以分布到所有其他的worker机器上去执行。
2.3.6 备份任务
有时候因为一台机器在最后几个Map或Reduce花费大量时间,导致整个MapReduce操作总的执行时间超过预期。
解决的办法之一:当一个MapReduce操作接近完成的时候,master调度备用(backup)任务进程来执行剩下的、处于处理中状态(in-progress)的任务。无论是最初的执行进程、还是备用(backup)任务进程完成了任务,我们都把这个任务标记成为已经完成。
三、MapReduce适合做什么
- 日志分析
- 排序
- 广告计算,广告优化、分析,点击流分析,链接分析
- 搜索关键字进行内容分类
- 搜索引擎,创建索引
- word 计数,统计值计算,统计数据,过滤,分析,查询
- 垃圾数据分析
- 数据分析
- 机器学习
- 数据挖掘