MapReduce On YARN

我们知道,MRv1主要由编程模型(MapReduce API)、资源管理与作业控制模块(由JobTracker和TaskTracker组成)和数据处理引擎(由MapTask和ReduceTask组成)三部分组成。而YARN出现后,资源管理模块则交由YARN实现,这样为了让MapReduce框架运行在YARN上,仅需实现一个ApplicationMaster组件完成作业控制模块功能即可,其他部分,包括编程模型和数据处理引擎等,可直接采用MRv1原有的实现。下面将详细介绍MapReduce On YARN(即MRv2)的基本架构、模块组成以及各模块的实现。

MapReduce On YARN与MRv1在编程模型和数据处理引擎方面的实现是一样的,唯一的不同是运行时环境。不同于MRv1中由JobTracker和TaskTracker构成的运行时环境,MapReduce On YARN的运行时环境由YARN与ApplicationMaster构成,这种新颖的运行时环境使得MapReduce可以与其他计算框架运行在一个集群中,从而达到共享集群资源、提高资源利用率的目的。随着YARN的成熟与完善,MRv1的独立运行模式将被MapReduce On YARN取代。

MRAppMaster是MapReduce的ApplicationMaster实现,它使得MapReduce应用程序可以直接运行于YARN之上。在YARN中,MRAppMaster负责管理MapReduce作业的生命周期,包括作业管理、资源申请与再分配、Container启动与释放、作业恢复等。本节将介绍MRAppMaster基本构成。

如图8-1所示,MRAppMaster主要由以下几种组件/服务构成。


image.png
  • ContainerAllocator。与ResourceManager通信,为MapReduce作业申请资源。作业的每个任务资源需求可描述为5元组<Priority,hostname,capability,containers,relax_locality >,分别表示作业优先级、期望资源所在的host、资源量(当前支持内存和CPU两种资源)、Container数目、是否松弛本地性。ContainerAllocator周期性通过RPC与ResourceManager通信,而ResourceManager则通过心跳应答的方式为之返回已经分配的Container列表、完成的Container列表等信息

  • ClientService。ClientService是一个接口,由MRClientService实现。MRClientService实现了MRClientProtocol协议,客户端可通过该协议获取作业的执行状态(不必通过ResourceManager)和控制作业(比如杀死作业、改变作业优先级等)。

  • Job。Job表示一个MapReduce作业,与MRv1的JobInProgress功能一样,负责监控作业的运行状态。它维护了一个作业状态机,以实现异步执行各种作业相关的操作。

  • Task。Task表示一个MapReduce作业中的某个任务,与MRv1中的TaskInProgress功能类似,负责监控一个任务的运行状态。它维护了一个任务状态机,以实现异步执行各种任务相关的操作

  • TaskAttempt。TaskAttempt表示一个任务运行实例,它的执行逻辑与MRv1中的MapTask和ReduceTask运行实例完全一致,实际上,它直接使用了MRv1的数据处理引擎,但经过了一些优化,正是由于它与MRv1的数据处理引擎一样,它对外提供的编程接口也与MRv1完全一致,这意味着MRv1的应用程序可直接运行在YARN之上。

TaskCleaner。TaskCleaner负责清理失败任务或者被杀死任务使用的目录和产生的临时结果(可统称为垃圾数据),它维护了一个线程池和一个共享队列,异步删除任务产生的垃圾数据。

  • Speculator。Speculator完成推测执行功能。当同一个作业的某个任务运行速度明显慢于其他任务时,Speculator会为该任务启动一个备份任务,让它与原任务同时处理同一份数据,谁先计算完成则将谁的结果作为最终结果,并将另一个任务杀掉。该机制可有效防止那些“拖后腿”任务拖慢整个作业的执行进度

  • ContainerLauncher。ContainerLauncher负责与NodeManager通信,以启动一个Container。当ResourceManager为作业分配资源后,ContainerLauncher会将任务执行相关信息填充到Container中,包括任务运行所需资源、任务运行命令、任务运行环境、任务依赖的外部文件等,然后与对应的NodeManager通信,要求它启动Container

  • TaskAttemptListener。TaskAttemptListener负责管理各个任务的心跳信息,如果一个任务一段时间内未汇报心跳,则认为它死掉了,会将其从系统中移除。同MRv1中的TaskTracker类似,它实现了TaskUmbilicalProtocol协议,任务会通过该协议汇报心跳,并询问是否能够提交最终结果

  • JobHistoryEventHandler。JobHistoryEventHandler负责对作业的各个事件记录日志,比如作业创建、作业开始运行、一个任务开始运行等,这些日志会被写到HDFS的某个目录下,这对于作业恢复非常有用。当MRAppMaster出现故障时,YARN会将其重新调度到另外一个节点上。为了避免重新计算,MRAppMaster首先会从HDFS上读取上次运行产生的日志,以恢复已经运行完成的任务,进而能够只运行尚未运行完成的任务

