2018-03-25

Hadoop v3.1 YARN和Kubernetes v1.9对GPU调度支持的比较


1.0. namespaces

简单来说,namespaces是linux内核于2008年提供的一套系统资源(用户、文件系统、网络等)隔离技术。
例如,user namespaces可以实现container user于host user的映射。直观来看,container里的root其实并不是host里的root,它没有权限修改删除任何host里的系统root权限文件。
Docker提供user namespaces功能,但是需要做以下配置才生效。

  1. 修改/etc/default/docker文件,添加DOCKER_OPTS=”--userns-remap=default”
  2. 重启docker服务
  3. docker会在host上创建dockremap用户

Hadoop 自v2.7.0增加了DockerContainerExecutor(DCE),然而由于该实现不支持user namespaces,最终在v3.0.0作废。现在Hadoop采用的方案是LinuxContainerExecutor+DockerContainerRuntime。官方文档的解释如下。

“Administrators should be aware that DCE doesn’t currently provide user name-space isolation. This means, in particular, that software running as root in the YARN container will have root privileges in the underlying NodeManager. ”

参考
Docker 使用 Linux namespace 隔离容器的运行环境
Isolate containers with a user namespace
Docker背后的内核知识——Namespace资源隔离

1.1. cgroups

简单来说,cgroups是linux内核于2006年提供的一种可以用来限制、管理和隔离进程所需系统物理资源(CPU、内存、GPU、网络等)的机制。Cent0S 7的默认挂载目录如下。


每个目录称为subsystem,独立控制一种资源。
cgroups具有树状层级结构。例如,Hadoop YARN配置cgroups后,会在相应subsystem下创建hierarchy, container_ID可以视作代表一组容器进程的task。

/sys/fs/cgroup/devices/yarn/container_1521423483464_0004_01_000001/devices.denyCGroups

cgroups对CPU的控制通过cpu,cpuacct subsystem来实现。作为一个简单的例子,如果想要限制一组进程的CPU使用率,可以做如下配置。

  1. 在cpu,cpuacct subsystem下面通过cgcreate创建一个新的子cgroup
  2. 在这个新的子cgroup里通过cgset cpu.cfs_quota_us=50000

由于cpu,cpuacct subsystem的cpu.cfs_quota_us默认值是100000,当上述子 cgroup的进程启动时,则最多只能占用CPU 50%的时间。当然实际中,还需要考虑CPU的核数。

cgroups对GPU的控制则是通过devices subsystem。
参考
Docker背后的内核知识——cgroups资源限制
Behind Docker - Quick look into cgroups

2. Hadoop v3.1 GPU

一个简单的Hadoop集群示意图如下。


2.1. ContainerExecutor初始化

YARN需要配置containerExecutor类型是LinuxContainerExecutor。
该类初始化过程中,会生成YARN当前支持所有资源的cgroups handlers—ResourceHandlerChain;还会根据runtimeType生成dockerContainerRuntime。
启动容器时,会调用初始化生成的ResourceHandlerChain,以完成当前容器的各项资源的cgroups设置。当然,如果容器类型是docker,则不必额外设置。这里,就包括对GPU的devices cgroup设置。


参考
源码

2.2. GPU发现与分配

Hadoop 3.1添加了resource plugin的新模块,可以支持GPU和FPGA。
YARN的每个nodeManager使用nvidia-smi -q -x发现GPU,并向resourceManager上报。
启动nodeManager节点时,日志会显示如下信息。

2018-03-16 11:17:21,867 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDiscoverer: Trying to discover GPU information ...
2018-03-16 11:17:22,173 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDiscoverer: === Gpus in the system ===
Driver Version:381.04
ProductName=Tesla K40m, MinorNumber=0, TotalMemory=11439MiB, Utilization=0.0%
ProductName=Tesla K40m, MinorNumber=1, TotalMemory=11439MiB, Utilization=0.0%
ProductName=Tesla K40m, MinorNumber=2, TotalMemory=11439MiB, Utilization=0.0%
ProductName=Tesla K40m, MinorNumber=3, TotalMemory=11439MiB, Utilization=0.0%
 
2018-03-16 15:55:01,787 INFO org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM: Rolling master-key for container-tokens, got key with id 1262359944
2018-03-16 15:55:01,788 INFO org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Registered with ResourceManager as hpc-6-044:57667 with total resource of <memory:8192, vCores:8, yarn.io/gpu: 4>

