MapReduce执行过程及shuffle详解

        开始学习Hadoop时,经常会听到MapReduce。MapReduce由Map和Reduce两个阶段,每个阶段都是以键-值对作为输入和输出,Map阶段是将数据进行映射处理,形成键值对。reduce是对map处理后的键值对进行聚合等相关运算。比如经典的wordcount:map阶段是将文本内容拆分成(字段串,1)键值对,(hello,1)(world,1)(hello,1),reduce是将相同key的value值合并,形成(hello,2),(world,1)。但在整个处理中有数据排序、落盘、合并、shuffle等过程,希望通过下面内容可以帮助你深入理解这个过程。

     MapReduce处理流程回顾

        MapReduce是客户端需要执行的一个工作单元,包括输入数据、MapReduce程序和配置信息。在运行时Hadoop会将作业分成若干个任务(task)执行,任务🈶️map任务和reduce任务构成。这些任务在集群的节点(服务器)通过YARN进行调度,如果一个任务失败另一个不同节点会自动重新调用运行。

     Map过程

        Hadoop运行时对数据进行了分片,每个分片构建一个map任务(map数量具体确定细节查看https://www.jianshu.com/p/a691980f18d1)。通过对数据进行分片,每个分片的处理时间小于整个数据花费时间,如果这些map任务并行执行处理,并且每一个分片数据比较小,执行时间短,则可以通过mapreduce加速数据处理时间。但也不是每个分区数据越小越好,如果数据切分太小,管理分片总时间和构建map任务总时间远大于整体执行过程就得不偿失了。通常一个作业来说一个合理但分片大小默认是128MB,但也可以根据具体情况调整。

        map任务运行在由YARN进行集群统一资源管理调度的节点上,任务执行的最佳性能是数据在map任务运行的节点上,即“数据本地化优化”。但如果数据及其副本所在节点没有资源运行map任务,此时调度系统会尽量在数据所在集群寻找一个空闲的map槽(slot)运行map任务分片,数据和执行任务运行在不同的节点上,需要通过网络进行数据传输,严重影响运行时间。

        map任务运行完成后将输出结果写入本地磁盘,之所以不是HDFS,因为map的输出是中间结果,该结果通过reduce任务处理后才产生最终输出结果,一旦作业完成,map的输出结果就可以删除。数据如果存储在HDFS,一方面数据写性能差另外没有必要通过默认的3副本高可用存储数据,如果map任务节点宕机,可以通过在其他节点重新运行map任务构建中间结果。  

     reduce过程

        reduce任务并不具备数据本地化优势,单个reduce任务的输入通常来自多个map的输出,如果仅有一个reduce任务,其输入是所有map任务的输出。排序后的map输出通过网络传输发送到reduce任务节点,数据在reduce端合并处理。reduce的输出通常存储在HDFS中实现可靠存储。reduce任务不是由输入数据大小决定(reduce数量确定参考https://www.jianshu.com/p/a691980f18d1)。如果有多个reduce任务,每个map任务会针对输出键值对进行分区(partition),为每个reduce任务创建一个分区,每个分区有许多键,但每个键值对的键值对记录在同一个分区中,分区函数默认通过哈希函数决定。

图-一个reduce任务的MapReduce数据流
图-多个reduce任务数据流

多个reduce任务数据流map任务和reduce任务之间的数据流称为shuffle(混洗),每个reduce的输入都来自许多map任务,shuffle一般比图中所示更复杂,而且调整混洗参数对作业总执行时间影响非常大。

图-无reduce任务的MapReduce数据流

        集群可用带宽限制MapReduce作业数量,因此应该尽量避免map和reduce任务之间的数据传输,hadoop允许用户针对map任务输出执行一个combiner,进行部分数据map端合并减少数据传输数量,combiner属于优化方案,不管调用combiner多少次,reduce的输出都一样。

     MapReduce作业运行机制

mapreduce运行过程如下,由一下5个独立的实体:

1、客户端,提交mapreduce作业

2、YARN资源管理器,负责协调集群上计算机资源分配

3、YARN节点管理器,负责启动和监视集群中机器上的计算容器(container)

4、MapReduce的application master,负责协调运行MapReduce作业的任务,和MapReduce任务在容器中运行,这些容器由资源管理器分配,由节点管理器进行管理

5、分布式文件系统(HDFS),用来与其他实体间共享作业文件。

图-mapreduce作业工作原理

        这个执行过程包括下面几个阶段:

     作业提交

        job的submit() 方法创建一个内部JobSummiter实例,并调用submitJobInternal()方法,提交后waitForCompletion()每秒轮询作业解读,如果有变动,把进度报告到控制台。JobSummiter作业提交过程如下:

        1、向资源管理器申请一个新应用ID,用于MapReduce作业ID。如上图步骤2。

        2、检查作业的输出说明。如果没指定输出目录或输出目录已存在,作业不提交,错误抛给MapReduce程序。

        3、计算作业的输入分片,分片无法计算,错误返回给MapReduce程序。

        4、将运行作业所需资源(作业jar文件、配置文件和计算所得输入分片)复制到一个以作业ID命名的目录下共享文件系统(步骤3),作业JAR复本较多,在运行作业任务时集群中有很多复本可供节点管理器访问。

        5、通过调用资源管理器submitApplication()方法提交作业,如上图步骤4。

     作业初始化

        资源调度器收到调用它的submitApplication()消息后,将请求传递给YARN调度器(scheduler),调度器分配一个容器,然后资源管理器在节点管理器管理下在容器中启动application master进程(步骤5a和5b)。

        application master是一个java应用程序,主类是MRAppMaster,将接受来自任务进度和完成报告(步骤6),application master对作业的初始化通过创建多个薄记对象保持对作业进度跟踪。然后接受来自共享文件系统的、在客户端计算的输入分配,对每个分片创建一个map任务对象以及由mapreduce.job.reduces属性确定多个reduce对象。

     任务分配

        application master 为作业中所有map任务和reduce任务向资源管理器请求容器(步骤8),首先Map任务发出请求,该请求高于reduce任务请求,因为reduce必须在所有map任务完成后才能启动。

        reduce任务可以运行在集群任意位置,但map任务请求有着数据本地化局限,在理想情况下任务是数据本地化,任务可以和分片在一个节点运行。如果节点资源无法启动map任务,map任务可能是机架本地化(rack local),即数据和任务运行在同一机架而非同一节点。一些map任务既不是数据本地化,也不是机器本地化,从别的机架获取运行数据。请求会为任务分配内存需求和cpu数,默认情况下,每个map任务和reduce任务分配1024MB内存和虚拟内核,这些值可通过mapreduce.map.memory.mb(map任务需要内存)、mapreduce.reduce.memory.mb(reduce任务需要内存)、mapreduce.map.cpu.vcores(map任务需要内核数)、mapreduce.reduce.cpu.vcores

      任务执行

        一旦资源管理器的调度器为任务分配特定节点上容器,application master通过与节点管理器通信启动容器(步骤9a、9b),任务由YarnChild执行。在运行任务之前,需要将作业配置、JAR文件等资源本地化。YarnChild在指定JVM中运行,故map或reduce函数任何缺陷都不会影响节点管理器。

      进度和状态更新

        MapReduce作业是长时间运行批量作业,运行范围从数秒到数小时,如何得知作业进展就很重要。作业和任务关注的进展有:作业或任务状态(运行中、成功完成、失败)、map和reduce进度、作业计数器值、状态信息。当map和reduce任务运行时,子进程和application master 通过umbilical接口通信,每隔3秒,umbilical接口向application master 报告进度和状态,会形成作业汇聚试图。

        客户端可以使用Job的getStatus()得到JobStatus实例,包含作业所有状态信息。流程如下:

图-mapreduce作业状态更新传递流程

      作业完成

        application master收到作业最后一个任务已完成通知,把作业状态设置为“成功”,在Job轮询状态时,知道任务已完成,从waitForCompletion()方法返回。

     shuffle和排序

        MapReduce确保每个reduce的输入都是按键排序的,系统执行排序、将map输出作为输入传给reduce过程叫shuffle。

     map端

        map函数利用缓冲方式将结果写到内存并出于效率考虑进行预排序,过程如下:  

图-MapReduce的shuffle和排序

        每个map任务都有一个环形内存缓冲区存储任务输出,默认情况下,缓冲区大小是100MB,一旦缓冲区内容达到阈值,一个后台线程把内容溢出(spill)到磁盘。如果map输出写入缓冲区被填满,map会被阻塞直到写磁盘过程完成。溢出写过程将缓冲区内容写到mapreduce.cluster.local.dir属性在作业特定子目录指定下目录。

        在写磁盘前,线程根据数据最终要传的reduce,把数据划分成相应的分区(partition),在每个分区中后台数据进行内存排序,如果有combiner函数,在排序后运行combiner函数使map输出结果更紧簇,减少写到磁盘数据和传递给reduce数据。

        每次达到溢出阈值,会新建一个溢出文件(spill file),在map任务写完最后一个输出记录后,会有多个溢出文件,在任务完成之前,溢出文件被合并成一个已分区且已排序输出文件。

        mapreduce.map.combine.minspills属性设置combiner再次运行时溢出文件的要求,默认至少3个。

        在将map输出到本地节点磁盘时可以对数据进行压缩,节约磁盘空间,减少传递给reduce数据量,可以通过mapreduce.map.output.compress设置,mapreduce.map.output.compress.codec指定压缩库。reduce通过HTTP得到输出文件分区。

      reduce端

        map输出文件位于本地磁盘,所有map任务完成时,会通知给application master,reduce中一个线程定期询问master获取map输出主机位置,直到获取所有输出位置, reduce任务开始复制map输出。复制完成后reduce任务进入排序阶段,该阶段将合并map输出,维持其顺序排序。在reduce阶段,直接把数据输入给reduce函数,省略一次磁盘往返行程。

图-reduce端文件合并

      shuffle配置调优

        通过上面shuffle过程解读,可知shuffle过程在mapreduce整个过程中起着关键端作用,优化好shuffle过程可以提升mapreduce性能。下面从map和reduce端梳理shuffle优化方法。

      map端shuffle调优

        mapreduce.task.io.sort.mb    排序map输出时所使用的内存缓冲区大小,默认是100MB。

        mapreduce.map.sort.spill.percent  map输出内存缓冲区和开始溢出写过程阈值

        mapreduce.task.io.sort.factor   排序文件时一次最多合并流数

        mapreduce.map.combine.minspills  运行combiner所需最少溢出文件数

        mapreduce.map.output.compress    是否压缩map输出

        mapreduce.map.output.compress.codec   指定map输出压缩解码器

        mapreduce.shuffle.max.threads    用于将map输出到reduce的工作线程数,表示的是整个集群范围的设置。

      reduce端shuffle调优

        mapreduce.reduce.shuffle.parallelcopies 用于把map输出复制到reduce线程数

        mapreduce.reduce.shuffle.maxfetchfailures reduce获取一个map输出所花费最大时间

        mapreduce.task.io.sort.factor 排序文件时一次最多合并流数 

        mapreduce.reduce.shuffle.input.buffer.percent     在shuffle复制阶段,分配给map输出的缓冲区占堆空间百分比。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容

  • “你做了恶梦?”“梦是梦,恶梦跟美梦有什么分别?”我虚弱地问。 “你为什么不哭?”他问。“哭有什么帮助?”“你应该...
    温柔亦有力量阅读 951评论 0 0
  • note 1:python中函数声明 def 函数名(形参列表): 。函数没有标明函数的开始结束花括号。函数名下的...
    君宝儿阅读 227评论 0 0
  • 不知从何时起,西方的情人节同样也在中国盛行起来,朋友圈晒花的,晒物的,晒钱的,吃的喝的真是玲琅满目。 人的生活需要...
    灵滴阅读 420评论 0 2
  • 咳嗽越来越严重,心中颇不宁静,强行让自己平静,完成每日的任务。 人们常说“相由心生”,好事源于好心情,病情的恶化是...
    焚心之劫阅读 151评论 0 0
  • 纠结对一个人来说是一种折磨 纠结在心头的事儿 拽也拽不掉 推也推不开 纠结在大脑的事儿 随时随地都会在想着那事儿 ...
    茕灺阅读 293评论 4 11