事件与事件处理器

YARN使用了基于事件驱动的异步编程模型,它通过事件将各个组件联系起来,并由一个中央异步调度器统一将各种事件分配给对应的事件处理器。在MRAppMaster中,每种组件是一个事件处理器,当MRAppMaster启动时,它们会以服务的形式注册到MRAppMaster的中央异步调度器上,并告诉调度器它们处理的事件类型。这样当出现某一种事件时,MRAppMaster会查询<事件,事件处理器>表,并将该事件分配给对应的事件处理器

MapReduce客户端

MapReduce客户端是MapReduce用户与YARN(和MRAppMaster)进行通信的唯一途径,通过该客户端,用户可以向YARN提交作业,获取作业的运行状态和控制作业(比如杀死作业、杀死任务等)。MapReduce客户端涉及两个RPC通信协议

  • ApplicationClientProtocol。在YARN中,ResourceManager实现了ApplicationClient-Protocol协议,任何客户端需使用该协议完成提交作业、杀死作业、改变作业优先级等操作
  • MRClientProtocol。当作业的ApplicationMaster成功启动后,它会启动MRClient-Service服务,该服务实现了MRClientProtocol协议,从而允许客户端直接通过该协议与ApplicationMaster通信以控制作业和查询作业运行状态,以减轻Resource-Manager负载。

在YARN中,应用程序(作业)的运行过程包括两个步骤:启动Application-Master和运行应用程序(作业)内部的各类任务,其中,ApplicationMaster是由ResourceManager直接与NodeManager通信而启动的,在它启动起来之前,客户端只能与ResourceManager交互以查询作业相关信息,在它启动起来之前,客户端只能与ResourceManager交互以查询作业相关信息。一旦作业的ApplicationMaster成功启动,客户端可直接与它交互以查询作业信息和控制作业。接下来,我们分别介绍这两个通信协议

  • ApplicationClientProtocol协议
    MapReduce客户端通过该协议与ResourceManager通信,以提交应用程序和查询集群信息。ResourceManager用Application表示用户提交的作业,并提供了以下接口供用户使用:
public interface ApplicationClientProtocol {
  public GetNewApplicationResponse getNewApplication( //获取一个Application ID
    GetNewApplicationRequest request)throws YarnRemoteException;
  public SubmitApplicationResponse submitApplication( //提交一个Application
    SubmitApplicationRequest request) throws YarnRemoteException;
  public KillApplicationResponse forceKillApplication( //杀死一个Application
    KillApplicationRequest request) throws YarnRemoteException;
  public GetApplicationReportResponse getApplicationReport( //获取Application运行报告
    GetApplicationReportRequest request) throws YarnRemoteException;
  public GetClusterMetricsResponse getClusterMetrics( //获取集群所有Metric
    GetClusterMetricsRequest request) throws YarnRemoteException;
  public GetAllApplicationsResponse getAllApplications( //列出所有Application
    GetAllApplicationsRequest request) throws YarnRemoteException;
  public GetClusterNodesResponse getClusterNodes( //获取集群中所有节点
    GetClusterNodesRequest request) throws YarnRemoteException;
  public GetQueueUserAclsInfoResponse getQueueUserAcls( //获取用户访问控制权限
    GetQueueUserAclsInfoRequest request) throws YarnRemoteException;
  ...
}
MRClientProtocol协议

MRAppMaster实现了MRClientProtocol协议为客户端提供服务,该协议提供了作业和任务的查询和控制接口,主要如下:

public interface MRClientProtocol {
  ...
    public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException;//获取作业运行报告
  public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException;//获取所有任务运行报告
  public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException;//获取所有任务实例的运行报告
  public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRemoteException;//获取所有Counter
  public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(GetTaskAttemptCompletionEventsRequest request) throws YarnRemoteException;//获取所有运行完成的任务
  public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request) throws YarnRemoteException;//获取所有任务运行报告
  public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request) throws YarnRemoteException;//获取作业诊断信息
  public KillJobResponse killJob(KillJobRequest request) throws YarnRemoteException;//杀死一个作业
  public KillTaskResponse killTask(KillTaskRequest request) throws YarnRemoteException;//杀死一个任务
  public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException;//杀死一个任务实例
  public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException;//让一个任务实例运行失败
  ...
}
MRAppMaster工作流程

按照作业大小不同,MRAppMaster提供了三种作业运行模式:本地模式(通常用于作业调试,同MRv1一样,不再赘述)、Uber模式和Non-Uber 模式。对于小作业,为了降低其延迟,可采用Uber模式,在该模式下,所有Map Task和Reduce Task在同一个Container(MRAppMaster所在Container)中顺次执行;对于大作业,则采用Non-Uber 模式,在该模式下,MRAppMaster先为Map Task申请资源,当Map Task运行完成数目达到一定比例后再为Reduce Task申请资源
1)Uber运行模式
为了降低小作业延时,YARN专门对小作业运行方式进行了优化。对于小作业而言,MRAppMaster无须再为每个任务分别申请资源,而是让其重用一个Container,并按照先Map Task后Reduce Task的运行方式串行执行每个任务。在YARN中,如果一个MapReduce作业同时满足以下条件,则认为是小作业,可运行在Uber模式下:

  • Map Task数目不超过mapreduce.job.ubertask.maxmaps(默认是9)。
  • Reduce Task数目不超过mapreduce.job.ubertask.maxmaps(默认是1)。
  • 输入文件大小不超过mapreduce.job.ubertask.maxbytes(默认是一个Block大小)。
  • Map Task和Reduce Task需要的资源量不超过MRAppMaster可使用的资源量。

2)Non-Uber运行模式

在大数据环境下,Uber运行模式通常只能覆盖到一小部分作业,而对于其他大多数作业,仍将运行在Non-Uber模式下。在Non-Uber模式下,MRAppMaster将一个作业的Map Task和Reduce Task分为四种状态

  • pending:刚启动但尚未向ResourceManager发送资源请求
  • scheduled:已经向ResourceManager发送资源请求但尚未分配到资源。
  • assigned:已经分配到了资源且正在运行。
  • completed:已经运行完成。

对于Map Task而言,它的生命周期为scheduled →assigned→completed;而对于Reduce Task而言,它的生命周期为pending →scheduled →assigned→completed。由于Reduce Task的执行依赖于Map Task的输出结果,因此,为避免Reduce Task过早启动造成资源利用率低下,MRAppMaster让刚启动的Reduce Task处于pending状态,以便能够根据Map Task运行情况决定是否对其进行调度。在YARN之上运行MapReduce作业需要解决两个关键问题:如何确定Reduce Task启动时机以及如何完成Shuffle功能。

由于YARN中不再有Map Slot和Reduce Slot的概念,且RedouceManager也不知道Map Task与Reduce Task之间存在依赖关系,因此,MRAppMaster自己需设计资源申请策略以防止因Reduce Task过早启动造成资源利用率低下和Map Task因分配不到资源而“饿死”。MRAppMaster在MRv1原有策略(Map Task完成数目达到一定比例后才允许启动Reduce Task)基础上添加了更为严格的资源控制策略和抢占策略。总结起来,Reduce Task启动时机由以下三个参数控制:

  • mapreduce.job.reduce.slowstart.completedmaps:当Map Task完成的比例达到该值后才会为Reduce Task申请资源,默认是0.05。
  • yarn.app.mapreduce.am.job.reduce.rampup.limit:在Map Task完成前,最多启动的Reduce Task比例,默认为0.5。
  • yarn.app.mapreduce.am.job.reduce.preemption.limit:当Map Task需要资源但暂时无法获取资源(比如Reduce Task运行过程中,部分Map Task因结果丢失需重算)时,为了保证至少一个Map Task可以得到资源,最多可以抢占的Reduce Task比例,默认为0.5。