YARN 3.1 的UI里面也添加了GPU信息如下。


实际上,nvidia-docker自己也有REST API来发现GPU。

[root@hpc-6-044 hadoop]# curl -s http://localhost:3476/v1.0/gpu/info/json
{"Version":{"Driver":"381.04","CUDA":"8.0"},"Devices":[{"UUID":"GPU-04a37fdb-8792-0fa7-c681-0ff8fbf77d6d","Path":"/dev/nvidia0","Model":"Tesla K40m","Power":235,"CPUAffinity":0,"PCI":{"BusID":"0000:03:00.0","BAR1":16384,"Bandwidth":15760},"Clocks":{"Cores":875,"Memory":3004},"Topology":[{"BusID":"0000:04:00.0","Link":3}],"Family":"Kepler","Arch":"3.5","Cores":2880,"Memory":{"ECC":true,"Global":11439,"Shared":48,"Constant":64,"L2Cache":1536,"Bandwidth":288384}},{"UUID":"GPU-282e08ed-df8f-d6ad-40c1-7f0afa5931a1","Path":"/dev/nvidia1","Model":"Tesla K40m","Power":235,"CPUAffinity":0,"PCI":{"BusID":"0000:04:00.0","BAR1":16384,"Bandwidth":15760},"Clocks":{"Cores":875,"Memory":3004},"Topology":[{"BusID":"0000:03:00.0","Link":3}],"Family":"Kepler","Arch":"3.5","Cores":2880,"Memory":{"ECC":true,"Global":11439,"Shared":48,"Constant":64,"L2Cache":1536,"Bandwidth":288384}},{"UUID":"GPU-4eb77e26-f97b-5827-9eb8-5b00f5e51cb9","Path":"/dev/nvidia2","Model":"Tesla K40m","Power":235,"CPUAffinity":1,"PCI":{"BusID":"0000:82:00.0","BAR1":16384,"Bandwidth":15760},"Clocks":{"Cores":875,"Memory":3004},"Topology":[{"BusID":"0000:83:00.0","Link":3}],"Family":"Kepler","Arch":"3.5","Cores":2880,"Memory":{"ECC":true,"Global":11439,"Shared":48,"Constant":64,"L2Cache":1536,"Bandwidth":288384}},{"UUID":"GPU-d9aadc5d-68b3-1cd7-e0de-f45367117071","Path":"/dev/nvidia3","Model":"Tesla K40m","Power":235,"CPUAffinity":1,"PCI":{"BusID":"0000:83:00.0","BAR1":16384,"Bandwidth":15760},"Clocks":{"Cores":875,"Memory":3004},"Topology":[{"BusID":"0000:82:00.0","Link":3}],"Family":"Kepler","Arch":"3.5","Cores":2880,"Memory":{"ECC":true,"Global":11439,"Shared":48,"Constant":64,"L2Cache":1536,"Bandwidth":288384}}]}

NodeManager通过GpuDiscoverer这个单例类实现对当前node上GPU的发现。initialize()方法会调用getGpuDeviceInformation(),其返还的GpuDeviceInformation是nvidia_smi_log XML的POJO,接着通过getGpusUsableByYarn()方法解析成预定义好的GpuDevice对象。

分配GPU时,GpuResourceAllocator里的方法都声明成synchronized,且此时会使用包含容器ID的AssignedGpuDevice类(GpuDevice的子类)。分配GPU的流程图如下。


参考
源码

2.3. 配置

Hadoop 3.1暂未发布,需要下载源码,本地使用maven编译。有几点需要注:

  • Gcc需要升级到5.x
  • Protobuf的版本必须是2.5.0
  • 由于Hadoop要求container-executor和container-executor.cfg以及它们所在的父级目录的owner均为root:hadoop,所以在编译时可以强行指定container-executor.cfg的目录,从未避免直接修改$HADOOP_HOME/etc/hadoop/目录的owner所带来的不便。
//container-executor.cfg单独放在/etc/hadoop目录,和hadoop目录隔离
mvn package -Pdist,native -DskipTests -Dtar -Dcontainer-executor.conf.dir=/etc/hadoop

Hadoop 3.1 Docker+GPU的配置主要包括nvidia-docker、GPU resource和cgroups的配置。实际测试中,遇到和cgroups相关的错误,暂未实现对GPU资源的调度。

