本文是Hadoop组件之MapReduce的学习总结性文章。因本人非技术出身,所学均来源于网络,难免有不严谨甚至错误之处,恳请大家指正。
我们借用本系列第一篇文章内举的关于MapReduce的小例子来开始本次分享。
张三接到一个统计去年全年购买过“汇源肾宝”的用户所处年龄段的任务。
假设当时他是这样做的:写了一个程序,先遍历去年的数据文件,然后找到在去年所有购买过“汇源肾宝”的用户,并且记录下他们的年龄,最后把符合各年龄段的用户数进行汇总统计,就可以得到各个年龄段的用户数的个数。但是这种方法比较耗时,毕竟就一个程序在执行这个任务。当然他也可以编写一个多线程程序,并发的去执行这个任务,这种方法肯定比前面提到的方法效率高,但是写一个多线程程序要困难的多,不仅需要同步共享数据,还要防止两个线程重复统计,这对于张三来说肯定是一个有难度、并且工作量大的任务。
但是在hadoop框架下,可以使用MapReduce来执行这个任务就容易的多,大致过程为:数据文件被分成若干部分存在HDFS上(分布在不同的计算机中),然后每一台计算机执行一个MapReduce去计算他保存的这部分数据,找出去年购买过“汇源肾宝”的用户,并且把他们的年龄记录下来。比如计算机A统计完他的数据后,20岁的用户数有20个、30岁的用户数有30个,计算机B统计完他的数据后,20岁的用户数有22个、30岁的用户数有30个、33岁的用户数有60个......,这个阶段可以简单的理解为map阶段。然后把所有的计算机统计的数据进行汇总就可以得到所有年龄的用户数分别是多少个,假设最后汇总结果为,20岁 :1000个、30岁 :700个、33岁 :1200个,那么这个阶段可以简单的理解为reduce阶段。在整个过程中张三只需要实现Map和Reduce这两个函数(即按照什么键值对去统计、汇总),不用去管数据文件如何切分、统计不重复、汇总不遗漏等等。所以这种分布式运算的方式肯定比原来的更快,而且由于底层的技术实现封装,又保证了分布式运算的准确性、便利性。
什么是MapReduce?
MapReduce是一个分布式计算框架,作为产品人员我觉得也可以简单粗暴的理解为他就是用于计算HDFS上数据文件的计算引擎。即HDFS是存储大数据的,MapReduce是进行大数据运算的。
既然是做计算的,那么就需要有数据输入(input),经过MapReduce运算后也会有个输出(output),这个输出就是我们所需要的结果。
一个完整的MapReduce计算任务主要有六个阶段(我这样划分仅是为了更方便理解,当然相对严谨的是input包含了split):
1.input阶段获取输入数据
2.split阶段对数据进行分片作为map的输入
3.map阶段过程对某种输入格式的一条记录解析成一条或多条记录
4.shffle阶段对中间数据的控制,作为reduce的输入
5.reduce阶段对相同key的数据进行合并
6.output阶段按照格式输出到指定目录
整个运算任务一般只需要程序员定义好这map函数和reduce函数即可(前面也有提到)
接下来就用图文的方式来讲解下一个MapReduce任务的具体过程,备注:本文仅针对MapReduce1.0做讲解
假设我们HDFS上存了一个如上图所示的文件,这个文件中记录了A公司去年里所有买了“汇源肾宝”的用户的年龄。然后我们使用MapReduce来统计出各个年龄的人数。
1.input阶段:读取文件数据。略微扩展下:Input是读取数据的总接口,默认使用FileInputFomart类,因为读取的数据的类型不同(日志文件、二进制格式文件、数据库表等等),所以FileInputFomart有多个实现类来处理不同类型的数据,甚至可以自定义实现类。
2.split阶段将要处理的数据进行逻辑上的切片划分,每一个切片(split)都对应一个mapTast任务,也就是说,将数据切成几片,就有几个mapTast任务。心细的朋友也许会想到这里的切分与HDFS存储数据文件时切分的Block有什么关系。那我也做个简单的介绍,hadoop在默认的情况下,Split和Block的大小是一样的(默认128M),这样容易造成误解认为两者是一样的,但是其实他们是2个概念,split是MapReduce里的概念,是切片的概念,split是逻辑切片,它只包含一些元数据信息(数据起始位置、数据长度、数据所在的节点等) ;而block是hdfs中切块的大小,block是物理切块。默认情况下一个split对应一个block,但是实际情况可能是一个split对应多个block。有兴趣的朋友可以去这里做详细了解 Hadoop Block 与 InputSplit 的区别与联系。
3.map阶段此阶段,就是执行mapTast任务的过程,可简单粗暴的理解为运行多个程序(程序的数量与切片数相等,且这个程序包含程序员所写的map()函数,这个函数的作用是获取给定文件中一行数据,对其分词后,依次输出用户年龄)去读取切片对应的数据,然后输出<20岁,1>、<30岁,1>这样的键值对。
4.shffle阶段简单的理解为分组吧(如果了解SQL的,可以类比成group by),即将map的输出经过“整理”后给到reduce,分为map端操作和reduce端操作,这个阶段是不需要程序员来做什么的,里面的代码逻辑已经被大佬们封装完毕。由于比较复杂,我个人也就没有过多去了解了。
5.reduce阶段简单的理解为对分组后的数据进行汇总(同样可类比成分组后的 count)。当然这个阶段也会运行程序员写的reduce()函数,他的作用是将相同的年龄聚集在一起,然后统计每个年龄出现的总次数,得出<20岁,1>、<30岁,1>、<33岁,1>这样的键值对。
6.output阶段按照输出文件的格式,将每个键值对作为结果输出。
MapReduce 1.0架构
JobTracker是Map-Reduce框架中心(就像上一篇HDFS中NameNode一样,把它看成指挥官),主要负责资源监控和作业调度(管理哪些程序应该跑在哪些些机器上)。JobTracker监控所有TaskTracker与作业的健康状况,一旦发现任务失败情况,其会将相应的任务转移到其他机器上执行;同时JobTracker会跟踪任务的执行进度、资源(主要就是IO、网络、磁盘)使用量等,并将这些信息告诉给任务调度器(Task Scheduler),而调度器会在资源出现空闲时,选择合适的任务使用这些资源。
TaskTracker会周期性地将本节点上资源的使用情况和任务的运行进度汇报给JobTracker,同时接收JobTracker发送过来的命令并执行相应操作(如启动新任务、杀死任务等)。
最后做个总结:
MapReduce是一个分布式计算框架,具有易编程、高容错性、高吞吐的特性。易编程主要是因为他提供了非常易用的编程接口,程序员只需要编写几个简单的函数就可以实现分布式程序,而其他比较复杂的工作,比如节点间的通信、节点失效、数据切分等,全部由MapReduce运行环境完成,程序员们不用关注这些,只需把精力放在业务逻辑上即可。高容错性主要是采取了计算迁移、数据迁移等策略提高集群的可用性与容错性。高吞吐率一个分布式系统通常需要在高吞吐率与低延迟之间做权衡,而MapReduce选择了高吞吐率,即利用分布式并行技术,使用多机资源,一次读取/写入数据。同时也造成了其效率比较低,所以它一般用于离线计算,而实时计算一般使用Spark、Flink。
传送门
Hadoop系列文章(一)数据产品经理有必要了解的Hadoop
Hadoop系列文章(二)数据产品经理有必要了解的HDFS
Hadoop系列文章(四)数据产品经理有必要了解的YARN