我们知道,MRv1主要由编程模型(MapReduce API)、资源管理与作业控制模块(由JobTracker和TaskTracker组成)和数据处理引擎(由MapTask和ReduceTask组成)三部分组成。而YARN出现后,资源管理模块则交由YARN实现,这样为了让MapReduce框架运行在YARN上,仅需实现一个ApplicationMaster组件完成作业控制模块功能即可,其他部分,包括编程模型和数据处理引擎等,可直接采用MRv1原有的实现。下面将详细介绍MapReduce On YARN(即MRv2)的基本架构、模块组成以及各模块的实现。
MapReduce On YARN与MRv1在编程模型和数据处理引擎方面的实现是一样的,唯一的不同是运行时环境。不同于MRv1中由JobTracker和TaskTracker构成的运行时环境,MapReduce On YARN的运行时环境由YARN与ApplicationMaster构成,这种新颖的运行时环境使得MapReduce可以与其他计算框架运行在一个集群中,从而达到共享集群资源、提高资源利用率的目的。随着YARN的成熟与完善,MRv1的独立运行模式将被MapReduce On YARN取代。
MRAppMaster是MapReduce的ApplicationMaster实现,它使得MapReduce应用程序可以直接运行于YARN之上。在YARN中,MRAppMaster负责管理MapReduce作业的生命周期,包括作业管理、资源申请与再分配、Container启动与释放、作业恢复等。本节将介绍MRAppMaster基本构成。
如图8-1所示,MRAppMaster主要由以下几种组件/服务构成。
ContainerAllocator。与ResourceManager通信,为MapReduce作业申请资源。作业的每个任务资源需求可描述为5元组<Priority,hostname,capability,containers,relax_locality >,分别表示作业优先级、期望资源所在的host、资源量(当前支持内存和CPU两种资源)、Container数目、是否松弛本地性。ContainerAllocator周期性通过RPC与ResourceManager通信,而ResourceManager则通过心跳应答的方式为之返回已经分配的Container列表、完成的Container列表等信息
ClientService。ClientService是一个接口,由MRClientService实现。MRClientService实现了MRClientProtocol协议,客户端可通过该协议获取作业的执行状态(不必通过ResourceManager)和控制作业(比如杀死作业、改变作业优先级等)。
Job。Job表示一个MapReduce作业,与MRv1的JobInProgress功能一样,负责监控作业的运行状态。它维护了一个作业状态机,以实现异步执行各种作业相关的操作。
Task。Task表示一个MapReduce作业中的某个任务,与MRv1中的TaskInProgress功能类似,负责监控一个任务的运行状态。它维护了一个任务状态机,以实现异步执行各种任务相关的操作
TaskAttempt。TaskAttempt表示一个任务运行实例,它的执行逻辑与MRv1中的MapTask和ReduceTask运行实例完全一致,实际上,它直接使用了MRv1的数据处理引擎,但经过了一些优化,正是由于它与MRv1的数据处理引擎一样,它对外提供的编程接口也与MRv1完全一致,这意味着MRv1的应用程序可直接运行在YARN之上。
TaskCleaner。TaskCleaner负责清理失败任务或者被杀死任务使用的目录和产生的临时结果(可统称为垃圾数据),它维护了一个线程池和一个共享队列,异步删除任务产生的垃圾数据。
Speculator。Speculator完成推测执行功能。当同一个作业的某个任务运行速度明显慢于其他任务时,Speculator会为该任务启动一个备份任务,让它与原任务同时处理同一份数据,谁先计算完成则将谁的结果作为最终结果,并将另一个任务杀掉。该机制可有效防止那些“拖后腿”任务拖慢整个作业的执行进度
ContainerLauncher。ContainerLauncher负责与NodeManager通信,以启动一个Container。当ResourceManager为作业分配资源后,ContainerLauncher会将任务执行相关信息填充到Container中,包括任务运行所需资源、任务运行命令、任务运行环境、任务依赖的外部文件等,然后与对应的NodeManager通信,要求它启动Container
TaskAttemptListener。TaskAttemptListener负责管理各个任务的心跳信息,如果一个任务一段时间内未汇报心跳,则认为它死掉了,会将其从系统中移除。同MRv1中的TaskTracker类似,它实现了TaskUmbilicalProtocol协议,任务会通过该协议汇报心跳,并询问是否能够提交最终结果
JobHistoryEventHandler。JobHistoryEventHandler负责对作业的各个事件记录日志,比如作业创建、作业开始运行、一个任务开始运行等,这些日志会被写到HDFS的某个目录下,这对于作业恢复非常有用。当MRAppMaster出现故障时,YARN会将其重新调度到另外一个节点上。为了避免重新计算,MRAppMaster首先会从HDFS上读取上次运行产生的日志,以恢复已经运行完成的任务,进而能够只运行尚未运行完成的任务
事件与事件处理器
YARN使用了基于事件驱动的异步编程模型,它通过事件将各个组件联系起来,并由一个中央异步调度器统一将各种事件分配给对应的事件处理器。在MRAppMaster中,每种组件是一个事件处理器,当MRAppMaster启动时,它们会以服务的形式注册到MRAppMaster的中央异步调度器上,并告诉调度器它们处理的事件类型。这样当出现某一种事件时,MRAppMaster会查询<事件,事件处理器>表,并将该事件分配给对应的事件处理器
MapReduce客户端
MapReduce客户端是MapReduce用户与YARN(和MRAppMaster)进行通信的唯一途径,通过该客户端,用户可以向YARN提交作业,获取作业的运行状态和控制作业(比如杀死作业、杀死任务等)。MapReduce客户端涉及两个RPC通信协议
- ApplicationClientProtocol。在YARN中,ResourceManager实现了ApplicationClient-Protocol协议,任何客户端需使用该协议完成提交作业、杀死作业、改变作业优先级等操作
- MRClientProtocol。当作业的ApplicationMaster成功启动后,它会启动MRClient-Service服务,该服务实现了MRClientProtocol协议,从而允许客户端直接通过该协议与ApplicationMaster通信以控制作业和查询作业运行状态,以减轻Resource-Manager负载。
在YARN中,应用程序(作业)的运行过程包括两个步骤:启动Application-Master和运行应用程序(作业)内部的各类任务,其中,ApplicationMaster是由ResourceManager直接与NodeManager通信而启动的,在它启动起来之前,客户端只能与ResourceManager交互以查询作业相关信息,在它启动起来之前,客户端只能与ResourceManager交互以查询作业相关信息。一旦作业的ApplicationMaster成功启动,客户端可直接与它交互以查询作业信息和控制作业。接下来,我们分别介绍这两个通信协议
- ApplicationClientProtocol协议
MapReduce客户端通过该协议与ResourceManager通信,以提交应用程序和查询集群信息。ResourceManager用Application表示用户提交的作业,并提供了以下接口供用户使用:
public interface ApplicationClientProtocol {
public GetNewApplicationResponse getNewApplication( //获取一个Application ID
GetNewApplicationRequest request)throws YarnRemoteException;
public SubmitApplicationResponse submitApplication( //提交一个Application
SubmitApplicationRequest request) throws YarnRemoteException;
public KillApplicationResponse forceKillApplication( //杀死一个Application
KillApplicationRequest request) throws YarnRemoteException;
public GetApplicationReportResponse getApplicationReport( //获取Application运行报告
GetApplicationReportRequest request) throws YarnRemoteException;
public GetClusterMetricsResponse getClusterMetrics( //获取集群所有Metric
GetClusterMetricsRequest request) throws YarnRemoteException;
public GetAllApplicationsResponse getAllApplications( //列出所有Application
GetAllApplicationsRequest request) throws YarnRemoteException;
public GetClusterNodesResponse getClusterNodes( //获取集群中所有节点
GetClusterNodesRequest request) throws YarnRemoteException;
public GetQueueUserAclsInfoResponse getQueueUserAcls( //获取用户访问控制权限
GetQueueUserAclsInfoRequest request) throws YarnRemoteException;
...
}
MRClientProtocol协议
MRAppMaster实现了MRClientProtocol协议为客户端提供服务,该协议提供了作业和任务的查询和控制接口,主要如下:
public interface MRClientProtocol {
...
public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException;//获取作业运行报告
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException;//获取所有任务运行报告
public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException;//获取所有任务实例的运行报告
public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRemoteException;//获取所有Counter
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(GetTaskAttemptCompletionEventsRequest request) throws YarnRemoteException;//获取所有运行完成的任务
public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request) throws YarnRemoteException;//获取所有任务运行报告
public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request) throws YarnRemoteException;//获取作业诊断信息
public KillJobResponse killJob(KillJobRequest request) throws YarnRemoteException;//杀死一个作业
public KillTaskResponse killTask(KillTaskRequest request) throws YarnRemoteException;//杀死一个任务
public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException;//杀死一个任务实例
public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException;//让一个任务实例运行失败
...
}
MRAppMaster工作流程
按照作业大小不同,MRAppMaster提供了三种作业运行模式:本地模式(通常用于作业调试,同MRv1一样,不再赘述)、Uber模式和Non-Uber 模式。对于小作业,为了降低其延迟,可采用Uber模式,在该模式下,所有Map Task和Reduce Task在同一个Container(MRAppMaster所在Container)中顺次执行;对于大作业,则采用Non-Uber 模式,在该模式下,MRAppMaster先为Map Task申请资源,当Map Task运行完成数目达到一定比例后再为Reduce Task申请资源
1)Uber运行模式
为了降低小作业延时,YARN专门对小作业运行方式进行了优化。对于小作业而言,MRAppMaster无须再为每个任务分别申请资源,而是让其重用一个Container,并按照先Map Task后Reduce Task的运行方式串行执行每个任务。在YARN中,如果一个MapReduce作业同时满足以下条件,则认为是小作业,可运行在Uber模式下:
- Map Task数目不超过mapreduce.job.ubertask.maxmaps(默认是9)。
- Reduce Task数目不超过mapreduce.job.ubertask.maxmaps(默认是1)。
- 输入文件大小不超过mapreduce.job.ubertask.maxbytes(默认是一个Block大小)。
- Map Task和Reduce Task需要的资源量不超过MRAppMaster可使用的资源量。
2)Non-Uber运行模式
在大数据环境下,Uber运行模式通常只能覆盖到一小部分作业,而对于其他大多数作业,仍将运行在Non-Uber模式下。在Non-Uber模式下,MRAppMaster将一个作业的Map Task和Reduce Task分为四种状态
- pending:刚启动但尚未向ResourceManager发送资源请求
- scheduled:已经向ResourceManager发送资源请求但尚未分配到资源。
- assigned:已经分配到了资源且正在运行。
- completed:已经运行完成。
对于Map Task而言,它的生命周期为scheduled →assigned→completed;而对于Reduce Task而言,它的生命周期为pending →scheduled →assigned→completed。由于Reduce Task的执行依赖于Map Task的输出结果,因此,为避免Reduce Task过早启动造成资源利用率低下,MRAppMaster让刚启动的Reduce Task处于pending状态,以便能够根据Map Task运行情况决定是否对其进行调度。在YARN之上运行MapReduce作业需要解决两个关键问题:如何确定Reduce Task启动时机以及如何完成Shuffle功能。
由于YARN中不再有Map Slot和Reduce Slot的概念,且RedouceManager也不知道Map Task与Reduce Task之间存在依赖关系,因此,MRAppMaster自己需设计资源申请策略以防止因Reduce Task过早启动造成资源利用率低下和Map Task因分配不到资源而“饿死”。MRAppMaster在MRv1原有策略(Map Task完成数目达到一定比例后才允许启动Reduce Task)基础上添加了更为严格的资源控制策略和抢占策略。总结起来,Reduce Task启动时机由以下三个参数控制:
- mapreduce.job.reduce.slowstart.completedmaps:当Map Task完成的比例达到该值后才会为Reduce Task申请资源,默认是0.05。
- yarn.app.mapreduce.am.job.reduce.rampup.limit:在Map Task完成前,最多启动的Reduce Task比例,默认为0.5。
- yarn.app.mapreduce.am.job.reduce.preemption.limit:当Map Task需要资源但暂时无法获取资源(比如Reduce Task运行过程中,部分Map Task因结果丢失需重算)时,为了保证至少一个Map Task可以得到资源,最多可以抢占的Reduce Task比例,默认为0.5。
按照MapReduce的执行逻辑,Shuffle HTTP Server应该分布到各个节点上,以便能够支持各个Reduce Task远程复制数据。然而,由于Shuffle是MapReduce框架中特有的一个处理流程,从设计上讲,不应该将它直接嵌到YARN的某个组件(比如NodeManager)中。
YARN采用了服务模型管理各个对象,且多个服务可以通过组合的方式交由一个核心服务进行统一管理。在YARN中,NodeManager作为一种组合服务模式,允许动态加载应用程序临时需要的附属服务,利用这一特性,YARN将Shuffle HTTP Server组装成了一种服务,以便让各个NodeManager启动时加载它。
当用户向YARN中提交一个MapReduce应用程序后,YARN将分两个阶段运行该应用程序:第一个阶段是由ResourceManager启动MRAppMaster;第二个阶段是由MRAppMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行成功。如图8-3所示,YARN的工作流程分为以下几个步骤:
步骤1 用户向YARN中提交应用程序,其中包括MRAppMaster程序、启动MRApp-Master的命令、用户程序等。
步骤2 ResourceManager为该应用程序分配第一个Container,并与对应的NodeManager通信,要求它在这个Container中启动应用程序的MRAppMaster。
步骤3 MRAppMaster启动后,首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,之后,它将为内部任务申请资源,并监控它们的运行状态,直到运行结束,即重复步骤4~7。
步骤4 MRAppMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
步骤5 一旦MRAppMaster申请到资源后,则与对应的NodeManager通信,要求它启动任务。
步骤6 NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务
步骤7 各个任务通过RPC协议向MRAppMaster汇报自己的状态和进度,以让MRAppMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务
在应用程序运行过程中,用户可随时通过RPC向MRAppMaster查询应用程序的当前运行状态。
步骤8 应用程序运行完成后,MRAppMaster向ResourceManager注销并关闭自己。
MR作业生命周期及相关状态机
MR作业生命周期
在正式介绍作业(Job)生命周期之前,先要了解MRAppMaster中作业表示方式。如图8-4所示,与MRv1一样,MRAppMaster根据InputFormat组件的具体实现(通常是根据数据量切分数据),将作业分解成若干个Map Task和Reduce Task,其中每个Map Task处理一片InputSplit数据,而每个Reduce Task则进一步处理Map Task产生的中间结果。每个Map/Reduce Task只是一个具体计算任务的描述,真正的任务计算工作则是由运行实例TaskAttempt完成的,每个Map/Reduce Task可能顺次启动多个运行实例,比如第一个运行实例失败了,则另起一个实例重新计算,直到这一份数据处理完成或者达到尝试次数上限;也可能同时启动多个运行实例,让它们竞争同时处理一片数据。在MRAppMaster中,上述Job、Task和TaskAttempt的生命周期均由一个有限状态机描述,其中TaskAttempt是实际进行任务计算的组件,其他两个只负责监控和管理。正是由于MRAppMaster中的TaskAttempt重用了MRv1中的代码,MRv1中的MapReduce应用程序可直接运行在YARN上.
作业的创建入口在MRAppMaster类中,如下所示:
public class MRAppMaster extends CompositeService {
public void start() {
...
job = createJob(getConfig());//创建Job
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
jobEventDispatcher.handle(initJobEvent);//发送JOB_INI,创建MapTask、ReduceTask
startJobs();//启动作业,这是后续一切动作的触发之源
...
}
protected Job createJob(Configuration conf) {
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, fsTokens, clock,
completedTasksFromPreviousRun, metrics, committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
}
之后,MapReduce作业将依次经历作业/任务初始化和作业启动两个阶段。
(1)作业/任务初始化
JobImpl首先接收到JOB_INIT事件,然后触发调用函数InitTransition(),进而导致作业状态从NEW变为INITED,InitTransition函数主要工作是创建Map Task和Reduce Task,代码如下
public static class InitTransition
implements MultipleArcTransition<JobImpl, JobEvent, JobState> {
...
createMapTasks(job, inputLength, taskSplitMetaInfo);
createReduceTasks(job);
...
}
其中,createMapTasks函数实现如下:
private void createMapTasks(JobImpl job, long inputLength,
TaskSplitMetaInfo[] splits) {
for (int i=0; i < job.numMapTasks; ++i) {
TaskImpl task =
new MapTaskImpl(job.jobId, i,
job.eventHandler,
job.remoteJobConfFile,
job.conf, splits[i],
job.taskAttemptListener,
job.committer, job.jobToken, job.fsTokens,
job.clock, job.completedTasksFromPreviousRun,
job.applicationAttemptId.getAttemptId(),
job.metrics, job.appContext);
job.addTask(task);
}
}
(2)作业启动
启动作业的代码如下
public class MRAppMaster extends CompositeService {
protected void startJobs() {
JobEvent startJobEvent = new JobEvent(job.getID(), JobEventType.JOB_START);
dispatcher.getEventHandler().handle(startJobEvent);
}
}
JobImpl接收到JOB_START事件后,将执行函数StartTransition(),进而触发Map Task和Reduce Task的调度执行,同时使得作业状态从INITED变为RUNNING,具体如下:
public static class StartTransition
implements SingleArcTransition<JobImpl, JobEvent> {
public void transition(JobImpl job, JobEvent event) {
job.scheduleTasks(job.mapTasks);
job.scheduleTasks(job.reduceTasks);
}
}
之后,每个Map Task和Reduce Task负责各自的状态变化,ContainerAllocator模块会首先为Map Task申请资源,然后是Reduce Task,一旦一个Task获取到了资源,就会创建一个运行实例Task Attempt。如果该实例运行成功,则Task运行成功,否则,Task还会启动下一个运行实例Task Attempt,直到一个Task Attempt运行成功或者达到尝试次数上限。当所有Task运行成功后,Job运行成功。一个运行成功的作业/任务所经历的状态变化(不包含失败或者被杀死情况)
Task状态机
Task状态机维护了一个任务的生命周期,即从创建到运行结束整个过程。一个任务可能存在多次运行尝试,每次运行尝试被称为一个“运行实例”,Task状态机则负责管理这些运行实例。Task状态机由TaskImpl类实现,其主要包括7种状态和9种导致状态发生变化的事件
(1)Task状态
Task状态包括
NEW:任务初始状态。当一个任务被创建时,状态被置为NEW
SCHEDULED:任务开始被调度时所处的状态。该状态意味着,MRAppMaster开始为任务(向ResourceManager)申请资源,但任务尚未获取到资源。
RUNNING:任务的一个实例开始运行。该状态意味着,MRAppMaster已经为该任务申请到资源(Container),并与对应的NodeManager通信成功启动了Container。需要注意的,在某一时刻,一个任务可能有多个运行实例,且可能存在运行失败的运行实例
SUCCEEDED:任务的一个实例运行成功。该状态意味着,该任务成功运行并完成。需要注意的,只要任务的一个实例运行成功,则意味着该任务运行成功。
KILLED:任务被杀死所处的状态。当一个任务的所有运行实例被杀死后,才认为该任务被杀死。
FAILED:任务运行失败所处的状态。每个任务的运行实例数目有一定上限,一旦超过该上限,才认为该任务运行失败,其中,Map Task运行实例数目上限由参数mapreduce.map.maxattempts指定,默认值为4,Reduce Task运行实例数目上限由参数mapreduce.reduce.maxattempts指定,默认值为4。需要注意的是,一个任务运行失败并不一定会导致整个作业运行失败,这同样取决于作业的错误容忍率(默认是0)
TaskAttempt状态机
在YARN中,任务实例是运行在Container中的,因此,Container状态变化往往伴随任务实例的状态变化,比如任务实例运行完成后,会清理Container占用的空间,而Container空间的清理实际上就是任务实例空间的清理。目前每个任务存在两种类型的实例:Avataar.VIRGIN和Avataar.SPECULATIVE,分别表示原始任务和备份任务(通过推测执行机制启动的)。
(1)TaskAttempt状态
TaskAttmpt状态包括:
- NEW:任务实例初始状态。当一个任务实例被创建时,状态被置为NEW。
- UNASSIGNED:等待分配资源所处的状态。当一个任务实例被创建后,它会发出一个资源申请请求,等待MRAppMaster为它申请和分配资源
- ASSIGNED:任务实例获取到一个Container。
- RUNNING:任务实例成功启动。MRAppMaster将资源分配给某个任务后,会与对应的NodeManager通信,以启动Container。只要Container启动成功,则任务实例启动成功
- COMMIT_PENDING:任务实例等待提交最终结果。任务实例运行完成后,需向MRAppMaster请求提交最终结果,一旦提交成功后,该任务的其他实例将被杀死
- SUCCEEDED:成功清理完成Container空间。任务实例运行完成后,需清理它使用的Container占用的空间,只有该空间清理完成后,才认为任务实例运行完成
- FAILED:任务实例运行失败后所处的状态
- KILLED:任务实例被杀死后所处的状态。
资源申请与再分配
ContainerAllocator是MRAppMaster中负责资源申请和分配的模块。用户提交的作业被分解成Map Task和Reduce Task后,这些Task所需的资源统一由ContainerAllocator模块负责从ResourceManager中申请,而一旦ContainerAllocator得到资源后,需采用一定的策略进一步分配给作业的各个任务。
在YARN中,作业的资源需求可描述为5元组:<priority,hostname,capability,containers,relax_locality >,分别表示作业优先级、期望资源所在的host、资源量(当前支持内存与CPU两种资源)、Container数目、是否松弛本地性,比如
<10, "node1", "memory:1G,CPU:1", 3,true>//优先级是一个正整数,优先级值越小,优先级越高
<10, "node2", "memory:2G,CPU:1", 1, false>//1个必须来自node2上大小为2GB内存、1个CPU的Container(不能来自node2所在的机架或者其他节点)
<2, "*", "memory:1G,CPU:1", 20, false> //*表示这样的资源可来自任意一个节点,即不考虑数据本地性
ContainerAllocator周期性通过心跳与ResourceManager通信,以获取已分配的Container列表、完成的Container列表、最近更新的节点列表等信息,而ContainerAllocator根据这些信息完成相应的操作。
1.资源申请过程分析
当用户提交作业之后,MRAppMaster会为之初始化,并创建一系列Map Task和Reduce Task,由于Reduce Task依赖于Map task之间结果,所以Reduce Task会延后调度。在ContainerAllocator中,当Map Task数目完成一定比例(由mapreduce.job.reduce.slowstart.completedmaps指定,默认是0.05,即5%)且Reduce Task可允许占用的资源(Reduce Task可占用资源比由yarn.app.mapreduce.am.job.reduce.rampup.limit指定)能够折合成整数个任务时,才会调度Reduce Task。
考虑到Map Task和Reduce Task之间的依赖关系,因此,它们之间的状态转移也是不一样的,对于Map Task而言,会依次转移到以下几个任务集合中:
scheduled->assigned->completed
对于Reduce Task而言,则按照以下流程进行:
pending->scheduled->assigned->completed
其中,pending表示等待ContainerAllocator发送资源请求的任务集合;scheduled表示已经将资源请求发送给RM,但还没有收到分配的资源的任务集合;assigned是已经收到RM分配的资源的任务集合;completed表示已运行完成的任务集合。
Reduce Task之所以会多出一个pending状态,主要是为了根据Map Task情况调整Reduce Task状态(在pending和scheduled中相互转移)。进一步说,这主要是为了防止Map Task饿死,因为在YARN中不再有Map Slot和Reduce Slot的概念(这两个概念从一定程度上减少了作业饿死的可能性),只有内存、CPU等真实的资源,需要由ApplicationMaster控制资源申请的顺序,以防止可能产生的作业饿死
此外,ContainerAllocator将所有任务划分成三类,分别是Failed Map Task、Map Task和Reduce Task,并分别赋予它们优先级5、20和10,也就是说,当三种任务同时有资源需求时,会优先分配给Failed Map Task,然后是Reduce Task,最后是Map Task。
总结起来,ContainerAllocator工作流程如下:
步骤1 将Map Task的资源需求发送给RM。
步骤2 如果达到了Reduce Task调度条件,则开始为Reduce Task申请资源。
步骤3 如果为某个Task申请到了资源,则取消其他重复资源的申请。由于在HDFS中,任何一份数据通常有三备份,而对于一个任务而言,考虑到rack和any级别的本地性,它可能会对应7个资源请求,分别是:
<20, "node1", "memory:1G", 1, true>
<20, "node2", "memory:1G", 1, true>
<20, "node3", "memory:1G", 1, true>
<20, "rack1", "memory:1G", 1, true>
<20, "rack2", "memory:1G", 1, true>
<20, "rack3", "memory:1G", 1, true>
<20, "*", "memory:1G", 1, true>
一旦该任务获取了以上任意一种资源,都会取消其他6个的资源申请。
在作业运行过程中,会出现资源重新申请和资源取消的行为,具体如下:
- 如果任务运行失败,则会重新为该任务申请资源。
- 如果一个任务运行速度过慢,则会为其额外申请资源以启动备份任务(如果启动了推测执行功能)
- 如果一个节点失败的任务数目过多,则会撤销对该节点的所有资源的申请请求。
ContainerAllocator实际上是一接口,它只定义了三个事件:CONTAINER_REQ、CONTAINER_DEALLOCATE和CONTAINER_FAILED,分别表示请求Container、释放Container和Container运行失败。
ContainerAllocator的实现是RMContainerAllocator,它只接收和处理ContainerAllocator接口中定义的三种事件,它的运行是这三种事件驱动的。
RMContainerAllocator中最核心的框架是维护了一个心跳信息,在RMCommunicator类中的实现如下:
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(rmPollInterval);
try {
heartbeat();
} catch (YarnException e) {
LOG.error("Error communicating with RM: " + e.getMessage() , e);
return;
} catch (Exception e) {
LOG.error("ERROR IN CONTACTING RM. ", e);
continue;
}
lastHeartbeatTime = context.getClock().getTime();
executeHeartbeatCallbacks();
} catch (InterruptedException e) {
LOG.warn("Allocated thread interrupted. Returning.");
return;
}
}
其中,heartbeat()函数的定义(在RMContainerAllocator类中)如下:
protected synchronized void heartbeat() throws Exception {
scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
List<Container> allocatedContainers = getResources();
if (allocatedContainers.size() > 0) {
scheduledRequests.assign(allocatedContainers);
}
…
}
其中,getResources()函数用于向RM发送心跳信息,并处理心跳应答。需要注意的是,大部分情况下,心跳信息中并不包含新的资源请求信息,即空的心跳信息,这种心跳有以下几个作用:
- 周期性发送心跳,告诉RM自己还活着。
- 周期性询问RM,以获取新分配的资源和各个Container运行状况
assign()函数作用是将收到的Container分配给某个任务,如果这个Container无法分配下去(比如内存空间不够),则在下次心跳中通知RM释放该Container,如果Container可以分下去,则会释放对应任务的其他资源请求,同时会向TaskAttempt发送一个TA_ASSIGNED事件,以通知ContainerLauncher启动Container。
除了新分配和已经运行完成的Container列表外,ContainerAllocator还会从RM中获取节点更新列表,这个列表给出了最近发生变化的节点,比如新增节点、不可用节点等,当前ContainerAllocator仅处理了不可用节点,即一旦发现节点不可用,ContainerAllocator会将该节点上正运行的任务的状态置为被杀死状态,并重新为这些任务申请资源。
Container启动与释放
ContainerLauncher负责与各个NodeManager通信,以启动或者释放Container。在YARN中,运行Task所需的全部信息被封装到Container中,包括所需资源、依赖的外部文件、JAR包、运行时环境变量、运行命令等。ContainerLauncher通过RPC协议ContainerManager与NodeManager通信,以控制Container的启动与释放,进而控制任务的执行(比如启动任务、杀死任务等),ContainerManager协议定义了三个RPC接口,具体如下:
StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException;//启动一个container
StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException;//停止一个container
GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException;//获取一个container运行情况
ContainerLauncher是一个Java接口,它定义了两种事件
CONTAINER_REMOTE_LAUNCH。启动一个Container。当ContainerAllocator为某个任务申请到资源后,会将运行该任务相关的所有信息封装到Container中,并要求对应的节点启动该Container
CONTAINER_REMOTE_CLEANUP。停止/杀死一个Container。存在多种可能触发该事件的行为,常见的有:
- 推测执行时一个任务运行完成,需杀死另一个同输入数据的任务
- 用户发送一个杀死任务请求;
- 任意一个任务运行结束时,YARN会触发一个杀死任务的命令,以释放对应Container占用的资源。
尤其需要注意的是第三种情况,YARN作为资源管理系统,应确保任何一个任务运行结束后资源得到释放,否则会造成资源泄露。而实现这一要求的可行方法是,任何一个任务结束后,不管它对应的资源是否得到释放,YARN均会主动显式检查和回收资源(container)。
ContainerLauncher接口由ContainerLauncherImpl类实现,它是一个服务,接收和处理来自事件调度器发送过来的CONTAINER_REMOTE_LAUNCH和CONTAINER_REMOTE_CLEANUP两种事件,它采用了线程池方式并行处理这两种事件。
对于CONTAINER_REMOTE_LAUNCH事件,它会调用Container.launch()函数与对应的NodeManager通信,以启动Container(可以同时启动多个Container),代码如下:
proxy = getCMProxy(containerMgrAddress, containerID);//构造一个RPC client
ContainerLaunchContext containerLaunchContext =
event.getContainer();
StartContainerRequest startRequest =
StartContainerRequest.newInstance(containerLaunchContext,
event.getContainerToken());
List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
list.add(startRequest);
StartContainersRequest requestList =
StartContainersRequest.newInstance(list);
//调用RPC函数,获取返回值
StartContainersResponse response =
proxy.getContainerManagementProtocol().startContainers(requestList);
对于CONTAINER_REMOTE_CLEANUP事件,它会调用Container.kill()函数与对应的NodeManager通信,以杀死Container释放资源(可以同时杀死多个Container),代码如下:
proxy = getCMProxy(this.containerMgrAddress, this.containerID);
// kill the remote container if already launched
List<ContainerId> ids = new ArrayList<ContainerId>();
ids.add(this.containerID);
StopContainersRequest request = StopContainersRequest.newInstance(ids);
StopContainersResponse response =
proxy.getContainerManagementProtocol().stopContainers(request);
总之,ContainerLauncherImpl是一个非常简单的服务,其最核心的代码组织方式是“队列+线程池”,以处理事件调度器发送过来的CONTAINER_REMOTE_LAUNCH和CONTAINER_REMOTE_CLEANUP两种事件。
数据处理引擎
MRAppMaster仍采用了MRv1中的数据处理引擎,分别由数据处理引擎MapTask和ReduceTask完成Map任务和Reduce任务的处理,但相比于MRv1,MRAppMaster对这两个引擎进行了优化,这些优化主要体现在Shuffle阶段,具体如下。
1.Map端—用Netty代替Jetty
MRv1版本中,TaskTracker采用了Jetty服务器处理来自各个Reduce Task的数据读取请求。由于Jetty采用了非常简单的网络模型,因此性能比较低。在Hadoop 2.0中,MRAppMaster改用Netty—另一种开源的客户/服务器端编程框架,由于它内部采用了Java NIO技术,故其相比Jetty更加高效。Netty社区也比Jetty的更加活跃,且稳定性更好
2.Reduce端—批拷贝
MRv1版本中,在Shuffle过程中,Reduce Task会为每个数据分片建立一个专门的HTTP连接(One-connection-per-map),即使多个分片同时出现在一个TaskTracker上也是如此。为了提高数据复制效率,Hadoop 2.0尝试采用批拷贝技术:不再为每个Map Task建立一个HTTP连接,而是为同一个TaskTracker上的多个Map Task建立一个HTTP连接,进而能够一次读取多个数据分片,具体如图8-11所示