参考
Using GPU On YARN
Launching Applications Using Docker Containers
Hadoop 3.1 YARN failed with error 'exitCode=255: CGroups: Could not find file to write' when launching a Docker container

3. Kubernetes v1.9 GPU

一个简单的Kubernetes集群示意图如下。


K8S的GPU实现相较Hadoop YARN显得十分简洁。GPU类型定义、发现和分配的逻辑都写在kubernetes/pkg/kubelet/gpu/nvidia/nvidia_gpu_manager.go
其中,GPU数据结构是一个2维map, 外层key是podID,内层key是containerName,值是GPU在Docker容器内的/dev/nvidia目录。

type containerToGPU map[string]sets.String

// podGPUs represents a list of pod to GPU mappings.
type podGPUs struct {
    podGPUMapping map[string]containerToGPU
}

func newPodGPUs() *podGPUs {
    return &podGPUs{
        podGPUMapping: make(map[string]containerToGPU),
    }
}

podGPUMapping[podUID][contName].Insert(device)

之所以这样设计,是因为Kubernetes允许1个pod运行多个Docker容器。官方文档的解释如下。

"The primary reason that Pods can have multiple containers is to support helper applications that assist a primary application. Typical examples of helper applications are data pullers, data pushers, and proxies. Helper and primary applications often need to communicate with each other. Typically this is done through a shared filesystem."

类似Hadoop YARN里的GpuResourceAllocator,Kubernetes里的nvidiaGpuManager可以分配GPU, 并且通过sync.Mutex对所有写操作加锁限制。

// nvidiaGPUManager manages nvidia gpu devices.
type nvidiaGPUManager struct {
    sync.Mutex
    //当前节点上所有的GPU
    allGPUs        sets.String
    //2维map,外层key是podID,内层key是containerName,值是GPU在Docker容器内的/dev/nvidia目录
    allocated      *podGPUs
    defaultDevices []string
    dockerClient     dockertools.DockerInterface
    activePodsLister activePodsLister
}

分配GPU的流程图如下。


虽然Hadoop YARN和Kubernetes存储GPU的数据结构和自身平台结构不同,但在GPU分配时二者流程相似,也都有考虑到GPU数据更新的线程安全问题。

参考
源码

4. Hadoop v3.1 YARN调度

YARN的调度分为FIFO、Capacity和Fair3种。FIFO适用单用户,而Capacity和Fair适用于多用户模式。下面主要来看Capacity调度。

Capacity调度的基本思想是通过一个层级队列实现对集群资源的有效分配。


对应的etc/hadoop/capacity-scheduler.xml配置如下。

<?xml version="1.0"?>
<configuration>
  <property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>prod,dev</value> </property>
<property>
<name>yarn.scheduler.capacity.root.dev.queues</name>
<value>eng,science</value> </property>
<property>
<name>yarn.scheduler.capacity.root.prod.capacity</name>
<value>40</value> </property> <property>
<name>yarn.scheduler.capacity.root.dev.capacity</name>
<value>60</value> </property> <property>
<name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
<value>75</value> </property> <property>
<name>yarn.scheduler.capacity.root.dev.eng.capacity</name>
<value>50</value> </property> <property>
<name>yarn.scheduler.capacity.root.dev.science.capacity</name>
<value>50</value> </property>
</configuration>

例如,上面这个层级队列中dev队列分得集群60%的资源。dev队列下面有2个子队列eng和science将分得dev队列获得资源的各一半。当dev队列资源紧张时,Capacity调度可以从prod队列获取部分资源填补空缺(所谓弹性队列),而maximun-capacity 配置(75%)是允许其超越60%容量的上限,这样可以防止prod队列资源被全部侵占。用户通过YARN提交application的时候需要指定叶子队列的名字。因而,叶子队列的名字必须是惟一的。

Capacity调度的入口方法如下。该方法首先将备选Nodes随机划分成2组,然后分别遍历2组中的每个Node。