按照MapReduce的执行逻辑,Shuffle HTTP Server应该分布到各个节点上,以便能够支持各个Reduce Task远程复制数据。然而,由于Shuffle是MapReduce框架中特有的一个处理流程,从设计上讲,不应该将它直接嵌到YARN的某个组件(比如NodeManager)中。

YARN采用了服务模型管理各个对象,且多个服务可以通过组合的方式交由一个核心服务进行统一管理。在YARN中,NodeManager作为一种组合服务模式,允许动态加载应用程序临时需要的附属服务,利用这一特性,YARN将Shuffle HTTP Server组装成了一种服务,以便让各个NodeManager启动时加载它。

当用户向YARN中提交一个MapReduce应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是由ResourceManager启动MRAppMaster;第二个阶段是由MRAppMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行成功。如图8-3所示,YARN的工作流程分为以下几个步骤:

步骤1 用户向YARN中提交应用程序,其中包括MRAppMaster程序、启动MRApp-Master的命令、用户程序等。

步骤2 ResourceManager为该应用程序分配第一个Container,并与对应的NodeManager通信,要求它在这个Container中启动应用程序的MRAppMaster。

步骤3 MRAppMaster启动后,首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,之后,它将为内部任务申请资源,并监控它们的运行状态,直到运行结束,即重复步骤4~7。

步骤4 MRAppMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。

步骤5 一旦MRAppMaster申请到资源后,则与对应的NodeManager通信,要求它启动任务。

步骤6 NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务

步骤7 各个任务通过RPC协议向MRAppMaster汇报自己的状态和进度,以让MRAppMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务
在应用程序运行过程中,用户可随时通过RPC向MRAppMaster查询应用程序的当前运行状态。

步骤8 应用程序运行完成后,MRAppMaster向ResourceManager注销并关闭自己。


image.png
MR作业生命周期及相关状态机

MR作业生命周期
在正式介绍作业(Job)生命周期之前,先要了解MRAppMaster中作业表示方式。如图8-4所示,与MRv1一样,MRAppMaster根据InputFormat组件的具体实现(通常是根据数据量切分数据),将作业分解成若干个Map Task和Reduce Task,其中每个Map Task处理一片InputSplit数据,而每个Reduce Task则进一步处理Map Task产生的中间结果。每个Map/Reduce Task只是一个具体计算任务的描述,真正的任务计算工作则是由运行实例TaskAttempt完成的,每个Map/Reduce Task可能顺次启动多个运行实例,比如第一个运行实例失败了,则另起一个实例重新计算,直到这一份数据处理完成或者达到尝试次数上限;也可能同时启动多个运行实例,让它们竞争同时处理一片数据。在MRAppMaster中,上述Job、Task和TaskAttempt的生命周期均由一个有限状态机描述,其中TaskAttempt是实际进行任务计算的组件,其他两个只负责监控和管理。正是由于MRAppMaster中的TaskAttempt重用了MRv1中的代码,MRv1中的MapReduce应用程序可直接运行在YARN上.

image.png

作业的创建入口在MRAppMaster类中,如下所示:

public class MRAppMaster extends CompositeService {
  public void start() {
    ...
    job = createJob(getConfig());//创建Job
    JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
    jobEventDispatcher.handle(initJobEvent);//发送JOB_INI,创建MapTask、ReduceTask
    startJobs();//启动作业,这是后续一切动作的触发之源
    ...
  }
  protected Job createJob(Configuration conf) {
    Job newJob =
    new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
                taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
                completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
                currentUser.getUserName(), appSubmitTime, amInfos, context);
    ((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
    dispatcher.register(JobFinishEvent.Type.class,
                        createJobFinishEventHandler());     
    return newJob;
  }
}

之后,MapReduce作业将依次经历作业/任务初始化和作业启动两个阶段。
(1)作业/任务初始化
JobImpl首先接收到JOB_INIT事件,然后触发调用函数InitTransition(),进而导致作业状态从NEW变为INITED,InitTransition函数主要工作是创建Map Task和Reduce Task,代码如下

public static class InitTransition 
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
  ...
  createMapTasks(job, inputLength, taskSplitMetaInfo);
  createReduceTasks(job);
  ...
}

