A Comparison of GPU Scheduling Support in Hadoop v3.1 YARN and Kubernetes v1.9
1.0. namespaces
In short, namespaces are a set of system-resource isolation technologies (users, file systems, network, and so on) that the Linux kernel has provided since around 2008.
For example, user namespaces map container users onto host users. Intuitively, root inside a container is not actually root on the host: it has no permission to modify or delete any root-owned system files on the host.
Docker supports user namespaces, but they only take effect after the following configuration:
- Edit /etc/default/docker and add DOCKER_OPTS="--userns-remap=default"
- Restart the docker service
- Docker will then create a dockremap user on the host
Hadoop added DockerContainerExecutor (DCE) in v2.7.0, but because that implementation did not support user namespaces, it was finally removed in v3.0.0. Hadoop's current approach is LinuxContainerExecutor + DockerContainerRuntime. The official documentation explains:
“Administrators should be aware that DCE doesn’t currently provide user name-space isolation. This means, in particular, that software running as root in the YARN container will have root privileges in the underlying NodeManager. ”
References
Docker 使用 Linux namespace 隔离容器的运行环境 (How Docker uses Linux namespaces to isolate a container's runtime environment)
Isolate containers with a user namespace
Docker背后的内核知识——Namespace资源隔离 (Kernel knowledge behind Docker: namespace resource isolation)
1.1. cgroups
In short, cgroups are a mechanism the Linux kernel has provided since 2006 for limiting, managing, and isolating the physical system resources (CPU, memory, GPU, network, etc.) that processes need. The default mount points on CentOS 7 are shown below.
Each directory is called a subsystem and independently controls one type of resource.
cgroups form a tree-shaped hierarchy. For example, once Hadoop YARN is configured with cgroups, it creates a hierarchy under the relevant subsystems, and each container_ID can be regarded as a task representing a group of container processes.
/sys/fs/cgroup/devices/yarn/container_1521423483464_0004_01_000001/devices.deny
cgroups control CPU through the cpu,cpuacct subsystem. As a simple example, to limit the CPU usage of a group of processes:
- Create a new child cgroup under the cpu,cpuacct subsystem with cgcreate
- In the new child cgroup, use cgset to set cpu.cfs_quota_us=50000
Since the default cpu.cfs_period_us in the cpu,cpuacct subsystem is 100000, processes started in this child cgroup can occupy at most 50% of one CPU's time. In practice, of course, the number of CPU cores must also be taken into account.
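The arithmetic behind this limit can be sketched as follows (an illustrative Python snippet, not YARN code; the values mirror the example above):

```python
def cpu_share(cfs_quota_us, cfs_period_us=100000):
    """Fraction of one CPU's time a cgroup may consume per scheduling period."""
    if cfs_quota_us < 0:  # -1 means "no limit" in the kernel
        return float("inf")
    return cfs_quota_us / cfs_period_us

print(cpu_share(50000))  # 0.5: the 50% cap from the example above
```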
cgroups control access to GPUs through the devices subsystem.
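To illustrate what a devices-subsystem rule looks like, the hypothetical helper below formats a devices.deny entry. It assumes that NVIDIA character devices (/dev/nvidia0..N) use major number 195, the usual nvidia.ko major, with the minor number selecting the card:

```python
NVIDIA_MAJOR = 195  # assumption: the usual major number for /dev/nvidiaN

def deny_rule(minor):
    """Devices-subsystem rule hiding one GPU: char device, major:minor, rwm access."""
    return f"c {NVIDIA_MAJOR}:{minor} rwm"

print(deny_rule(0))  # c 195:0 rwm -> written to devices.deny to hide /dev/nvidia0
```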
References
Docker背后的内核知识——cgroups资源限制 (Kernel knowledge behind Docker: cgroups resource limits)
Behind Docker - Quick look into cgroups
2. Hadoop v3.1 GPU
A simple Hadoop cluster is sketched below.
2.1. ContainerExecutor initialization
YARN must be configured with LinuxContainerExecutor as the containerExecutor type.
During initialization, this class builds the ResourceHandlerChain, the cgroups handlers for all resource types YARN currently supports; it also creates the dockerContainerRuntime according to runtimeType.
When a container starts, the ResourceHandlerChain produced at initialization is invoked to apply the cgroups settings for each of the container's resources (naturally, if the container type is docker, no extra setup is needed). This includes the devices cgroup settings for GPUs.
References
Source code
2.2. GPU discovery and allocation
Hadoop 3.1 adds a new resource plugin module with support for GPUs and FPGAs.
Each YARN nodeManager discovers its GPUs with nvidia-smi -q -x and reports them to the resourceManager.
When a nodeManager starts, the log shows the following.
2018-03-16 11:17:21,867 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDiscoverer: Trying to discover GPU information ...
2018-03-16 11:17:22,173 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDiscoverer: === Gpus in the system ===
Driver Version:381.04
ProductName=Tesla K40m, MinorNumber=0, TotalMemory=11439MiB, Utilization=0.0%
ProductName=Tesla K40m, MinorNumber=1, TotalMemory=11439MiB, Utilization=0.0%
ProductName=Tesla K40m, MinorNumber=2, TotalMemory=11439MiB, Utilization=0.0%
ProductName=Tesla K40m, MinorNumber=3, TotalMemory=11439MiB, Utilization=0.0%
2018-03-16 15:55:01,787 INFO org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM: Rolling master-key for container-tokens, got key with id 1262359944
2018-03-16 15:55:01,788 INFO org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Registered with ResourceManager as hpc-6-044:57667 with total resource of <memory:8192, vCores:8, yarn.io/gpu: 4>
The YARN 3.1 UI also displays the GPU information, as shown below.
In fact, nvidia-docker itself exposes a REST API for GPU discovery as well.
[root@hpc-6-044 hadoop]# curl -s http://localhost:3476/v1.0/gpu/info/json
{"Version":{"Driver":"381.04","CUDA":"8.0"},"Devices":[{"UUID":"GPU-04a37fdb-8792-0fa7-c681-0ff8fbf77d6d","Path":"/dev/nvidia0","Model":"Tesla K40m","Power":235,"CPUAffinity":0,"PCI":{"BusID":"0000:03:00.0","BAR1":16384,"Bandwidth":15760},"Clocks":{"Cores":875,"Memory":3004},"Topology":[{"BusID":"0000:04:00.0","Link":3}],"Family":"Kepler","Arch":"3.5","Cores":2880,"Memory":{"ECC":true,"Global":11439,"Shared":48,"Constant":64,"L2Cache":1536,"Bandwidth":288384}},{"UUID":"GPU-282e08ed-df8f-d6ad-40c1-7f0afa5931a1","Path":"/dev/nvidia1","Model":"Tesla K40m","Power":235,"CPUAffinity":0,"PCI":{"BusID":"0000:04:00.0","BAR1":16384,"Bandwidth":15760},"Clocks":{"Cores":875,"Memory":3004},"Topology":[{"BusID":"0000:03:00.0","Link":3}],"Family":"Kepler","Arch":"3.5","Cores":2880,"Memory":{"ECC":true,"Global":11439,"Shared":48,"Constant":64,"L2Cache":1536,"Bandwidth":288384}},{"UUID":"GPU-4eb77e26-f97b-5827-9eb8-5b00f5e51cb9","Path":"/dev/nvidia2","Model":"Tesla K40m","Power":235,"CPUAffinity":1,"PCI":{"BusID":"0000:82:00.0","BAR1":16384,"Bandwidth":15760},"Clocks":{"Cores":875,"Memory":3004},"Topology":[{"BusID":"0000:83:00.0","Link":3}],"Family":"Kepler","Arch":"3.5","Cores":2880,"Memory":{"ECC":true,"Global":11439,"Shared":48,"Constant":64,"L2Cache":1536,"Bandwidth":288384}},{"UUID":"GPU-d9aadc5d-68b3-1cd7-e0de-f45367117071","Path":"/dev/nvidia3","Model":"Tesla K40m","Power":235,"CPUAffinity":1,"PCI":{"BusID":"0000:83:00.0","BAR1":16384,"Bandwidth":15760},"Clocks":{"Cores":875,"Memory":3004},"Topology":[{"BusID":"0000:82:00.0","Link":3}],"Family":"Kepler","Arch":"3.5","Cores":2880,"Memory":{"ECC":true,"Global":11439,"Shared":48,"Constant":64,"L2Cache":1536,"Bandwidth":288384}}]}
The NodeManager discovers the GPUs on its node through the singleton class GpuDiscoverer. Its initialize() method calls getGpuDeviceInformation(), which returns a GpuDeviceInformation object, a POJO mapped from the nvidia_smi_log XML; getGpusUsableByYarn() then parses it into the predefined GpuDevice objects.
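Conceptually, the discovery step boils down to parsing the nvidia_smi_log XML. The sketch below is an illustrative Python version, not GpuDiscoverer itself; the element names (gpu, minor_number, product_name) are assumptions based on nvidia-smi's XML layout:

```python
import xml.etree.ElementTree as ET

# Trimmed stand-in for real `nvidia-smi -q -x` output (element names assumed).
SAMPLE = """<nvidia_smi_log>
  <gpu><product_name>Tesla K40m</product_name><minor_number>0</minor_number></gpu>
  <gpu><product_name>Tesla K40m</product_name><minor_number>1</minor_number></gpu>
</nvidia_smi_log>"""

def discover_gpus(xml_text):
    """Return (minor_number, product_name) for each GPU element in the log."""
    root = ET.fromstring(xml_text)
    return [(g.findtext("minor_number"), g.findtext("product_name"))
            for g in root.findall("gpu")]

print(discover_gpus(SAMPLE))  # [('0', 'Tesla K40m'), ('1', 'Tesla K40m')]
```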
When allocating GPUs, the methods of GpuResourceAllocator are all declared synchronized, and at this point an AssignedGpuDevice (a subclass of GpuDevice) carrying the container ID is used. The GPU allocation flow is shown below.
References
Source code
2.3. Configuration
Hadoop 3.1 has not been released yet, so the source must be downloaded and built locally with maven. A few points to note:
- gcc must be upgraded to 5.x
- The protobuf version must be 2.5.0
- Because Hadoop requires that container-executor and container-executor.cfg, along with their parent directories, are all owned by root:hadoop, the directory of container-executor.cfg can be pinned at build time. This avoids the inconvenience of changing the owner of the $HADOOP_HOME/etc/hadoop/ directory directly.
# keep container-executor.cfg in /etc/hadoop, isolated from the hadoop directory
mvn package -Pdist,native -DskipTests -Dtar -Dcontainer-executor.conf.dir=/etc/hadoop
The Hadoop 3.1 Docker+GPU setup mainly involves configuring nvidia-docker, the GPU resource, and cgroups. In actual testing we hit cgroups-related errors and have not yet gotten GPU scheduling to work.
References
Using GPU On YARN
Launching Applications Using Docker Containers
Hadoop 3.1 YARN failed with error 'exitCode=255: CGroups: Could not find file to write' when launching a Docker container
3. Kubernetes v1.9 GPU
A simple Kubernetes cluster is sketched below.
Compared with Hadoop YARN, the K8S GPU implementation is strikingly compact: the GPU type definition, discovery, and allocation logic all live in kubernetes/pkg/kubelet/gpu/nvidia/nvidia_gpu_manager.go.
The GPU bookkeeping is a two-level map: the outer key is the podID, the inner key is the containerName, and the value is the set of the GPUs' /dev/nvidia paths inside the Docker container.
type containerToGPU map[string]sets.String

// podGPUs represents a list of pod to GPU mappings.
type podGPUs struct {
    podGPUMapping map[string]containerToGPU
}

func newPodGPUs() *podGPUs {
    return &podGPUs{
        podGPUMapping: make(map[string]containerToGPU),
    }
}

podGPUMapping[podUID][contName].Insert(device)
This design exists because Kubernetes allows a single pod to run multiple Docker containers. The official documentation explains:
"The primary reason that Pods can have multiple containers is to support helper applications that assist a primary application. Typical examples of helper applications are data pullers, data pushers, and proxies. Helper and primary applications often need to communicate with each other. Typically this is done through a shared filesystem."
Like GpuResourceAllocator in Hadoop YARN, nvidiaGpuManager in Kubernetes allocates GPUs, and it guards all write operations with a sync.Mutex.
// nvidiaGPUManager manages nvidia gpu devices.
type nvidiaGPUManager struct {
    sync.Mutex
    // all GPUs on the current node
    allGPUs sets.String
    // two-level map: outer key is podID, inner key is containerName, value is the GPU device path (/dev/nvidia*) inside the Docker container
    allocated *podGPUs
    defaultDevices []string
    dockerClient dockertools.DockerInterface
    activePodsLister activePodsLister
}
The GPU allocation flow is shown below.
Although Hadoop YARN and Kubernetes store GPU state in data structures shaped by their own platform architectures, their GPU allocation flows are similar, and both attend to thread safety when updating GPU state.
References
Source code
4. Hadoop v3.1 YARN scheduling
YARN ships three schedulers: FIFO, Capacity, and Fair. FIFO suits a single user, while Capacity and Fair target multi-user clusters. We focus on the Capacity scheduler below.
The basic idea of Capacity scheduling is to divide cluster resources effectively through a hierarchy of queues.
The corresponding etc/hadoop/capacity-scheduler.xml configuration is as follows.
<?xml version="1.0"?>
<configuration>
  <property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>prod,dev</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.queues</name>
    <value>eng,science</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.prod.capacity</name>
    <value>40</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.capacity</name>
    <value>60</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
    <value>75</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.eng.capacity</name>
    <value>50</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.science.capacity</name>
    <value>50</value>
  </property>
</configuration>
In the hierarchy above, for example, the dev queue is allotted 60% of the cluster's resources, and its two child queues, eng and science, each receive half of dev's share. When the dev queue runs short of resources, the Capacity scheduler can borrow spare capacity from the prod queue (so-called queue elasticity), while the maximum-capacity setting (75%) caps how far dev may exceed its 60% share, preventing prod's resources from being taken over entirely. When submitting an application through YARN, the user must name a leaf queue, so leaf queue names must be unique.
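The hierarchy arithmetic can be checked with a small sketch (illustrative Python, not the Capacity scheduler): a child queue's absolute capacity is its configured percentage applied to its parent's absolute capacity.

```python
# Queue capacities as configured above, keyed by full queue path.
capacity = {
    "root": 100, "root.prod": 40, "root.dev": 60,
    "root.dev.eng": 50, "root.dev.science": 50,
}

def absolute_capacity(queue):
    """Percent of the whole cluster guaranteed to this queue."""
    parts = queue.split(".")
    share = 1.0
    for i in range(1, len(parts) + 1):
        share *= capacity[".".join(parts[:i])] / 100
    return round(share * 100, 6)

print(absolute_capacity("root.dev.eng"))  # 30.0: half of dev's 60% share
```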
The entry point of the Capacity scheduler is the method below. It first splits the candidate nodes into two groups at a random start point, then iterates over the nodes in each group.
static void schedule(CapacityScheduler cs) throws InterruptedException {
  // First randomize the start point
  int current = 0;
  Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
  int start = random.nextInt(nodes.size());
  // Allocate containers of node [start, end)
  for (FiCaSchedulerNode node : nodes) {
    if (current++ >= start) {
      if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
        continue;
      }
      cs.allocateContainersToNode(node.getNodeID(), false);
    }
  }
  current = 0;
  // Allocate containers of node [0, start)
  for (FiCaSchedulerNode node : nodes) {
    if (current++ > start) {
      break;
    }
    if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
      continue;
    }
    cs.allocateContainersToNode(node.getNodeID(), false);
  }
}
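The randomized start point can be sketched in a few lines (illustrative Python, not YARN code): iterating [start, end) and then [0, start) visits every node exactly once while spreading allocation attempts across the cluster over time.

```python
import random

def schedule_order(nodes, rng=random):
    """Rotate the node list at a random start point, visiting each node once."""
    start = rng.randrange(len(nodes))
    return nodes[start:] + nodes[:start]

nodes = ["n0", "n1", "n2", "n3", "n4"]
order = schedule_order(nodes)
assert sorted(order) == sorted(nodes)  # every node is considered exactly once
```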
allocateContainersToNode in turn calls allocateContainerOnSingleNode, shown below.
private CSAssignment allocateContainerOnSingleNode(
    CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node,
    boolean withNodeHeartbeat) {
  ... ...
  LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
  // YARN only allows applications to be submitted to leaf queues; the leaf queue handles the container request from here
  assignment = queue.assignContainers(getClusterResource(), candidates,
      new ResourceLimits(labelManager.getResourceByLabel(
          RMNodeLabelsManager.NO_LABEL, getClusterResource())),
      SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
  ... ...
}
The leaf queue named at submission time then runs its assignContainers method, shown below. The two key steps are: delegating to the application's own assignContainers method, and calling the resourceCalculator to decide whether the container's resource request can be satisfied. To schedule GPU resources, the resourceCalculator must be configured as DominantResourceCalculator; otherwise the default DefaultResourceCalculator compares only the single memory resource.
public CSAssignment assignContainers(Resource clusterResource,
    CandidateNodeSet<FiCaSchedulerNode> candidates,
    ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
  // check the queue's max-capacity limit, per-user limits, etc.
  ... ...
  // delegate to the application's assignContainers; the application class here is FiCaSchedulerApp
  assignment = application.assignContainers(clusterResource,
      candidates, currentResourceLimits, schedulingMode, null);
  Resource assigned = assignment.getResource();
  // use the configured resourceCalculator to check whether the assigned resources can be satisfied;
  // with DominantResourceCalculator every resource type is compared, otherwise only memory is
  if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
      Resources.none())) {
    return assignment;
  }
  ... ...
}
The compare method of DominantResourceCalculator compares every supported resource type, making sure none is left out.
/**
* Compare two resources - if the value for every resource type for the lhs
* is greater than that of the rhs, return 1. If the value for every resource
* type in the lhs is less than the rhs, return -1. Otherwise, return 0
*/
private int compare(Resource lhs, Resource rhs) {
boolean lhsGreater = false;
boolean rhsGreater = false;
int ret = 0;
int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 0; i < maxLength; i++) {
ResourceInformation lhsResourceInformation = lhs
.getResourceInformation(i);
ResourceInformation rhsResourceInformation = rhs
.getResourceInformation(i);
int diff = lhsResourceInformation.compareTo(rhsResourceInformation);
if (diff >= 1) {
lhsGreater = true;
} else if (diff <= -1) {
rhsGreater = true;
}
}
if (lhsGreater && rhsGreater) {
ret = 0;
} else if (lhsGreater) {
ret = 1;
} else if (rhsGreater) {
ret = -1;
}
return ret;
}
The currently supported resource types are as follows.
public class ResourceInformation implements Comparable<ResourceInformation> {
  // Known resource types
  public static final String MEMORY_URI = "memory-mb";
  public static final String VCORES_URI = "vcores";
  public static final String GPU_URI = "yarn.io/gpu";
  public static final String FPGA_URI = "yarn.io/fpga";
  ... ...
}
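The comparison rule can be restated compactly (illustrative Python, not Hadoop code; it assumes both sides carry the same resource types):

```python
def compare(lhs, rhs):
    """1 if lhs >= rhs on every type (and > on some), -1 for the mirror case, else 0."""
    lhs_greater = rhs_greater = False
    for rtype in lhs:  # assumption: both sides list the same resource types
        if lhs[rtype] > rhs[rtype]:
            lhs_greater = True
        elif lhs[rtype] < rhs[rtype]:
            rhs_greater = True
    if lhs_greater and rhs_greater:  # each side wins on some type: incomparable
        return 0
    return 1 if lhs_greater else (-1 if rhs_greater else 0)

a = {"memory-mb": 4096, "vcores": 4, "yarn.io/gpu": 1}
b = {"memory-mb": 2048, "vcores": 2, "yarn.io/gpu": 0}
print(compare(a, b), compare(b, a))  # 1 -1
```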
References
YARN – THE CAPACITY SCHEDULER
Hadoop: Capacity Scheduler
Source code
5. Kubernetes v1.9 scheduling
The Scheduler's entry method is shown below. Scheduling runs in two phases, Predicate and Prioritizing, implemented by findNodesThatFit() and PrioritizeNodes() respectively.
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
... ...
//Predicate
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue, g.alwaysCheckAllPredicates)
if err != nil {
return "", err
}
if len(filteredNodes) == 0 {
return "", &FitError{
Pod: pod,
NumAllNodes: len(nodes),
FailedPredicates: failedPredicateMap,
}
}
// if the Predicate phase leaves only one node, skip Prioritizing
if len(filteredNodes) == 1 {
metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
return filteredNodes[0].Name, nil
}
//Prioritizing
priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
if err != nil {
return "", err
}
return g.selectHost(priorityList)
}
5.1. Predicate
The first phase, Predicate, filters out candidate nodes that fail any predicate policy. Every predicate policy maps to a concrete function, and the order of the policies matters: broadly, the earlier a predicate appears, the more important it is. So if the first predicate fails and alwaysCheckAllPredicates == false, the remaining predicates are skipped and the node is declared unfit for scheduling right away. The predicate policies can also be configured manually. The currently supported predicates are:
var (
    predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
        GeneralPred, HostNamePred, PodFitsHostPortsPred,
        MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
        PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
        CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred,
        MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
        CheckNodeMemoryPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
)
findNodesThatFit calls workqueue.Parallelize to start multiple goroutines (at most 16 at a time), checking all nodes in the candidate list in parallel. For each node, podFitsOnNode then walks the predicate functions in order.
func findNodesThatFit(
pod *v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
nodes []*v1.Node,
predicateFuncs map[string]algorithm.FitPredicate,
extenders []algorithm.SchedulerExtender,
metadataProducer algorithm.PredicateMetadataProducer,
ecache *EquivalenceCache,
schedulingQueue SchedulingQueue,
alwaysCheckAllPredicates bool,
) ([]*v1.Node, FailedPredicateMap, error) {
var filtered []*v1.Node
failedPredicateMap := FailedPredicateMap{}
... ...
checkNode := func(i int) {
nodeName := nodes[i].Name
fits, failedPredicates, err := podFitsOnNode(
pod,
meta,
nodeNameToInfo[nodeName],
predicateFuncs,
ecache,
schedulingQueue,
alwaysCheckAllPredicates,
equivCacheInfo,
)
if err != nil {
predicateResultLock.Lock()
errs[err.Error()]++
predicateResultLock.Unlock()
return
}
if fits {
filtered[atomic.AddInt32(&filteredLen, 1)-1] = nodes[i]
} else {
predicateResultLock.Lock()
failedPredicateMap[nodeName] = failedPredicates
predicateResultLock.Unlock()
}
}
workqueue.Parallelize(16, len(nodes), checkNode)
... ...
return filtered, failedPredicateMap, nil
}
func podFitsOnNode(
pod *v1.Pod,
meta algorithm.PredicateMetadata,
info *schedulercache.NodeInfo,
predicateFuncs map[string]algorithm.FitPredicate,
ecache *EquivalenceCache,
queue SchedulingQueue,
alwaysCheckAllPredicates bool,
equivCacheInfo *equivalenceClassInfo,
) (bool, []algorithm.PredicateFailureReason, error) {
var (
eCacheAvailable bool
failedPredicates []algorithm.PredicateFailureReason
)
predicateResults := make(map[string]HostPredicate)
... ...
// walk the predicate policies in order
for _, predicateKey := range predicates.Ordering() {
var (
fit bool
reasons []algorithm.PredicateFailureReason
err error
)
if predicate, exist := predicateFuncs[predicateKey]; exist {
func() {
var invalid bool
if eCacheAvailable {
// look up this predicate's result in the equivalence cache first
}
if !eCacheAvailable || invalid {
// cache unavailable or invalidated: run the predicate function itself
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
if err != nil {
return
}
// update the equivalence cache
}
}()
if !fit {
if !alwaysCheckAllPredicates {
// this predicate failed and alwaysCheckAllPredicates == false, so stop here
break
}
... ...
return len(failedPredicates) == 0, failedPredicates, nil
}
GPU scheduling belongs to PodFitsResources, which sits fairly early in the predicate ordering. Its check method treats GPUs together with CPU, memory, and the other resources.
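A minimal sketch of such a resource-fit check (illustrative Python, not kubelet code; the resource names, including the GPU key, are placeholders rather than the exact v1.9 names):

```python
def pod_fits_resources(requested, allocatable, used):
    """True iff every requested resource, GPU included, still fits on the node."""
    return all(used.get(r, 0) + amount <= allocatable.get(r, 0)
               for r, amount in requested.items())

node_allocatable = {"cpu": 8, "memory": 32 << 30, "gpu": 4}
node_used = {"cpu": 6, "memory": 8 << 30, "gpu": 4}
gpu_pod = {"cpu": 1, "gpu": 1}
cpu_pod = {"cpu": 1}
print(pod_fits_resources(gpu_pod, node_allocatable, node_used))  # False: GPUs exhausted
print(pod_fits_resources(cpu_pod, node_allocatable, node_used))  # True
```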
5.2. Prioritizing
If the Predicate phase leaves more than one node, the Scheduler continues with priority ranking and finally picks the node with the highest priority.
PrioritizeNodes likewise calls workqueue.Parallelize to start multiple goroutines (at most 16 at a time), scoring all predicate-filtered nodes in parallel.
// HostPriority represents the priority of scheduling to a particular host, higher priority is better.
type HostPriority struct {
// Name of the host
Host string
// Score associated with the host
Score int
}
// HostPriorityList declares a []HostPriority type.
type HostPriorityList []HostPriority
func PrioritizeNodes(
pod *v1.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
meta interface{},
priorityConfigs []algorithm.PriorityConfig,
nodes []*v1.Node,
extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
... ...
// results is a two-dimensional slice
results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
... ...
processNode := func(index int) {
nodeInfo := nodeNameToInfo[nodes[index].Name]
var err error
for i := range priorityConfigs {
if priorityConfigs[i].Function != nil {
continue
}
results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
if err != nil {
appendError(err)
return
}
}
}
workqueue.Parallelize(16, len(nodes), processNode)
... ...
// result is a one-dimensional slice, a weighted sum over each column of results; each element holds a nodeName and its priority score
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
for i := range nodes {
result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
for j := range priorityConfigs {
result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
}
}
... ...
return result, nil
}
Like the predicate policies, there are multiple prioritizing policies. The difference is that the Prioritizing phase scores every node under every priority policy, storing the scores in the two-dimensional slice results. The scores are then combined per node as a weighted sum, so that each node ends up with a single priority score in the one-dimensional slice result.
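The weighted reduction can be sketched as follows (illustrative Python, not scheduler code; results[j][i] is priority policy j's score for node i):

```python
def reduce_scores(nodes, results, weights):
    """Weighted per-node sum over the 2-D score slice, one final score per node."""
    return [(node, sum(results[j][i] * weights[j] for j in range(len(weights))))
            for i, node in enumerate(nodes)]

nodes = ["n0", "n1"]
results = [[10, 5],   # policy 0's score per node
           [2, 8]]    # policy 1's score per node
weights = [1, 2]
print(reduce_scores(nodes, results, weights))  # [('n0', 14), ('n1', 21)]
```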
Finally, when several nodes tie for the highest score, the Scheduler avoids lopsided placement with a simple round-robin algorithm:
// selectHost takes a prioritized list of nodes and then picks one
// in a round-robin manner from the nodes that had the highest score.
func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
    if len(priorityList) == 0 {
        return "", fmt.Errorf("empty priorityList")
    }
    sort.Sort(sort.Reverse(priorityList))
    maxScore := priorityList[0].Score
    firstAfterMaxScore := sort.Search(len(priorityList), func(i int) bool { return priorityList[i].Score < maxScore })
    g.lastNodeIndexLock.Lock()
    ix := int(g.lastNodeIndex % uint64(firstAfterMaxScore))
    g.lastNodeIndex++
    g.lastNodeIndexLock.Unlock()
    return priorityList[ix].Host, nil
}
For example, suppose the queue holds two pending containers, container1 and container2, and both end up with the same result after Predicate and Prioritizing:
[['Node1',10],['Node2',10],['Node3',10],['Node4',2],['Node5',1],['Node6',1],['Node7',0]]
With the algorithm above, scheduling proceeds as follows.
For container1: lastNodeIndex=7 and firstAfterMaxScore=3, so index 7%3=1 selects ['Node2',10]; lastNodeIndex increments to 8.
For container2: lastNodeIndex=8 and firstAfterMaxScore=3, so index 8%3=2 selects ['Node3',10]; lastNodeIndex increments to 9, which means the next pick would be index 9%3=0, i.e. ['Node1',10].
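This tie-breaking can be replayed with a short sketch (illustrative Python, not the Go scheduler; it assumes the priority list is already sorted by score, descending):

```python
class Selector:
    def __init__(self):
        self.last_node_index = 0  # corresponds to g.lastNodeIndex

    def select_host(self, priority_list):
        # priority_list: (host, score) pairs sorted by score, descending
        max_score = priority_list[0][1]
        # count of entries tied at the maximum score (= firstAfterMaxScore)
        first_after_max = sum(1 for _, s in priority_list if s == max_score)
        ix = self.last_node_index % first_after_max
        self.last_node_index += 1
        return priority_list[ix][0]

plist = [("Node1", 10), ("Node2", 10), ("Node3", 10), ("Node4", 2),
         ("Node5", 1), ("Node6", 1), ("Node7", 0)]
s = Selector()
s.last_node_index = 7
print(s.select_host(plist), s.select_host(plist))  # Node2 Node3
```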
Kubernetes and Hadoop YARN each have their strengths in scheduling; a brief summary of their similarities and differences:
- Hadoop YARN distributes and manages cluster resources across users through hierarchical queues; Kubernetes achieves a similar effect with namespaces, although namespaces have no notion of hierarchy.
- As the source shows, Hadoop YARN iterates over all candidate nodes during scheduling; Kubernetes checks candidate nodes in parallel with goroutines, though each node still walks the predicates until all pass or one fails.
- Both introduce randomness to keep node placement balanced: Hadoop YARN randomly splits the candidate node list, while Kubernetes round-robins among the top-scoring nodes.
- Hadoop YARN's scheduling policy is configurable too, but Kubernetes' overall design is more flexible and configurable. In fact, in earlier versions the Kubernetes scheduler lived under the plugin directory, which says something about that flexibility.
- Purely in terms of code readability, Kubernetes does far better: the code is elegant and the comments are thorough.
References
Kubernetes调度详解 (Kubernetes scheduling explained in detail)
干货 | kube-scheduler原理解析 (kube-scheduler internals explained)
Priority in Kubernetes API
Source code
6. Afterword
6.1. Docker containerization
TensorflowOnSpark has attracted 2200+ stars on Github, yet the cost of not supporting Docker is an unwieldy environment setup: on every executor node the runtime, from the python version down to the CUDA driver, must match. spark-submit options (such as --py-files or --jars) can distribute the runtime automatically, but they can hardly resolve every environment dependency. Judging from the replies of the author leewyang, Docker containerization is clearly the way forward.
6.2. Hadoop YARN service API
Perhaps inspired by K8S's success, YARN is finally adding a REST interface through which, much as with K8S YAML/JSON submissions, one can directly specify a Docker image and the required CPU/Memory/GPU resources. Note that under the default Simple Auth the user is dr.who, and the job is in fact rejected. (This can presumably be fixed through configuration.)
The example below replaces the original YARN submission command with an HTTP POST request.
yarn jar /home/junzhang22/hadoop-3.1.0-SNAPSHOT/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.1.0-SNAPSHOT.jar \
-jar /home/junzhang22/hadoop-3.1.0-SNAPSHOT/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.1.0-SNAPSHOT.jar \
-shell_env YARN_CONTAINER_RUNTIME_TYPE=docker \
-shell_env YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=tensorflow-gpu-boto-test \
-shell_script ./run.sh \
-container_resources memory-mb=3072,vcores=1,yarn.io/gpu=1 \
-num_containers 1
POST /app/v1/services/ HTTP/1.1
Host: 10.1.86.15:8088
Content-Type: application/json
Request:
{
  "name": "hello-world4",
  "version": "1.0.0",
  "description": "hello world example",
  "components": [
    {
      "name": "hello",
      "number_of_containers": 1,
      "artifact": {
        "id": "tensorflow-gpu-boto-test",
        "type": "DOCKER"
      },
      "launch_command": "./run.sh",
      "resource": {
        "cpus": 1,
        "memory": "1024",
        "additional": {
          "yarn.io/gpu": {
            "value": 1,
            "unit": ""
          }
        }
      }
    }
  ]
}
Response:
{
  "uri": "/v1/services/hello-world4",
  "diagnostics": "Application ID: application_1521616970183_0001",
  "state": "ACCEPTED"
}