static void schedule(CapacityScheduler cs) throws InterruptedException{
    // First randomize the start point
    int current = 0;
    Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
    int start = random.nextInt(nodes.size());

    // Allocate containers of node [start, end)
    for (FiCaSchedulerNode node : nodes) {
      if (current++ >= start) {
        if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
          continue;
        }
        cs.allocateContainersToNode(node.getNodeID(), false);
      }
    }

    current = 0;

    // Allocate containers of node [0, start)
    for (FiCaSchedulerNode node : nodes) {
      if (current++ > start) {
        break;
      }
      if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging))      {
        continue;
      }
      cs.allocateContainersToNode(node.getNodeID(), false);
    }
  }

allocateContainersToNode方法继续调用allocateContainerOnSingleNode方法如下。

private CSAssignment allocateContainerOnSingleNode(
      CandidateNodeSet<FiCaSchedulerNode> candidates, FiCaSchedulerNode node,
      boolean withNodeHeartbeat) {
      ... ...
       LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
      //YARN只允许提交application给叶子队列;这里让叶子队列进一步处理container的申请
      assignment = queue.assignContainers(getClusterResource(), candidates, new ResourceLimits(labelManager
              .getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
                  getClusterResource())),
          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
      ... ...

}

交由提交application时指定的叶子队列来调用assignContainers方法如下。关键的2步分别是:交由application调用assignContainers方法;调用resourceCalculator比较container申请的资源能否得到满足。为了实现GPU资源调度,这里需要将resourceCalculator配置成DominantResourceCalculator。否则默认的DefaultResourceCalculator只会比较内存单一资源。

public CSAssignment assignContainers(Resource clusterResource,
      CandidateNodeSet<FiCaSchedulerNode> candidates,
      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
      //检查queue的max-capacity limit,检查user的limit等等
      ... ...
      //交由application的assignContainers方法;这里的application的类是FiCaSchedulerApp。
      assignment = application.assignContainers(clusterResource,
          candidates, currentResourceLimits, schedulingMode, null);
      Resource assigned = assignment.getResource();
      //调用配置的resourceCalculator比较assigned里的资源是否能得到满足;如果resourceCalculator是DominantResourceCalculator类,那么会比较每项资源,否则,默认只会比较内存单一资源
      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
          Resources.none())) {
        return assignment;
      }
      ... ...
}

DominantResourceCalculator类的compare方法会比较所有支持的Resources,确保一个都不能少。

  /**
   * Compare two resources - if the value for every resource type for the lhs
   * is greater than that of the rhs, return 1. If the value for every resource
   * type in the lhs is less than the rhs, return -1. Otherwise, return 0
   */
private int compare(Resource lhs, Resource rhs) {
    boolean lhsGreater = false;
    boolean rhsGreater = false;
    int ret = 0;

    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
    for (int i = 0; i < maxLength; i++) {
      ResourceInformation lhsResourceInformation = lhs
          .getResourceInformation(i);
      ResourceInformation rhsResourceInformation = rhs
          .getResourceInformation(i);
      int diff = lhsResourceInformation.compareTo(rhsResourceInformation);
      if (diff >= 1) {
        lhsGreater = true;
      } else if (diff <= -1) {
        rhsGreater = true;
      }
    }
    if (lhsGreater && rhsGreater) {
      ret = 0;
    } else if (lhsGreater) {
      ret = 1;
    } else if (rhsGreater) {
      ret = -1;
    }
    return ret;
  }

目前支持的Resources类型如下。

public class ResourceInformation implements Comparable<ResourceInformation> {
  // Known resource types
  public static final String MEMORY_URI = "memory-mb";
  public static final String VCORES_URI = "vcores";
  public static final String GPU_URI = "yarn.io/gpu";
  public static final String FPGA_URI = "yarn.io/fpga";
  ... ...
}

参考
YARN – THE CAPACITY SCHEDULER
Hadoop: Capacity Scheduler
源码

5. Kubernetes v1.9调度

Scheduler调度的入口方法如下。分为2个阶段,分别是Predicate和Prioritizing,对应findNodesThatFit()和PrioritizeNodes()方法。

// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister) (string, error) {
    ... ...
    //Predicate
    filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, g.cachedNodeInfoMap, nodes, g.predicates, g.extenders, g.predicateMetaProducer, g.equivalenceCache, g.schedulingQueue, g.alwaysCheckAllPredicates)
    if err != nil {
        return "", err
    }

    if len(filteredNodes) == 0 {
        return "", &FitError{
            Pod:              pod,
            NumAllNodes:      len(nodes),
            FailedPredicates: failedPredicateMap,
        }
    }

    //如果Predicate只筛选出1个Node,那就不用继续做Prioritizing
    if len(filteredNodes) == 1 {
        metrics.SchedulingAlgorithmPriorityEvaluationDuration.Observe(metrics.SinceInMicroseconds(startPriorityEvalTime))
        return filteredNodes[0].Name, nil
    }
    
    //Prioritizing
    priorityList, err := PrioritizeNodes(pod, g.cachedNodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders)
    if err != nil {
        return "", err
    }
    return g.selectHost(priorityList)
}

