Hadoop最初的设计目的是支持大数据批处理作业,如日志挖掘、Web索引等作业,为此,Hadoop仅提供了一个非常简单的调度机制:FIFO,即先来先服务。在该调度机制下,所有作业被统一提交到一个队列中,Hadoop按照提交顺序依次运行这些作业。
但随着Hadoop的普及,单个Hadoop集群的用户量越来越大,不同用户提交的应用程序往往具有不同的服务质量要求(Quality Of Service, QoS),典型的应用有以下几种。
- 批处理作业:这种作业往往耗时较长,对完成时间一般没有严格要求,如数据挖掘、机器学习等方面的应用程序。
- 交互式作业:这种作业期望能及时返回结果,如SQL查询(Hive)等。
- 生产性作业:这种作业要求有一定量的资源保证,如统计值计算。
此外,这些应用程序对硬件资源的需求量也是不同的,如过滤、统计类作业一般为CPU密集型作业,而数据挖掘、机器学习作业一般为I/O密集型作业。因此,传统的FIFO调度策略不仅不能满足多样化需求,也不能充分利用硬件资源。
为了克服单队列FIFO调度器的不足,多种类型的多用户多队列调度器诞生了。这种调度器允许管理员按照应用需求对用户或者应用程序分组,并为不同的分组分配不同的资源量,同时通过添加各种约束防止单个用户或者应用程序独占资源,进而能够满足各种QoS需求。当前主要有两种多用户作业调度器的设计思路:第一种是在一个物理集群上虚拟多个Hadoop集群,这些集群各自拥有全套独立的Hadoop服务,比如JobTracker、TaskTracker等,典型的代表是HOD调度器;另一种是扩展Hadoop调度器,使之支持多个队列多用户,这样,不同的队列拥有不同的资源量,可以运行不同的应用程序,典型的代表是Yahoo!的Capacity Scheduler和Facebook的Fair Scheduler。接下来将分别介绍这两种多用户作业调度器。
HOD
HOD(Hadoop On Demand)调度器是一个在共享物理集群上管理若干个Hadoop集群的工具。用户可通过HOD调度器在一个共享物理集群上快速搭建若干个独立的虚拟Hadoop集群,以满足不同的用途,比如不同集群运行不同类型的应用程序,运行不同的Hadoop版本进行测试等。HOD调度器可使管理员和用户轻松地快速搭建和使用Hadoop。
HOD调度器首先使用Torque资源管理器为一个虚拟Hadoop集群分配节点,然后在分配的节点上启动MapReduce和HDFS中的各个守护进程,并自动为Hadoop守护进程和客户端生成合适的配置文件(包括mapred-site.xml, core-site.xml和hdfs-site.xml等)。接下来将分别介绍Torque资源管理器和HOD调度器的基本工作原理。
Torque资源管理器
HOD调度器的工作过程实现中依赖于一个资源管理器来为它分配、回收节点和管理各节点上的作业运行的情况,如监控作业的运行、维护作业的运行状态等。而HOD只需在资源管理器所分配的节点上运行Hadoop守护进程和MapReduce作业即可。当前HOD采用的资源管理器是开源的Torque资源管理器。
一个Torque集群由一个头节点和若干个计算节点组成。头节点上运行一个名为pbs_server的守护进程,主要用于管理计算节点和监控各个作业的运行状态。每个计算节点上运行一个名为pbs_mom的守护进程,用于执行主节点分配的作业。此外,用户可将任何节点作为客户端,用于提交和管理作业。
头节点内部还运行了一个调度器守护进程。该守护进程会与pbs_server进行通信,以决定对资源使用和作业分配的本地策略。默认情况下,调度守护进程采用了FIFO调度机制,它将所有作业存放到一个队列中,并按照到达时间依次对它们进行调度。需要注意的是,Torque中的调度机制是可插拔的,Torque还提供许多其他可选的作业调度器。
如图10-1所示,用户可通过qsub命令向物理集群中提交作业,而Torque内部执行流程如下:
步骤1 当pbs_server收到新作业后,会进一步通知调度器。
步骤2 调度器采用一定的策略为该作业分配节点,并将节点列表与节点对应的作业执行命令返回给pbs_server。
步骤3 pbs_server将作业发送给第一个节点。
步骤4 第一个节点启动作业,作业开始运行(该作业会通知其他节点执行相应命令)。
步骤5 作业运行完成或者资源申请到期后,Torque会回收资源。
HOD作业调度
理解了Torque工作原理后,HOD调度器工作原理便一目了然:首先利用Torque向物理集群申请一个虚拟机群,然后将Hadoop守护进程包装成一个Torque作业,并在申请的节点上启动,最后用户可直接向启动的Hadoop集群中提交作业。通过HOD调度器申请集群和运行作业的主要流程如下:
步骤1 用户向HOD调度器申请一个包含一定数目节点的集群,并要求该集群中运行一个Hadoop实例。
步骤2 HOD客户端利用资源管理器接口qsub提交一个被称为RingMaster的进程作为Torque作业,同时申请一定数目的节点。这个作业被提交到pbs_server上。
步骤3 在各个计算节点上,守护进程pbs_mom接受并处理pbs_server分配的作业。RingMaster进程在其中一个计算节点上开始运行。
步骤4 RingMaster通过Torque的另外一个接口pbsdsh在所有分配到的计算节点上运行第二个HOD组件HodRing,即运行于各个计算节点上的分布式任务。
步骤5 HodRing初始化之后会与RingMaster通信以获取Hadoop指令,并根据指令启动Hadoop服务进程。一旦服务进程开始启动,它们会向RingMaster登记,提供关于守护进程的信息。
步骤6 Hadoop实例启动之后,用户可以向集群中提交MapReduce作业。
步骤7 如果一段时间内Hadoop集群上没有作业运行,Torque会回收该虚拟Hadoop集群的资源。
管理员将一个物理集群划分成若干个Hadoop集群后,用户可将不同类型的应用程序提交到不同Hadoop集群上,这样可避免不同用户或者不同应用程序之间争夺资源,从而达到多用户共享集群的目的。
从集群管理和资源利用率两方面看,这种基于完全隔离的集群划分方法存在诸多问题。
- 从集群管理角度看,多个Hadoop集群会给运维人员造成管理上的诸多不便。
- 多个Hadoop集群会导致集群整体利用率低下,这主要是负载不均衡造成的,比如某个集群非常忙碌时另外一些集群可能空闲,也就是说,多个Hadoop集群无法实现资源共享。
考虑到虚拟集群回收后数据可能丢失,用户通常将虚拟集群中的数据写到外部的HDFS上。如图10-2所示,用户通常仅在虚拟集群上安装MapReduce,至于HDFS,则使用一个外部全局共享的HDFS。很明显,这种部署方法会导致丧失部分数据的本地特性。为了解决该问题,一种更好的方法是在整个集群中只保留一个Hadoop实例,而通过Hadoop调度器将整个集群中的资源划分给若干个队列,并让这些队列共享所有节点上的资源,当前Yahoo!的Capacity Scheduler和Facebook的Fair Scheduler正是采用了这个设计思路。
Hadoop队列管理机制
在学习Capacity Scheduler和Fair Scheduler之前,我们先要了解Hadoop的用户和作业管理机制,这是任何Hadoop可插拔调度器的基础。
Hadoop以队列为单位管理作业、用户和资源,整个Hadoop集群被划分成若干个队列,每个队列被分配一定的资源,且用户只能向对应的一个或者几个队列中提交作业。Hadoop队列管理机制由用户权限管理和系统资源管理两部分组成,下面依次进行介绍。
1.用户权限管理
Hadoop的用户管理模块构建在操作系统用户管理之上,增加了“队列”这一用户组织单元,并通过队列建立了操作系统用户和用户组之间的映射关系。管理员可配置每个队列对应的操作系统用户和用户组(需要注意的是,Hadoop允许一个操作系统用户或者用户组对应一个或者多个队列)也可以配置每个队列的管理员。他可以杀死该队列中任何作业,改变任何作业的优先级等。
Hadoop集群中所有队列需在配置文件mapred-site.xml中设置,这意味着该配置信息不可以动态加载。
【实例】如果一个集群中有四个队列,分别是queueA、queueB、queueC和default,那么可以在mapred-site.xml中配置如下:
<property>
<name>mapred.queue.names</name>
<value>queueA, queueB, queueC, default</value>
<description>Hadoop中所有队列名称</description>
</property>
<property>
<name>mapred.acls.enabled</name>
<value>true</value>
<description>是否启用权限管理功能</description>
</property>
队列权限相关的配置选项在配置文件mapred-queue-acls.xml中设置,这些信息可以动态加载。
【实例】如果规定用户linux_userA和用户组linux_groupA可以向队列queueA中提交作业,用户linux_groupA_admin可以管理(比如杀死任何一个作业或者改变任何作业的优先级)队列queueA,那么可以在mapred-queue-acls.xml中配置如下:
<configuration>
<property>
<name>mapred.queue.queueA.acl-submit-job</name>
<value>linux_userA linux_groupA</value>
</property>
<property>
<name>mapred.queue.queueA.acl-administer-jobs</name>
<value>linux_groupA_admin</value>
</property>
<!--配置其他队列.-->
</configuration>
2.系统资源管理
Hadoop资源管理由调度器完成。管理员可在调度器中设置各个队列的资源容量、各个用户可用资源量等信息,而调度器则按照相应的资源约束对作业进行调度。考虑到系统中的队列信息是在mapred-site.xml中设置的,而队列资源分配信息在各个调度器的配置文件中设置,因此,这两个配置文件中的队列信息应保持一致是十分重要的。如果调度器中的某个队列在mapred-site.xml中没有设置,则意味着该队列中的资源无法得到使用。
通常而言,不同的调度器对资源管理的方式是不同的。接下来将介绍Capacity Scheduler和Fair Scheduler两个调度器的工作原理。
Capacity Scheduler实现
Capacity Scheduler是Yahoo!开发的多用户调度器。它以队列为单位划分资源,每个队列可设定一定比例的资源最低保证和使用上限,同时,每个用户也可设定一定的资源使用上限以防止资源滥用,而当一个队列的资源有剩余时,可暂时将剩余资源共享给其他队列。总之,Capacity Scheduler主要有以下几个特点。
- 容量保证:管理员可为每个队列设置资源最低保证和资源使用上限,而所有提交到该队列的作业共享这些资源。
- 灵活性:如果一个队列中的资源有剩余,可以暂时共享给那些需要资源的队列,而一旦该队列有新的作业提交,则其他队列释放资源后会归还给该队列。相比于HOD调度器,这种资源灵活分配的方式可明显提高资源利用率。
- 多重租赁:支持多用户共享集群和多作业同时运行。为防止单个作业、用户或者队列独占集群中的资源,管理员可为之增加多重约束(比如单个作业同时运行的任务数等)。
- 支持作业优先级:默认情况下,在每个队列中,空闲资源优先分配给最早提交的作业,但也可让其支持作业优先级,这样,优先级高的作业将优先获取资源(两个作业优先级相同时,再按照提交时间优先的原则分配资源)。需要注意的是,当前Capacity Scheduler还不支持资源抢占,也就是说,如果优先级高的作业提交时间晚于优先级低的作业,则高优先级作业需等待低优先级作业释放资源。
Capacity Scheduler功能介绍
Capacity Scheduler是一个多用户调度器。它设计了多层级别的资源限制条件以更好地让多用户共享一个Hadoop集群,比如队列资源限制、用户资源限制、用户作业数目限制等。为了能够更详尽地了解Capacity Scheduler的功能,我们从它的配置文件讲起。Capacity Scheduler有自己的配置文件,即存放在conf目录下的capacity-scheduler.xml。
在Capacity Scheduler的配置文件中,队列queueX的参数Y的配置名称为mapred.capacity-scheduler.queue.queueX.Y,为了简单起见,我们记为Y,则每个队列可以配置的参数如下。
- capacity:队列的资源容量(百分比)。当系统非常繁忙时,应保证每个队列的容量得到满足,而如果每个队列作业较少,可将剩余资源共享给其他队列。注意,所有队列的容量之和应小于100。
- maximum-capacity:队列的资源使用上限(百分比)。由于存在资源共享,因此一个队列使用的资源量可能超过其容量,而最多使用资源量可通过该参数限制。
- supports-priority:是否支持作业优先级。默认情况下,每个队列内部,提交时间早的作业优先获得资源,而如果支持优先级,则优先级高的作业优先获得资源,如果两个作业优先级相同,则再进一步考虑提交时间。
- user-limit-factor:每个用户最多可使用的资源量(百分比)。比如,假设该值为30,则任何时刻,每个用户使用的资源量不能超过该队列容量的30%。
- maximum-initialized-active-tasks:队列中同时被初始化的任务数目上限。通过设置该参数可防止因过多的任务被初始化而占用大量内存。
- maximum-initialized-active-tasks-per-user:每个用户可同时被初始化的任务数目上限。
一个配置文件实例如下:
<configuration>
<property>
<name>mapred.capacity-scheduler.maximum-system-jobs</name>
<value>3000</value>
<description>系统中最多可被初始化的作业数目</description>
</property>
<property>
<name>mapred.capacity-scheduler.maximum-system-jobs</name>
<value>3000</value>
<description>Hadoop集群中最多同时被初始化的作业</description>
</property>
<property>
<name>mapred.capacity-scheduler.queue.myQueue.capacity</name>
<value>30</value>
<description>default队列的可用资源(百分比)</description>
</property>
<!--配置myQueue队列.-->
<property>
<name>mapred.capacity-scheduler.queue.myQueue.maximum-capacity</name>
<value>40</value>
<description>default队列的资源使用上限(百分比)</description>
</property>
<property>
<name>mapred.capacity-scheduler.queue.myQueue.supports-priority</name>
<value>false</value>
<description>是否考虑作业优先级</description>
</property>
<property>
<name>mapred.capacity-scheduler.queue.myQueue.minimum-user-limit-percent</name>
<value>100</value>
<description>每个用户最低资源保障(百分比)</description>
</property>
<property>
<name>mapred.capacity-scheduler.queue.myQueue.user-limit-factor</name>
<value>1</value>
<description>每个用户最多可使用的资源占队列总资源的比例</description>
</property>
<property>
<name>mapred.capacity-scheduler.queue.myQueue.maximum-initialized-active-tasks</name>
<value>200000</value>
<description>default队列可同时被初始化的任务数目</description>
</property>
<property>
<name>mapred.capacity-scheduler.queue.myQueue.maximum-initialized-active-tasks-per-
user</name>
<value>100000</value>
<description>default队列中每个用户可同时被初始化的任务数目</description>
</property>
<property>
<name>mapred.capacity-scheduler.queue.myQueue.init-accept-jobs-factor</name>
<value>10</value>
<description>default队列中可同时被初始化的作业数目,即该值与(maximum-system-jobs*
queue-capacity)的乘积</description>
</property>
<!--配置myQueue队列.-->
</configuration>
从上面这些参数可以看出,Capacity Scheduler将整个系统资源分成若干个队列,且每个队列有较为严格的资源使用限制,包括每个队列的资源容量限制、每个用户的资源量限制等。通过这些限制,Capacity Scheduler将整个Hadoop集群逻辑上划分成若干个拥有相对独立资源的子集群,而由于这些子集群实际上共用大集群中的资源,因此可以共享资源,相对于HOD而言,提高了资源利用率且降低了运维成本。
Capacity Scheduler实现
对于Capacity Scheduler而言,JobTracker启动时,会自动加载调度器类org.apache.hadoop.mapred.CapacityTaskScheduler(管理员需在参数mapred.jobtracker.taskScheduler中指定),而CapacityTaskScheduler启动时会加载自己的配置文件capacity-scheduler.xml,并向JobTracker注册监听器以随时获取作业变动信息。待调度器启动完后,用户可以提交作业。如图10-3所示,一个作业从提交到开始调度所经历的步骤大致如下:
步骤1 用户通过Shell命令提交作业后,JobClient会将作业提交到JobTracker端。
步骤2 JobTracker通过监听器机制,将新提交的作业同步给Capacity Scheduler中的监听器JobQueuesManager;JobQueuesManager收到新作业后将作业添加到等待队列中,由JobInitializationPoller线程按照一定的策略对作业进行初始化。
步骤3 某一时刻,一个TaskTracker向JobTracker汇报心跳,且它心跳信息中要求JobTracker为其分配新的任务。
步骤4 JobTracker检测到TaskTracker可以接收新的任务后,调用CapacityTaskScheduler.assignTasks()函数为其分配任务。
步骤5 JobTracker将分配到的新任务返回给对应的TaskTracker。
接下来将重点介绍作业初始化和作业调度相关实现。
2.作业初始化
一个作业经初始化后才能够进一步得到调度器的调度而获取计算资源,因此,作业初始化是作业开始获取资源的前提。一个初始化的作业会占用JobTracker内存,因此需防止大量不能立刻得到调度的作业被初始化而造成内存浪费。Capacity Scheduler通过优先初始化那些最可能被调度器调度的作业和限制用户初始化作业数目来限制内存使用量。
由于作业经初始化后才能得到调度,因此,如果任务初始化的速度慢于被调度速度,则可能会产生空闲资源等待任务的现象。为了避免该问题,Capacity Scheduler总会过量初始化一些任务,从而让一部分任务处于等待资源的状态。
Capacity Scheduler中作业初始化由线程JobInitializationPoller完成。该线程由若干个(可通过参数mapred.capacity-scheduler.init-worker-threads指定,默认是5)工作线程JobInitializationThread组成,每个工作线程负责一个或者多个队列的作业初始化工作。作业初始化流程如下:
步骤1 用户将作业提交到JobTracker端后,JobTracker会向所有注册的监听器广播该作业信息;Capacity Scheduler中的监听器JobQueuesManager收到新作业添加的信息后,检查是否能够满足以下三个约束,如果不满足,则提示作业初始化失败,否则将该作业添加到对应队列的等待作业列表中:
- 该作业的任务数目不超过maximum-initialized-active-tasks-per-user。
- 队列中等待初始化和已经初始化的作业数目不超过(init-accept-jobs-factor)×(maximum-system-jos)×capacity/100。
- 该用户等待初始化和已经初始化的作业数目不超过[(maximum-system-jobs)×capacity/100.0×(minimum-user-limit-percent)100.0]×(init-accept-jobs-factor)。
步骤2 在每个队列中,按照以下策略对未初始化的作业进行排序:如果支持作业优先级(supports-priority为true),则按照FIFO策略(先按照作业优先级排序,再按照到达时间排序)排序,否则,按照作业到达时间排序。每个工作线程每隔一段时间(可通过参数mapred.capacity-scheduler.init-poll-interval设定,默认是3 000毫秒)遍历其对应的作业队列,并选出满足以下几个条件的作业:
- 队列已初始化作业数目(正运行的作业数目与已初始化但未运行作业数目之和)不超过[(maximum-system-jobs)×capaity/100.0]。
- 队列中已初始化任务数目不超过maximum-initialized-active-tasks。
- 该用户已经初始化作业数目不超过[(maximum-system-jobs)×capacity/100.0×(minimum-user-limit-percent)/100.0]。
- 该用户已经初始化的任务数目不超过maximum-initialized-active-tasks-per-user。
步骤3 调用JobTracker.initJob()函数对筛选出来的作业进行初始化。
3.任务调度
每个TaskTracker周期性向JobTracker发送心跳汇报任务进度和资源使用情况,并在出现空闲资源时请求分配新任务。当需要为某个TaskTracker分配任务时,JobTracker会调用调度器的assignTasks函数为其返回一个待运行的任务列表。对于Capacity Scheduler而言,该assignTasks函数由类CapacityTaskScheduler实现。其主要工作流程如图10-4所示,主要分为三个步骤:
步骤1 更新队列资源使用量。在选择任务之前,需要更新各个队列的资源使用信息,以便根据最新的信息进行调度。更新的信息包括队列资源容量、资源使用上限、正在运行的任务和已经使用的资源量等。
步骤2 选择Map Task。Hadoop调度器通常采用三级调度策略,即依次选择一个队列、该队列中的一个作业和该作业中的一个任务,Capacity Scheduler也是如此。下面分别介绍Capacity Scheduler采用的调度策略。
- 选择队列:Capacity Scheduler总是优先将资源分配给资源使用率最低的队列,即numSlotsOccupied/capacity最小的队列,其中numSlotsOccupied表示队列当前已经使用的slot数目,capacity为队列的资源容量。
-
选择作业:在队列内部,待调度作业排序策略与待初始化作业排序策略一样,即如果支持作业优先级(supports-priority为true),则按照优先级策略排序,否则,按照作业到达时间排序。当选择任务时,调度器会依次遍历排好序的作业,并检查当前TaskTracker剩余资源是否足以运行当前作业的一个任务(注意,一个任务可能同时需要多个slot),如果满足,则从该作业中选择一个任务添加到已分配任务列表中。任务分配过程如图10-5所示。
Capacity Scheduler调度过程用到了以下几个机制。
机制1:大内存任务调度。
Capacity Scheduler提供了对大内存任务的调度机制。默认情况下,Hadoop假设所有任务是同质的,任何一个任务只能使用一个slot,考虑到一个slot代表的内存是一定的,因此这并没有考虑那些内存密集型的任务。为解决该问题,Capacity Scheduler可根据任务的内存需求量为其分配一个或者多个slot。如果当前TaskTracker空闲slot数目小于作业的单个任务的需求量,调度器会让TaskTracker为该作业预留当前空闲的slot,直到累计预留的slot数目满足当前作业的单个任务需求,此时,才会真正地将该任务分配给TaskTracker执行。
默认情况下,大内存任务调度机制是关闭的,只有当管理员配置了mapred.cluster.map.memory.mb、mapred.cluster.reduce.memory.mb、mapred.cluster.max.map.memory.mb、mapred.cluster.max.reduce.memory.mb四个参数后,才会支持大内存任务调度,此时,调度器会按照以下公式计算每个Map Task需要的slot数(Reduce Task计算方法类似):
「{mapred.cluster.map.memory.mb}」
机制2:通过任务延迟调度以提高数据本地性。
当任务的输入数据与分配到的slot位于同一个节点或者机架时,称该任务具有数据本地性。数据本地性包含三个级别,分别是node-local(输入数据和空闲slot位于同一个节点)、rack-local(输入数据和空闲slot位于同一个机架)和off-switch(输入数据和空闲slot位于不同机架)。由于为空闲slot选择具有本地性的任务可避免通过网络远程读取数据进而提高数据读取效率,所以Hadoop会优先选择node-local的任务,然后是rack-local,最后是off-switch。
为了提高任务的数据本地性,Capacity Scheduler采用了作业延迟调度的策略:当选中一个作业后,如果在该作业中未找到满足数据本地性的任务,则调度器会让该作业跳过一定数目的调度机会,直到找到一个满足本地性(node-local或rack-local)的任务或者达到跳过次数上限(requiredSlots×localityWaitFactor),其中,localityWaitFactor可通过参数mapreduce.job.locality.wait.factor配置,默认情况下,计算方法如下:
localityWaitFactor=min{jobNodes/clusterNodes,1}
机制3:批量任务分配。
为了加快任务分配速度,Capacity Scheduler支持批量任务分配,管理员可通过参数mapred.capacity-scheduler.maximum-tasks-per-heartbeat(默认是Short.MAX_VALUE)指定一次性为一个TaskTracker分配的最多任务数。需要注意的是,该机制倾向于将任务分配给优先发送心跳的TaskTracker,也就是说,当系统slot数目大于任务需要的数目时,会使得任务集中运行在少数几个节点上,且同一个作业的任务也可能会集中分配到几个节点上,这不利于负载均衡。
步骤3:选择Reduce Task。
相比于Map Task, Reduce Task选择机制就简单多了。它仅采用了大内存任务调度策略,至于其他策略,如任务延迟调度(Reduce Task没有数据本地性)和批量任务分配等,不再采用。调度器只要找到一个合适的Reduce Task便可以返回。
多层队列调度
在Hadoop 1.0中,队列以平级结构组织在一起,且每个队列不能再进一步划分。但在实际应用中,每个队列可能代表一个部门,该部门可能又进一步划分成若干个子部门或者将自己的资源按照应用类型划分到不同队列中,最终形成一个树形组织结构。一个典型的例子如图10-6所示。
为了支持这种多层队列组织方式,在Hadoop 2.0中,Capacity Scheduler在现有实现基础上添加了对多层队列的支持,主要特性如下:
- 整个组织结构由中间队列和叶子队列组成,其中,中间队列包含若干子队列,而叶子队列没有再分解的队列。
- 任何队列可划分成若干子队列,但子队列容量之和不能超过父队列总容量。
- 用户作业只能将作业提交到某个叶子队列中。
- 当某个队列出现空闲资源时,优先共享给同父亲的其他子队列。以图10-6为例,当队列C11中有剩余资源时,首先共享给C12,其次是C2,最后才是A1,A2和B1。
- 进行任务调度时,仅考虑叶子队列,且采用的调度机制与现有的调度机制一致。
Fair Scheduler实现
Fair Scheduler是Facebook开发的多用户调度器。与Capacity Scheduler类似,它以资源池(与队列一个概念)为单位划分资源,每个资源池可设定一定比例的资源最低保证和使用上限,同时,每个用户也可设定一定的资源使用上限以防止资源滥用;当一个资源池的资源有剩余时,可暂时将剩余资源共享给其他资源池。当然,Fair Scheduler也存在很多与Capacity Scheduler不同之处,主要体现在以下几个方面。
资源公平共享:在每个资源池中,Fair Scheduler可选择按照FIFO或者Fair策略为作业分配资源,其中Fair策略是一种基于最大最小公平算法实现的资源多路复用方式,默认情况下,每个队列内部采用该方式分配资源。这意味着,如果一个队列中有两个作业同时运行,则每个作业可得到1/2的资源;如果三个作业同时运行,则每个作业可得到1/3的资源。
支持资源抢占:当某个资源池中有剩余资源时,调度器会将这些资源共享给其他资源池;而当该资源池中有新的作业提交时,调度器要为它回收资源。为了尽可能降低不必要的计算浪费,调度器采用了先等待再强制回收的策略,即如果等待一段时间后尚有未归还的资源,则会进行资源抢占:从那些超额使用资源的队列中杀死一部分任务,进而释放资源。
负载均衡:Fair Scheduler提供了一个基于任务数目的负载均衡机制。该机制尽可能将系统中的任务均匀分配到各个节点上。此外,用户也可以根据自己的需要设计负载均衡机制。
任务延时调度:Fair Scheduler提供了一种基于延时等待的调度机制以提高任务的数据本地性。该机制通过暂时减少个别作业的资源量而提高系统整体吞吐率。
降低小作业调度延迟:由于采用了最大最小公平算法,小作业可以优化获取资源并运行完成。
Fair Scheduler功能介绍
与Capacity Scheduler类似,Fair Scheduler也是一个多用户调度器,它同样添加了多层级别的资源限制条件以更好地让多用户共享一个Hadoop集群,比如队列资源限制、用户作业数目限制等。然而,由于Fair Scheduler增加了很多新的特性,因此它的配置选项更多。为了能够更详尽地了解Fair Scheduler的功能,我们从它的配置文件讲起。Fair Scheduler的配置选项包括两部分,其中一部分在mapred-site.xml中,另外一部分在自己的配置文件中,默认情况下为存放在conf目录下的fair-scheduler.xml。
1.配置文件mapred-site.xml
启用Fair Scheduler时,可在配置文件mapred-site.xml中增加以下几个配置选项(其中,mapred.jobtracker.taskScheduler是必填的,其他自选)。
❑mapred. jobtracker.taskScheduler:采用的调度器所在的类,即为org.apache.hadoop.mapred.FairScheduler。
❑mapred. fairscheduler.poolnameproperty:资源池命名方式,包含以下三种命名方式。
〇user.name:默认值,一个UNIX用户对应一个资源池。
〇group.name:一个UNIX用户组对应一个资源池。
〇mapred.job.queue.name:一个队列对应一个资源池。如果设置为该值,则与Capacity Scheduler一样。
❑mapred. fairscheduler.allocation.file:Fair Scheduler配置文件所在位置,默认是$HADOOP_HOME/conf/fair-scheduler.xml。
❑mapred. fairscheduler.preemption:是否支持资源抢占,默认为false。
❑mapred. fairscheduler.preemption.only.log:是否只打印资源抢占日志,并不真正进行资源抢占。打开该选项可用于调试。
❑mapred. fairscheduler.assignmultiple:是否在一次心跳中同时分配Map Task和ReduceTask,默认为true。
❑mapred. fairscheduler.assignmultiple.maps:一次心跳最多分配的Map Task数目,默认是-1,表示不限制。
❑mapred. fairscheduler.assignmultiple.reduces:一次心跳最多分配的Reduce Task数目,默认是-1,表示不限制。
❑mapred. fairscheduler.sizebasedweight:是否按作业大小调整作业权重。将该参数置为true后,调度器会根据作业长度(任务数目)调整作业权重,以让长作业获取更多资源,默认是false。
❑mapred. fairscheduler.locality.delay.node:为了等待一个满足node-local的slot,作业可最长等待时间。
❑mapred. fairscheduler.locality.delay.rack:为了等待一个满足rack-local的slot,可最长等待时间。
❑mapred. fairscheduler.loadmanager:可插拔负载均衡器。用户可通过继承抽象类LoadManager实现一个负载均衡器,以决定每个TaskTracker上运行的Map Task和Reduce Task数目,默认实现是CapBasedLoadManager,它将集群中所有Task按照数量平均分配到各个TaskTracker上。
❑mapred. fairscheduler.taskselector:可插拔任务选择器。用户可通过继承TaskSelector抽象类实现一个任务选择器,以决定对于给定一个TaskTracker,为其选择作业中的哪个任务。具体实现时可考虑数据本地性,推测执行等机制。默认实现是DefaultTaskSelector,它使用了JobInProgress中提供的算法,具体可参考第6章。
❑mapred. fairscheduler.weightadjuster:可插拔权重调整器。用户可通过实现WeightAdjuster接口编写一个权重调整器,以动态调整运行作业的权重。
2.配置文件fair-scheduler.xml
fair-scheduler. xml是Fair Scheduler的配置文件,管理员可为每个pool添加一些资源约束以限制资源使用。对于每个pool,用户可配置以下几个选项。
❑minMaps:最少保证的Map slot数目,即最小资源量。
❑maxMaps:最多可以使用的Map slot数目。
❑minReduces:最少保证的Reduce slot数目,即最小资源量。
❑maxReduces:最多可以使用的Reduce slot数目。
❑maxRunningJobs:最多同时运行的作业数目。通过限制该数目,可防止超量MapTask同时运行时产生的中间输出结果撑爆磁盘。
❑minSharePreemptionTimeout:最小共享量抢占时间。如果一个资源池在该时间内使用的资源量一直低于最小资源量,则开始抢占资源。
❑schedulingMode:队列采用的调度模式,可以是FIFO或者Fair。
管理员也可为单个用户添加maxRunningJobs属性限制其最多同时运行的作业数目。此外,管理员也可通过以下参数设置以上属性的默认值。
❑poolMaxJobsDefault:资源池的maxRunningJobs属性的默认值。
❑userMaxJobsDefault:用户的maxRunningJobs属性的默认值。
❑defaultMinSharePreemptionTimeout:资源池的minSharePreemptionTimeout属性的默认值。
❑defaultPoolSchedulingMode:资源池的schedulingMode属性的默认值。
❑fairSharePreemptionTimeout:公平共享量抢占时间。如果一个资源池在该时间内使用资源量一直低于公平共享量的一半,则开始抢占资源。
❑defaultPoolSchedulingMode:Pool的schedulingMode属性的默认值。
【实例】假设要为一个Hadoop集群增加三个资源池poolA、poolB和default,且规定普通用户最多可同时运行40个作业,但用户userA最多可同时运行400个作业,那么可在fair-scheduler.xml中进行如下配置:
<allocations>
<pool name="poolA">
<minMaps>100</minMaps>
<maxMaps>150</maxMaps>
<minReduces>50</minReduces>
<maxReduces>100</maxReduces>
<maxRunningJobs>200</maxRunningJobs>
<minSharePreemptionTimeout>300</minSharePreemptionTimeout>
<weight>1.0</weight>
</pool>
<pool name="poolB">
<minMaps>80</minMaps>
<maxMaps>80</maxMaps>
<minReduces>50</minReduces>
<maxReduces>50</maxReduces>
<maxRunningJobs>30</maxRunningJobs>
<minSharePreemptionTimeout>500</minSharePreemptionTimeout>
<weight>1.0</weight>
</pool>
<pool name="default">
<minMaps>0</minMaps>
<maxMaps>10</maxMaps>
<minReduces>0</minReduces>
<maxReduces>10</maxReduces>
<maxRunningJobs>50</maxRunningJobs>
<minSharePreemptionTimeout>500</minSharePreemptionTimeout>
<weight>1.0</weight>
</pool>
<user name="userA">
<maxRunningJobs>400</maxRunningJobs>
</user>
<userMaxJobsDefault>40</userMaxJobsDefault>
<fairSharePreemptionTimeout>6000</fairSharePreemptionTimeout>
</allocations>
Fair Scheduler实现
1.Fair Scheduler基本设计思想
Fair Scheduler核心设计思想是基于资源池的最小资源量和公平共享量进行任务调度。其中,最小资源量是管理员配置的,而公平共享量是根据队列或作业权重计算得到的。资源分配具体过程如下:
步骤1 根据最小资源量将所有系统中所有slot分配给各个资源池。如果某个资源池实际需要的资源量小于它的最小资源量,则只需将实际资源需求量分配给它即可。
步骤2 根据资源池的权重将剩余的资源分配给各个资源池。
步骤3 在各个资源池中,按照作业权重将资源分配给各个作业,最终每个作业可以分到的资源量即为作业的公平共享量。其中,作业权重是由作业优先级转换而来的,它们的映射关系如表10-1所示。
【实例】如图10-7所示,假设一个Hadoop集群中共有100有slot(为了简单,不区分Map或者Reduce slot)和四个资源池(依次为P1、P2、P3和P4),它们的最小资源量依次为:25、19、26和28。如图10-7 a所示,在某一时刻,四个资源池实际需要的资源量(与未运行的任务数目相关)依次为20、26、37和30,则资源分配过程如下:
步骤1 根据最小资源量将资源分配各个资源池。对于资源池P1而言,由于它实际需要的资源量少于其最小资源量,因此只需将它实际需要的资源分配给它即可,如图10-7b和图10-7c所示。经过这一轮分配,四个资源池获得的slot数目依次为:20、19、26和28。
步骤2 经过第一轮分配后,尚剩余7个slot,此时需按照权重将剩余资源分配给尚需资源的资源池P2,P3和P4。不妨假设这三个资源池的权重依次为2.0、3.0和2.0,则它们额外分得的slot数目依次为2、3和2。这样,如图10-7 d所示,三个资源池最终获得的资源总量依次为:21、29和30。
步骤3 在各个资源池内部,按照作业的权重将资源分配给各个作业。以P4为例,假设P4中有三个作业,优先级依次为VERY_HIGH, NORMAL和NORMAL,则它们可能获得的slot数目依次为:这三个值即为对应作业的公平共享量。
2.Fair Scheduler实现
Fair Scheduler内部组织结构如图10-8所示。涉及的模块有:配置文件加载模块、作业监听模块、状态更新模块和调度模块。下面分别介绍这几个模块。
- 配置文件加载模块:由类PoolManager完成,负责将配置文件fair-scheduler.xml中的信息加载到内存中。
- 作业监听模块:Fair Scheduler启动时会向JobTracker注册作业监听器JobListener,以便能够随时获取作业变化信息。
- 状态更新模块:由线程UpdateThread完成,该线程每隔mapred.fairscheduler.update.interval(默认是500毫秒)时间更新一次队列和作业的信息,以便将最新的信息提供给调度模块进行任务调度。
- 调度模块:当某个TaskTracker通过心跳请求任务时,该模块根据最新的队列和作业的信息为该TaskTracker选择一个或多个任务。
前面提到了作业公平共享量的计算方法,而调度器的任务就是将与公平共享量相等的资源分配给作业。在实际的Hadoop集群中,由于资源使用情况是动态变化的,且任务运行的时间长短不一,因此时刻保证每个作业实际分到的资源量与公平共享量一致是不可能的。为此,0.20.X版本采用了基于缺额的调度策略。该策略采用了贪心算法以保证尽可能公平地将资源分配给各个作业。
缺额(jobDeficit)是作业的公平共享量与实际分配到的资源量之间的差值。它反映了资源分配过程中产生的“理想与现实的差距”。调度器在实际资源分配时,应保证所有作业的缺额尽可能小。缺额的基本计算公式为:
jobDeficit=jobDeficit+(jobFairShare-runningTasks)×timeDelta
其中,jobFairShare为作业的公平共享量,runningTasks为作业正在运行的任务数目(对应实际分配到的资源量),timeDelta为缺额更新时间间隔。
从上面公式可以看出,作业缺额是随着时间积累的。在进行资源分配时,调度器总是优先将空闲资源分配给当前缺额最大的作业。如果在一段时间内一个作业一直没有获得资源,则它的缺额会越来越大,最终缺额变得最大,从而可以获得资源。这种基于缺额的调度机制并不能保证作业时时刻刻均能获得与其公平共享量对应的资源,但如果所有作业的运行时间足够长,则该机制能够保证每个作业实际平均分配到的资源量逼近它的公平共享量。
选择队列
Fair Scheduler选择队列时,在不同的条件下采用不同的策略,具体如下:
- 当存在资源使用量小于最小资源量的资源池时,优先选择资源使用率最低的资源池,即runningTasks/minShare最小的资源池,其中runningTasks是资源池当前正在运行的Task数目(也就是正在使用的slot数目),minShare为资源池的最小资源量。
- 否则,选择任务权重比最小的资源池,其中资源池的任务权重比(tasksToWeightRatio)定义如下:
tasksToWeightRatio=runningTasks/poolWeight
其中,runningTasks为资源池中正在运行的任务数目;poolWeight是管理员配置的资源池权重。
选择作业
选定一个资源池后,Fair Scheduler总是优先将资源分配给资源池中任务权重比最小的作业,其中作业的任务权重比的计算方法与资源池的一致,即为该作业正在运行的任务数目与作业权重的比值。但需要注意的是,作业权重比是由作业优先级转换而来的。此外,Fair Scheduler为管理员提供了另外两种改变作业的权重的方法:
- 将参数mapred.fairscheduler.sizebasedweight置为true,则计算作业权重时会考虑作业长度,具体计算方法如下:
jobWeight=jobWeightByPriority×log2(runnableTasks)
其中,jobWeightByPriority是通过优先级转化来的权重;runnableTasks是作业正在运行和尚未运行的任务之和。
- 通过实现WeightAdjuster接口,编写一个权重调整器,并通过参数mapred.fairscheduler.weightadjuster使之生效,此时,作业权重即为WeightAdjuster中方法adjustWeight的返回值。
3.Fair Scheduler优化机制
我们知道,提高Map Task的数据本地性可提高作业运行效率。为了提高数据本地性,Fair Scheduler采用了延时调度机制:当出现一个空闲slot时,如果选中的作业没有node-local或者rack-local的任务,则暂时把资源让给其他作业,直到找到一个满足数据本地性的任务或者达到一个时间阈值,此时不得不为之选择一个非本地性的任务。
为了实现延时调度,Fair Scheduler为每个作业j维护三个变量:level、wait和skipped,分别表示最近一次调度时作业的本地性级别(0、1、2分别对应node-local、rack-local和off-switch)、已等待时间和是否延时调度,并依次初始化为:j.level=0、j.wait=0和j.skipped=false。此外,当不存在node-local任务时,为了尽可能选择一个本地性较好的任务,Fair Scheduler采用了双层延迟调度算法:为了找到一个node-local任务最长可等待W1或者进一步等待W2找一个rack-local任务。
机制2:负载均衡。
Fair Scheduler为用户提供了一个可扩展的负载均衡器:CapBasedLoadManager。它会将系统中所有任务按照数量平均分配到各个节点上。当然,用户也可通过继承抽象类LoadManager实现自己的负载均衡器。
机制3:资源抢占。
当一个资源池有剩余资源时,Fair Scheduler会将这些资源暂时共享给其他资源池;而一旦该资源池有新作业提交,调度器则为它回收资源。如果在一段时间后该资源池仍得不到本属于自己的资源,则调度器会通过杀死任务的方式抢占资源。Fair Scheduler同时采用了两种资源抢占方式:最小资源量抢占和公平共享量抢占。如果一个资源池的最小资源量在一定时间内得不到满足,则会从其他超额使用资源的资源池中抢占资源,这就是最小资源量抢占;而如果一定时间内一个资源池的公平共享量的一半得不到满足,则该资源池也会从其他资源池中抢占,这称为公平共享量抢占。
进行资源抢占时,调度器会选出超额使用资源的资源池,并从中找出启动时间最早的任务,再将其杀掉,进而释放资源。
Fair Scheduler与Capacity Scheduler对比
小结
本章介绍了几种常见的多用户作业调度器。相比于FIFO调度器,多用户调度器能够更好地满足不同应用程序的服务质量要求。
前主要有两种多用户作业调度器的设计思路:第一种是在一个物理集群上虚拟多个Hadoop集群,这些集群各自拥有全套独立的Hadoop服务,比如JobTracker、TaskTracker等,典型的代表是HOD(Hadoop On Demand)调度器;另一种是扩展Hadoop调度器,使之支持多个队列多用户,典型的代表是Yahoo!的Capacity Scheduler和Facebook的Fair Scheduler。
HOD调度器是一个在共享物理集群上管理若干个Hadoop集群的工具,它可以帮助用户在一个共享物理集群上快速搭建若干个独立的虚拟Hadoop集群。由于该调度器会产生多个独立的小集群,因此会增加集群运维成本和降低资源利用率。
为了克服HOD的缺点,Capacity Scheduler和Fair Scheduler出现了。它们通过扩展调度器功能,在不拆分集群的前提下,将集群中的资源和用户分成若干个队列,并为每个队列分配一定量的资源,同时添加各种限制以防止用户或者队列独占资源。由于这种方式能够保证只有一个Hadoop集群,因此可大大降低运维成本,同时很容易实现资源共享,进而可明显提高资源利用率。