其中,createMapTasks函数实现如下:

private void createMapTasks(JobImpl job, long inputLength,
                            TaskSplitMetaInfo[] splits) {
  for (int i=0; i < job.numMapTasks; ++i) {
    TaskImpl task =
    new MapTaskImpl(job.jobId, i,
                    job.eventHandler, 
                    job.remoteJobConfFile, 
                    job.conf, splits[i], 
                    job.taskAttemptListener, 
                    job.committer, job.jobToken, job.fsTokens,
                    job.clock, job.completedTasksFromPreviousRun, 
                    job.applicationAttemptId.getAttemptId(),
                    job.metrics, job.appContext);
    job.addTask(task);
  }
}

(2)作业启动
启动作业的代码如下

public class MRAppMaster extends CompositeService {
  protected void startJobs() {
    JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
    dispatcher.getEventHandler().handle(startJobEvent);
  }
}

JobImpl接收到JOB_START事件后,将执行函数StartTransition(),进而触发Map Task和Reduce Task的调度执行,同时使得作业状态从INITED变为RUNNING,具体如下:

public static class StartTransition
implements SingleArcTransition<JobImpl, JobEvent> {
  public void transition(JobImpl job, JobEvent event) {
    job.scheduleTasks(job.mapTasks);  
    job.scheduleTasks(job.reduceTasks);
  }
}

之后,每个Map Task和Reduce Task负责各自的状态变化,ContainerAllocator模块会首先为Map Task申请资源,然后是Reduce Task,一旦一个Task获取到了资源,就会创建一个运行实例Task Attempt。如果该实例运行成功,则Task运行成功,否则,Task还会启动下一个运行实例Task Attempt,直到一个Task Attempt运行成功或者达到尝试次数上限。当所有Task运行成功后,Job运行成功。一个运行成功的作业/任务所经历的状态变化(不包含失败或者被杀死情况)

Task状态机

Task状态机维护了一个任务的生命周期,即从创建到运行结束整个过程。一个任务可能存在多次运行尝试,每次运行尝试被称为一个“运行实例”,Task状态机则负责管理这些运行实例。Task状态机由TaskImpl类实现,其主要包括7种状态和9种导致状态发生变化的事件
(1)Task状态
Task状态包括

  • NEW:任务初始状态。当一个任务被创建时,状态被置为NEW

  • SCHEDULED:任务开始被调度时所处的状态。该状态意味着,MRAppMaster开始为任务(向ResourceManager)申请资源,但任务尚未获取到资源。

  • RUNNING:任务的一个实例开始运行。该状态意味着,MRAppMaster已经为该任务申请到资源(Container),并与对应的NodeManager通信成功启动了Container。需要注意的,在某一时刻,一个任务可能有多个运行实例,且可能存在运行失败的运行实例

  • SUCCEEDED:任务的一个实例运行成功。该状态意味着,该任务成功运行并完成。需要注意的,只要任务的一个实例运行成功,则意味着该任务运行成功。

  • KILLED:任务被杀死所处的状态。当一个任务的所有运行实例被杀死后,才认为该任务被杀死。

  • FAILED:任务运行失败所处的状态。每个任务的运行实例数目有一定上限,一旦超过该上限,才认为该任务运行失败,其中,Map Task运行实例数目上限由参数mapreduce.map.maxattempts指定,默认值为4,Reduce Task运行实例数目上限由参数mapreduce.reduce.maxattempts指定,默认值为4。需要注意的是,一个任务运行失败并不一定会导致整个作业运行失败,这同样取决于作业的错误容忍率(默认是0)

TaskAttempt状态机

在YARN中,任务实例是运行在Container中的,因此,Container状态变化往往伴随任务实例的状态变化,比如任务实例运行完成后,会清理Container占用的空间,而Container空间的清理实际上就是任务实例空间的清理。目前每个任务存在两种类型的实例:Avataar.VIRGIN和Avataar.SPECULATIVE,分别表示原始任务和备份任务(通过推测执行机制启动的)。