5.1. Predicate

第一阶段的Predicate将会从备选Node list中滤除不符合predicate策略的节点。每一个predicate策略都对应一个具体的方法,而且它们的顺序是有区分的。一般,越靠前代表越为重要。这样,如果第一个predicate策略不满足且alwaysCheckAllPredicates==false,就不必再往下检查其余predicate策略,直接宣告当前node不满足调度要求。另外,这些predicate策略也可以手工配置。目前支持的predicate策略如下。

var (
    predicatesOrdering = []string{CheckNodeConditionPred, CheckNodeUnschedulablePred,
        GeneralPred, HostNamePred, PodFitsHostPortsPred,
        MatchNodeSelectorPred, PodFitsResourcesPred, NoDiskConflictPred,
        PodToleratesNodeTaintsPred, PodToleratesNodeNoExecuteTaintsPred, CheckNodeLabelPresencePred,
        CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred,
        MaxAzureDiskVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
        CheckNodeMemoryPressurePred, CheckNodeDiskPressurePred, MatchInterPodAffinityPred}
)

findNodesThatFit方法会调用workqueue.Parallelize启动多个(同时最多16个)gorutine,并行检查备选Node list里面的所有Nodes。针对每个Node的podFitsOnNode则会按顺序遍历predicate方法做检查。

func findNodesThatFit(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    nodes []*v1.Node,
    predicateFuncs map[string]algorithm.FitPredicate,
    extenders []algorithm.SchedulerExtender,
    metadataProducer algorithm.PredicateMetadataProducer,
    ecache *EquivalenceCache,
    schedulingQueue SchedulingQueue,
    alwaysCheckAllPredicates bool,
) ([]*v1.Node, FailedPredicateMap, error) {
    var filtered []*v1.Node
    failedPredicateMap := FailedPredicateMap{}
    ... ...

        checkNode := func(i int) {
            nodeName := nodes[i].Name
            fits, failedPredicates, err := podFitsOnNode(
                pod,
                meta,
                nodeNameToInfo[nodeName],
                predicateFuncs,
                ecache,
                schedulingQueue,
                alwaysCheckAllPredicates,
                equivCacheInfo,
            )
            if err != nil {
                predicateResultLock.Lock()
                errs[err.Error()]++
                predicateResultLock.Unlock()
                return
            }
            if fits {
                filtered[atomic.AddInt32(&filteredLen, 1)-1] = nodes[i]
            } else {
                predicateResultLock.Lock()
                failedPredicateMap[nodeName] = failedPredicates
                predicateResultLock.Unlock()
            }
        }
        
        workqueue.Parallelize(16, len(nodes), checkNode)
    ... ...
    return filtered, failedPredicateMap, nil
}

