FairScheduler中Queue管理

FairScheduler的资源分配第一层调度的对象是FSQueue,即资源的分配都是在队列中完成的。队列抽象类FSQueue包含FSParentQueue和FSLeafQueue两个子类,只有FSLeafQueue才能包含具体的应用(具体对应的类是FSAppAttempt)。QueueManager管理了FairScheduler中所有的FSQueue,fair-scheduler.xml是用户提供的队列的所有配置,是通过AllocationFileLoaderService进行加载。

image.png

初始化和新建队列

初始化

  1. QueueManager的构造函数什么都没做;
  2. QueueManager#initialize是在FairScheduler#initScheduler中调用,可以看到initialize方法除了新建了root节点以及名为root.default的叶子节点也什么都没有做;
public QueueManager(FairScheduler scheduler) {
    this.scheduler = scheduler;
  }
  
  public FSParentQueue getRootQueue() {
    return rootQueue;
  }

  public void initialize(Configuration conf) throws IOException,
      SAXException, AllocationConfigurationException, ParserConfigurationException {
    rootQueue = new FSParentQueue("root", scheduler, null);
    queues.put(rootQueue.getName(), rootQueue);
    
    // Create the default queue
    getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
  }

新建队列

主要逻辑是在QueueManager#createQueue

  1. 新建队列的时候传入的参数name一般是root.parent1.parent2...leafName,注意name的最后一个节点可能是一个FSLeafQueue也可能不是,按照参数queueType确定即可;
  2. 找到队列中第一个存在的parent,并将leafName和沿途不存在的parent按照顺序放进List;
  3. 按照list倒序依次构建不存在的parent和leafName.
private FSQueue createQueue(String name, FSQueueType queueType) {
    List<String> newQueueNames = new ArrayList<String>();
    newQueueNames.add(name);
    int sepIndex = name.length();
    FSParentQueue parent = null;

    // Move up the queue tree until we reach one that exists.
    while (sepIndex != -1) {
      sepIndex = name.lastIndexOf('.', sepIndex-1);
      FSQueue queue;
      String curName = null;
      curName = name.substring(0, sepIndex);
      queue = queues.get(curName);

      if (queue == null) {
        newQueueNames.add(curName);
      } else {
        if (queue instanceof FSParentQueue) {
          parent = (FSParentQueue)queue;
          break;
        } else {
          return null;
        }
      }
    }
    
    // At this point, parent refers to the deepest existing parent of the
    // queue to create.
    // Now that we know everything worked out, make all the queues
    // and add them to the map.
    AllocationConfiguration queueConf = scheduler.getAllocationConfiguration();
    FSLeafQueue leafQueue = null;
    for (int i = newQueueNames.size()-1; i >= 0; i--) {
      String queueName = newQueueNames.get(i);
      if (i == 0 && queueType != FSQueueType.PARENT) {
        try {
          leafQueue = new FSLeafQueue(name, scheduler, parent);
          leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
        } catch (AllocationConfigurationException ex) {
          LOG.warn("Failed to set default scheduling policy "
              + queueConf.getDefaultSchedulingPolicy() + " on new leaf queue.", ex);
        }
        parent.addChildQueue(leafQueue);
        queues.put(leafQueue.getName(), leafQueue);
        leafQueues.add(leafQueue);
        leafQueue.updatePreemptionVariables();
        return leafQueue;
      } else {
        FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
        try {
          newParent.setPolicy(queueConf.getDefaultSchedulingPolicy());
        } catch (AllocationConfigurationException ex) {
          LOG.warn("Failed to set default scheduling policy "
              + queueConf.getDefaultSchedulingPolicy() + " on new parent queue.", ex);
        }
        parent.addChildQueue(newParent);
        queues.put(newParent.getName(), newParent);
        newParent.updatePreemptionVariables();
        parent = newParent;
      }
    }

    return parent;
  }

可以看到整个QueueManager的初始化只做了那么其它的节点以及节点的配置是在什么时候加入到QueueManager中的呢?

FairScheduler配置文件加载

配置文件获取

FairScheduler的默认配置文件是fair-scheduler.xml,当然也可以通过yarn.scheduler.fair.allocation.file修改配置文件的路径。AllocationFileLoaderService#reloadThread每隔10s钟会重新加载一遍fair-scheduler.xml,除此之外在FairScheduler#initScheduler(即在开始调度之前)中会调用一次AllocationFileLoaderService#reloadAllocations来加载配置文件。读取配置文件之后会调用QueueManager#updateAllocationConfiguration将所有的队列信息更新进QueueManager之中这样所有的队列就算是新建完成了。注意QueueManager只是保存了队列,其它的配置是保存在FairSchduler#allocConf中,需要的时候从FairSchduler#allocConf获取即可。

queue配置更新到QueueManager中

将配置文件读取到FairSchduler#allocConf之后,通过QueueManager#updateAllocationConfiguration对QueueManager中的队列进行更新。这个更新一多半是新建节点了一般是新建节点,新建之前通过QueueManager#removeEmptyIncompatibleQueues判断给定的节点是否满足新建的条件,如果不满足则
不新建,此节点算是更新失败咯,虽然更新失败,但是不会报错,因为有AllocationService是定时调度,
下次更新的时候可能会更新成功。

QueueManager#removeEmptyIncompatibleQueues判断是否能新建,假设传入参数的队列类型是FSQueueType.LEAF,指定的名字为queue1

  1. 如果QueueManager存在queue1且类型是FSQueueType.LEAF返回true;
  2. 如果QueueManager存在queue1且类型是FSQueueType.PARENT返回
    QueueManager#removeQueueIfEmpty的结果,如果queue1队列中的自队列有应用则返回false,
    如果没有应用则删除queue1返回true;
  3. 如果QueueManager不存在queue1,查看queue1的路径中是否包含FSQueueType.LEAF类型的节点,
    有则按照QueueManager#removeQueueIfEmpty的结果处理。

QueueManager#updateAllocationConfiguration是先更新FSQueueType.LEAF,再更FSQueueType.PARENT没有感觉这样做的好处是什么,因为也有可能将一个FSQueueType.PARENT节点转换成FSQueueType.LEAF节点。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。