(1)TaskAttempt状态
TaskAttmpt状态包括:

  • NEW:任务实例初始状态。当一个任务实例被创建时,状态被置为NEW。
  • UNASSIGNED:等待分配资源所处的状态。当一个任务实例被创建后,它会发出一个资源申请请求,等待MRAppMaster为它申请和分配资源
  • ASSIGNED:任务实例获取到一个Container。
  • RUNNING:任务实例成功启动。MRAppMaster将资源分配给某个任务后,会与对应的NodeManager通信,以启动Container。只要Container启动成功,则任务实例启动成功
  • COMMIT_PENDING:任务实例等待提交最终结果。任务实例运行完成后,需向MRAppMaster请求提交最终结果,一旦提交成功后,该任务的其他实例将被杀死
  • SUCCEEDED:成功清理完成Container空间。任务实例运行完成后,需清理它使用的Container占用的空间,只有该空间清理完成后,才认为任务实例运行完成
  • FAILED:任务实例运行失败后所处的状态
  • KILLED:任务实例被杀死后所处的状态。
资源申请与再分配

ContainerAllocator是MRAppMaster中负责资源申请和分配的模块。用户提交的作业被分解成Map Task和Reduce Task后,这些Task所需的资源统一由ContainerAllocator模块负责从ResourceManager中申请,而一旦ContainerAllocator得到资源后,需采用一定的策略进一步分配给作业的各个任务。

在YARN中,作业的资源需求可描述为5元组:<priority,hostname,capability,containers,relax_locality >,分别表示作业优先级、期望资源所在的host、资源量(当前支持内存与CPU两种资源)、Container数目、是否松弛本地性,比如

<10, "node1", "memory:1G,CPU:1", 3,true>//优先级是一个正整数,优先级值越小,优先级越高
  <10, "node2", "memory:2G,CPU:1", 1, false>//1个必须来自node2上大小为2GB内存、1个CPU的Container(不能来自node2所在的机架或者其他节点)
    <2, "*", "memory:1G,CPU:1", 20, false> //*表示这样的资源可来自任意一个节点,即不考虑数据本地性

ContainerAllocator周期性通过心跳与ResourceManager通信,以获取已分配的Container列表、完成的Container列表、最近更新的节点列表等信息,而ContainerAllocator根据这些信息完成相应的操作。

1.资源申请过程分析
当用户提交作业之后,MRAppMaster会为之初始化,并创建一系列Map Task和Reduce Task,由于Reduce Task依赖于Map task之间结果,所以Reduce Task会延后调度。在ContainerAllocator中,当Map Task数目完成一定比例(由mapreduce.job.reduce.slowstart.completedmaps指定,默认是0.05,即5%)且Reduce Task可允许占用的资源(Reduce Task可占用资源比由yarn.app.mapreduce.am.job.reduce.rampup.limit指定)能够折合成整数个任务时,才会调度Reduce Task。

考虑到Map Task和Reduce Task之间的依赖关系,因此,它们之间的状态转移也是不一样的,对于Map Task而言,会依次转移到以下几个任务集合中:

scheduled->assigned->completed

对于Reduce Task而言,则按照以下流程进行:

pending->scheduled->assigned->completed

其中,pending表示等待ContainerAllocator发送资源请求的任务集合;scheduled表示已经将资源请求发送给RM,但还没有收到分配的资源的任务集合;assigned是已经收到RM分配的资源的任务集合;completed表示已运行完成的任务集合。

Reduce Task之所以会多出一个pending状态,主要是为了根据Map Task情况调整Reduce Task状态(在pending和scheduled中相互转移)。进一步说,这主要是为了防止Map Task饿死,因为在YARN中不再有Map Slot和Reduce Slot的概念(这两个概念从一定程度上减少了作业饿死的可能性),只有内存、CPU等真实的资源,需要由ApplicationMaster控制资源申请的顺序,以防止可能产生的作业饿死

此外,ContainerAllocator将所有任务划分成三类,分别是Failed Map Task、Map Task和Reduce Task,并分别赋予它们优先级5、20和10,也就是说,当三种任务同时有资源需求时,会优先分配给Failed Map Task,然后是Reduce Task,最后是Map Task。

总结起来,ContainerAllocator工作流程如下:

步骤1 将Map Task的资源需求发送给RM。

步骤2 如果达到了Reduce Task调度条件,则开始为Reduce Task申请资源。

步骤3 如果为某个Task申请到了资源,则取消其他重复资源的申请。由于在HDFS中,任何一份数据通常有三备份,而对于一个任务而言,考虑到rack和any级别的本地性,它可能会对应7个资源请求,分别是:

<20, "node1", "memory:1G", 1, true>
  <20, "node2", "memory:1G", 1, true>
    <20, "node3", "memory:1G", 1, true>
      <20, "rack1", "memory:1G", 1, true>
        <20, "rack2", "memory:1G", 1, true>
          <20, "rack3", "memory:1G", 1, true>
            <20, "*", "memory:1G", 1, true>

一旦该任务获取了以上任意一种资源,都会取消其他6个的资源申请。
在作业运行过程中,会出现资源重新申请和资源取消的行为,具体如下:

  • 如果任务运行失败,则会重新为该任务申请资源。
  • 如果一个任务运行速度过慢,则会为其额外申请资源以启动备份任务(如果启动了推测执行功能)
  • 如果一个节点失败的任务数目过多,则会撤销对该节点的所有资源的申请请求。

ContainerAllocator实际上是一接口,它只定义了三个事件:CONTAINER_REQ、CONTAINER_DEALLOCATE和CONTAINER_FAILED,分别表示请求Container、释放Container和Container运行失败。

ContainerAllocator的实现是RMContainerAllocator,它只接收和处理ContainerAllocator接口中定义的三种事件,它的运行是这三种事件驱动的。

RMContainerAllocator中最核心的框架是维护了一个心跳信息,在RMCommunicator类中的实现如下:

while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
  try {
    Thread.sleep(rmPollInterval);
    try {
      heartbeat();
    } catch (YarnException e) {
      LOG.error("Error communicating with RM: " + e.getMessage() , e);
      return;
    } catch (Exception e) {
      LOG.error("ERROR IN CONTACTING RM. ", e);
      continue;
    }
    lastHeartbeatTime = context.getClock().getTime();
    executeHeartbeatCallbacks();
  } catch (InterruptedException e) {
    LOG.warn("Allocated thread interrupted. Returning.");
    return;
  }
}

其中,heartbeat()函数的定义(在RMContainerAllocator类中)如下:

protected synchronized void heartbeat() throws Exception {
  scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
  List<Container> allocatedContainers = getResources();
  if (allocatedContainers.size() > 0) {
    scheduledRequests.assign(allocatedContainers);
  }
  …
}

其中,getResources()函数用于向RM发送心跳信息,并处理心跳应答。需要注意的是,大部分情况下,心跳信息中并不包含新的资源请求信息,即空的心跳信息,这种心跳有以下几个作用:

  • 周期性发送心跳,告诉RM自己还活着。
  • 周期性询问RM,以获取新分配的资源和各个Container运行状况

assign()函数作用是将收到的Container分配给某个任务,如果这个Container无法分配下去(比如内存空间不够),则在下次心跳中通知RM释放该Container,如果Container可以分下去,则会释放对应任务的其他资源请求,同时会向TaskAttempt发送一个TA_ASSIGNED事件,以通知ContainerLauncher启动Container。

除了新分配和已经运行完成的Container列表外,ContainerAllocator还会从RM中获取节点更新列表,这个列表给出了最近发生变化的节点,比如新增节点、不可用节点等,当前ContainerAllocator仅处理了不可用节点,即一旦发现节点不可用,ContainerAllocator会将该节点上正运行的任务的状态置为被杀死状态,并重新为这些任务申请资源。

Container启动与释放

ContainerLauncher负责与各个NodeManager通信,以启动或者释放Container。在YARN中,运行Task所需的全部信息被封装到Container中,包括所需资源、依赖的外部文件、JAR包、运行时环境变量、运行命令等。ContainerLauncher通过RPC协议ContainerManager与NodeManager通信,以控制Container的启动与释放,进而控制任务的执行(比如启动任务、杀死任务等),ContainerManager协议定义了三个RPC接口,具体如下:

StartContainerResponse startContainer(StartContainerRequest request)
  throws YarnRemoteException;//启动一个container
StopContainerResponse stopContainer(StopContainerRequest request)
  throws YarnRemoteException;//停止一个container
GetContainerStatusResponse getContainerStatus(
  GetContainerStatusRequest request) throws YarnRemoteException;//获取一个container运行情况