func podFitsOnNode(
    pod *v1.Pod,
    meta algorithm.PredicateMetadata,
    info *schedulercache.NodeInfo,
    predicateFuncs map[string]algorithm.FitPredicate,
    ecache *EquivalenceCache,
    queue SchedulingQueue,
    alwaysCheckAllPredicates bool,
    equivCacheInfo *equivalenceClassInfo,
) (bool, []algorithm.PredicateFailureReason, error) {
    var (
        eCacheAvailable  bool
        failedPredicates []algorithm.PredicateFailureReason
    )
    predicateResults := make(map[string]HostPredicate)
    ... ...
        //顺序遍历predicates策略
        for _, predicateKey := range predicates.Ordering() {
            var (
                fit     bool
                reasons []algorithm.PredicateFailureReason
                err     error
            )
            if predicate, exist := predicateFuncs[predicateKey]; exist {
                func() {
                    var invalid bool
                    if eCacheAvailable {
                        //优先从equivalence cache查找当前predicate策略匹配结果
                    }

                    if !eCacheAvailable || invalid {
                        //equivalence cache不靠谱,调用当前predicate策略方法
                        fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
                        if err != nil {
                            return
                        }
                        //更新equivalence cache
                    }
                }()
                
                if !fit {
                    if !alwaysCheckAllPredicates {
                        //如果当前predicate策略不满足且alwaysCheckAllPredicates==false,就此打住
                        break
                    }
    ... ...
    return len(failedPredicates) == 0, failedPredicates, nil
}

GPU资源的调度属于predicate策略中排序比较靠前的PodFitsResources策略。对应的检查方法中将GPU和CPU、内存等一并考虑。

5.2. Prioritizing

如果predicate筛选的Node list不止一个,Scheduler会继续进行优先级排序,最终选择优先级最高的Node。
PrioritizeNodes方法也会调用workqueue.Parallelize启动多个(同时最多16个)gorutine并行检查通过predicate筛选的Node list里面的所有节点。

// HostPriority represents the priority of scheduling to a particular host, higher priority is better.
type HostPriority struct {
    // Name of the host
    Host string
    // Score associated with the host
    Score int
}

// HostPriorityList declares a []HostPriority type.
type HostPriorityList []HostPriority

func PrioritizeNodes(
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulercache.NodeInfo,
    meta interface{},
    priorityConfigs []algorithm.PriorityConfig,
    nodes []*v1.Node,
    extenders []algorithm.SchedulerExtender,
) (schedulerapi.HostPriorityList, error) {
    ... ...
    //results是一个2维slice
    results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))

    ... ...
    processNode := func(index int) {
        nodeInfo := nodeNameToInfo[nodes[index].Name]
        var err error
        for i := range priorityConfigs {
            if priorityConfigs[i].Function != nil {
                continue
            }
            results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
            if err != nil {
                appendError(err)
                return
            }
        }
    }
    workqueue.Parallelize(16, len(nodes), processNode)
    ... ...
    //result是一个1维slice,由results每列加权相加得到,每个元素是节点nodeName和对应priority分数
    result := make(schedulerapi.HostPriorityList, 0, len(nodes))

    for i := range nodes {
        result = append(result, schedulerapi.HostPriority{Host: nodes[i].Name, Score: 0})
        for j := range priorityConfigs {
            result[i].Score += results[j][i].Score * priorityConfigs[j].Weight
        }
    }
    ... ...
    return result, nil
}

和predicate策略类似,Prioritizing条件也存在多种。不过,不同的是Prioritizing阶段会对每个节点的所有priority条件进行评分,分数存储在一个2维度slice results。最终会以节点为维度进行加权,这样,每个节点就有了一个priority分数,存储在1维slice result里面。

最后,当存在多个节点都是最高分的时候,Scheduler为了避免调度不均衡,引入了一个简单算法如下。

// selectHost takes a prioritized list of nodes and then picks one
// in a round-robin manner from the nodes that had the highest score.
func (g *genericScheduler) selectHost(priorityList schedulerapi.HostPriorityList) (string, error) {
    if len(priorityList) == 0 {
        return "", fmt.Errorf("empty priorityList")
    }

    sort.Sort(sort.Reverse(priorityList))
    maxScore := priorityList[0].Score
    firstAfterMaxScore := sort.Search(len(priorityList), func(i int) bool { return priorityList[i].Score < maxScore })

    g.lastNodeIndexLock.Lock()
    ix := int(g.lastNodeIndex % uint64(firstAfterMaxScore))
    g.lastNodeIndex++
    g.lastNodeIndexLock.Unlock()

    return priorityList[ix].Host, nil
}

例如,假定queue里面有2个待调度的container1和container2,且各自经过Predicate和Prioritizings后得到的result相同。

[[‘Node1’,10],[‘Node2’,10],[‘Node3’,10],[‘Node4’,2],[‘Node5’,1],[‘Node6’,1],[‘Node7’,0]]

这时,采用上述算法得到以下调度结果。

对container1,lastNodeIndex=7, firstAfterMaxScore=3, 故选择7%3=1即[‘Node2’,10]; lastNodeIndex自增1变成8;

对container2, lastNodeIndex=8, firstAfterMaxScore=3, 故选择8%3=2即[Node3’,10];lastNodeIndex自增1变成9;意味下次将调度[‘Node0’,10];

Kubernetes和Hadoop YARN在调度上可谓各有千秋,就其异同点作如下简单小结。

  1. Hadoop YARN通过层级队列实现多用户的集群资源分配和管理;Kubernetes则通过namespace来达到类似效果,只不过namespace没有层级概念。
  2. 从源码中可以看出,Hadoop YARN在调度时需要遍历所有备选节点;Kubernetes则通过gorutines并行检查备选节点,当然每个节点还是需要遍历predicate条件直到全部通过或者失败退出。
  3. Hadoop YARN和Kubernetes在调度时都引入了随机性来确保节点调度均匀。Hadoop YARN是随机分割备选节点列表;Kubernetes则是通过round-robin从一组最优节点中挑选调度节点。
  4. Hadoop YARN的调度策略虽然也可以配置,但是Kubernetes的整体设计更加灵活可配置。。事实上,Kubernetes在早先版本中,scheduler是放在plugin目录下,其灵活性可见一斑。
  5. 单纯从代码可读性上来说,Kubernetes这点做的好太多了,不仅代码写的精巧,注释也非常充分。

参考
Kubernetes调度详解
干货 | kube-scheduler原理解析
Priority in Kubernetes API
源码

6. 后记

6.1. Docker容器化

TensorflowOnSpark在Github2200+的关注度,然而,不支持Docker的代价就是环境配置过于繁杂,每个executor节点的runtime从python版本到CUDA drive都要一致。固然通过spark-submit的各种option(例如-py-files或者-jars)可以实现运行环境的自动分发,但终归难以解决各种环境依赖问题等。从作者leewyang的回复来看,Docker容器化也是势在必行。


6.2. Hadoop YARN service API

不知道是不是看到K8S如此成功,YARN终于也要开放提供REST接口,而且可以像提交K8S YAML/JSON一样直接指定Docker镜像和所需CPU/Memory/GPU资源。不过默认Simple Auth情况下的user是dr.who,任务实际上会被拒绝。(可能通过配置可以解决。)


下面这个例子可以用HTTP Post请求替代原始YARN提交命令。

yarn jar /home/junzhang22/hadoop-3.1.0-SNAPSHOT/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.1.0-SNAPSHOT.jar \

-jar /home/junzhang22/hadoop-3.1.0-SNAPSHOT/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.1.0-SNAPSHOT.jar \

-shell_env YARN_CONTAINER_RUNTIME_TYPE=docker \

-shell_env YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=tensorflow-gpu-boto-test \

-shell_script ./run.sh \

-container_resources memory-mb=3072,vcores=1,yarn.io/gpu=1 \

-num_containers 1
POST /app/v1/services/ HTTP/1.1

Host: 10.1.86.15:8088

Content-Type: application/json

Request:

{

 "name": "hello-world4",

 "version": "1.0.0",

 "description": "hello world example",

 "components" :

 [

 {

 "name": "hello",

 "number_of_containers": 1,

 "artifact": {

 "id": "tensorflow-gpu-boto-test",

 "type": "DOCKER"

 },

 "launch_command": "./run.sh",

 "resource": {

 "cpus": 1,

 "memory": "1024",

 "additional" : {

 "yarn.io/gpu" : {

 "value" : 1,

 "unit" : ""

 }

 }

 }

 }

 ]

}

Response:

{

 "uri": "/v1/services/hello-world4",

 "diagnostics": "Application ID: application_1521616970183_0001",

 "state": "ACCEPTED"

}

参考
YarnServiceAPI

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容

  • 全局队列 但是我们不需要用retain和release来管理全局的并发队列,为全局队列对于程序来说是全局的,ret...
    听风赏花_fc3e阅读 99评论 0 0
  • 上周看到一个简报,说google的项目flutter进入Beta阶段。从简报上大体了解了下,是用来开发跨平台应用的...
    dinfer阅读 1,346评论 4 1
  • git reset 与 git revert区别 基本概念 首先我们来了解下Git的工作区、暂存区及HEAD的概念...
    小明阿李阅读 217评论 0 0
  • 回顾 大家都趁着跨年之夜,细数着2016年的点点滴滴。可是我却像个断片的酒鬼,过往是杂乱无章的片段。 2月...
    闹钟阅读 142评论 0 0
  • 现金流量--“大于100/100/10” 的概念,其中第三个10,指的是现金再投资比率。 一、概念 现金再投资比率...
    海风珠海阅读 649评论 0 0