ContainerLauncher是一个Java接口,它定义了两种事件

  • CONTAINER_REMOTE_LAUNCH。启动一个Container。当ContainerAllocator为某个任务申请到资源后,会将运行该任务相关的所有信息封装到Container中,并要求对应的节点启动该Container

  • CONTAINER_REMOTE_CLEANUP。停止/杀死一个Container。存在多种可能触发该事件的行为,常见的有:

  1. 推测执行时一个任务运行完成,需杀死另一个同输入数据的任务
  2. 用户发送一个杀死任务请求;
  3. 任意一个任务运行结束时,YARN会触发一个杀死任务的命令,以释放对应Container占用的资源。

尤其需要注意的是第三种情况,YARN作为资源管理系统,应确保任何一个任务运行结束后资源得到释放,否则会造成资源泄露。而实现这一要求的可行方法是,任何一个任务结束后,不管它对应的资源是否得到释放,YARN均会主动显式检查和回收资源(container)。

ContainerLauncher接口由ContainerLauncherImpl类实现,它是一个服务,接收和处理来自事件调度器发送过来的CONTAINER_REMOTE_LAUNCH和CONTAINER_REMOTE_CLEANUP两种事件,它采用了线程池方式并行处理这两种事件。

对于CONTAINER_REMOTE_LAUNCH事件,它会调用Container.launch()函数与对应的NodeManager通信,以启动Container(可以同时启动多个Container),代码如下:

proxy = getCMProxy(containerMgrAddress, containerID);//构造一个RPC client
ContainerLaunchContext containerLaunchContext =
  event.getContainer();
StartContainerRequest startRequest =
  StartContainerRequest.newInstance(containerLaunchContext,
                                    event.getContainerToken());
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(startRequest);
StartContainersRequest requestList = 
  StartContainersRequest.newInstance(list); 
//调用RPC函数,获取返回值
StartContainersResponse response =
  proxy.getContainerManagementProtocol().startContainers(requestList);

对于CONTAINER_REMOTE_CLEANUP事件,它会调用Container.kill()函数与对应的NodeManager通信,以杀死Container释放资源(可以同时杀死多个Container),代码如下:

proxy = getCMProxy(this.containerMgrAddress, this.containerID);
// kill the remote container if already launched
List<ContainerId> ids = new ArrayList<ContainerId>();
ids.add(this.containerID);
StopContainersRequest request = StopContainersRequest.newInstance(ids);
StopContainersResponse response =
  proxy.getContainerManagementProtocol().stopContainers(request);

总之,ContainerLauncherImpl是一个非常简单的服务,其最核心的代码组织方式是“队列+线程池”,以处理事件调度器发送过来的CONTAINER_REMOTE_LAUNCH和CONTAINER_REMOTE_CLEANUP两种事件。

数据处理引擎

MRAppMaster仍采用了MRv1中的数据处理引擎,分别由数据处理引擎MapTask和ReduceTask完成Map任务和Reduce任务的处理,但相比于MRv1,MRAppMaster对这两个引擎进行了优化,这些优化主要体现在Shuffle阶段,具体如下。
1.Map端—用Netty代替Jetty

MRv1版本中,TaskTracker采用了Jetty服务器处理来自各个Reduce Task的数据读取请求。由于Jetty采用了非常简单的网络模型,因此性能比较低。在Hadoop 2.0中,MRAppMaster改用Netty—另一种开源的客户/服务器端编程框架,由于它内部采用了Java NIO技术,故其相比Jetty更加高效。Netty社区也比Jetty的更加活跃,且稳定性更好

2.Reduce端—批拷贝

MRv1版本中,在Shuffle过程中,Reduce Task会为每个数据分片建立一个专门的HTTP连接(One-connection-per-map),即使多个分片同时出现在一个TaskTracker上也是如此。为了提高数据复制效率,Hadoop 2.0尝试采用批拷贝技术:不再为每个Map Task建立一个HTTP连接,而是为同一个TaskTracker上的多个Map Task建立一个HTTP连接,进而能够一次读取多个数据分片,具体如图8-11所示


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,002评论 6 509
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,777评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,341评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,085评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,110评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,868评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,528评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,422评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,938评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,067评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,199评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,877评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,540评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,079评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,192评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,514评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,190评论 2 357

推荐阅读更多精彩内容