The Resource Download Process in the YARN NodeManager

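After the ApplicationMaster submits a startContainers request to the NodeManager, one important step the NodeManager performs is downloading the resources needed to launch the container from HDFS to the NodeManager's local disk.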

Handling ContainerInitEvent

The download starts when ContainerImpl, on the NodeManager side, receives a ContainerInitEvent. The event is handled by ContainerImpl$RequestResourcesTransition.

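  1. Get the ContainerLaunchContext, which carries the parameters needed to launch this container;
  2. Get the required resources through ContainerLaunchContext#getLocalResources;
  3. Wrap each LocalResource in a LocalResourceRequest object that ContainerImpl can use;
  4. Add every LocalResourceRequest to ContainerImpl#pendingResources (a map).
    As the name suggests, it holds resources that are waiting to be downloaded or
    are being downloaded. A resource is removed from the map once its download
    completes, so the map is what finally decides whether all resources have been
    downloaded;
  5. Sort the resources into PUBLIC, PRIVATE, and APPLICATION and store them in
    ContainerImpl#publicRsrcs, ContainerImpl#privateRsrcs, and ContainerImpl#appRsrcs;
  6. Finally, fire a ContainerLocalizationRequestEvent, which carries all the
    resource requests and the current ContainerImpl.
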
// ContainerImpl$RequestResourcesTransition
static class RequestResourcesTransition implements
      MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
    @Override
    public ContainerState transition(ContainerImpl container,
        ContainerEvent event) {
      // ... other logic omitted; only the normal flow is shown here
      // Send requests for public, private resources
      
      final ContainerLaunchContext ctxt = container.launchContext;
      Map<String,LocalResource> cntrRsrc = ctxt.getLocalResources();
      if (!cntrRsrc.isEmpty()) {
        try {
          for (Map.Entry<String,LocalResource> rsrc : cntrRsrc.entrySet()) {
            try {
              LocalResourceRequest req =
                  new LocalResourceRequest(rsrc.getValue());
              List<String> links = container.pendingResources.get(req);
              if (links == null) {
                links = new ArrayList<String>();
                container.pendingResources.put(req, links);
              }
              links.add(rsrc.getKey());
              switch (rsrc.getValue().getVisibility()) {
              case PUBLIC:
                container.publicRsrcs.add(req);
                break;
              case PRIVATE:
                container.privateRsrcs.add(req);
                break;
              case APPLICATION:
                container.appRsrcs.add(req);
                break;
              }
            } catch (URISyntaxException e) {
              LOG.info("Got exception parsing " + rsrc.getKey()
                  + " and value " + rsrc.getValue());
              throw e;
            }
          }
        } catch (URISyntaxException e) {
          // malformed resource; abort container launch
          LOG.warn("Failed to parse resource-request", e);
          container.cleanup();
          container.metrics.endInitingContainer();
          return ContainerState.LOCALIZATION_FAILED;
        }
        Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
            new HashMap<LocalResourceVisibility, 
                        Collection<LocalResourceRequest>>();
        if (!container.publicRsrcs.isEmpty()) {
          req.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
        }
        if (!container.privateRsrcs.isEmpty()) {
          req.put(LocalResourceVisibility.PRIVATE, container.privateRsrcs);
        }
        if (!container.appRsrcs.isEmpty()) {
          req.put(LocalResourceVisibility.APPLICATION, container.appRsrcs);
        }
        
        container.dispatcher.getEventHandler().handle(
              new ContainerLocalizationRequestEvent(container, req));
        return ContainerState.LOCALIZING;
      } else {
        container.sendLaunchEvent();
        container.metrics.endInitingContainer();
        return ContainerState.LOCALIZED;
      }
    }
  }
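
The bookkeeping in steps 3 to 5 can be tried in isolation. The following is a minimal sketch, not the NodeManager code: `RequestGroupingSketch`, its `Visibility` enum, and the use of plain strings for resource URIs are all assumptions made for illustration. It shows how one map tracks what is still pending while a second one groups requests by visibility.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical stand-ins for the YARN types; not the real API.
class RequestGroupingSketch {
    enum Visibility { PUBLIC, PRIVATE, APPLICATION }

    // Remote URI -> link names waiting for it (plays the role of pendingResources).
    final Map<String, List<String>> pending = new LinkedHashMap<>();
    // Visibility -> remote URIs (plays the role of publicRsrcs/privateRsrcs/appRsrcs).
    final Map<Visibility, Set<String>> byVisibility = new EnumMap<>(Visibility.class);

    // Registers one requested resource as pending and files it under its visibility.
    void addResource(String linkName, String remoteUri, Visibility vis) {
        pending.computeIfAbsent(remoteUri, k -> new ArrayList<>()).add(linkName);
        byVisibility.computeIfAbsent(vis, k -> new LinkedHashSet<>()).add(remoteUri);
    }

    // Drops a finished resource; returns true once nothing is left to wait for.
    boolean markLocalized(String remoteUri) {
        pending.remove(remoteUri);
        return pending.isEmpty();
    }
}
```

Two link names that point at the same remote URI share one pending entry, which matches the `links` list kept per request in the listing above.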

Handling ContainerLocalizationRequestEvent

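ContainerLocalizationRequestEvent is received by ResourceLocalizationService#handle; the actual logic lives in ResourceLocalizationService#handleInitContainerResources.
For the LocalResourceRequest objects of each visibility from the previous step, the service looks up the matching LocalResourcesTracker. When it is actually downloaded, a LocalResourceRequest is wrapped in a LocalizedResource, which is a resource that carries state. A LocalResourcesTracker manages a group of LocalizedResource objects; for example, all LocalizedResource objects that belong to one application are handled by a single LocalResourcesTracker. A ResourceRequestEvent is fired for every LocalResourceRequest.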

// ResourceLocalizationService#handleInitContainerResources
private void handleInitContainerResources(
      ContainerLocalizationRequestEvent rsrcReqs) {
    Container c = rsrcReqs.getContainer();
    // create a loading cache for the file statuses
    LoadingCache<Path,Future<FileStatus>> statCache =
        CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig()));
    LocalizerContext ctxt = new LocalizerContext(
        c.getUser(), c.getContainerId(), c.getCredentials(), statCache);
    Map<LocalResourceVisibility, Collection<LocalResourceRequest>> rsrcs =
      rsrcReqs.getRequestedResources();
    for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>> e :
         rsrcs.entrySet()) {
      LocalResourcesTracker tracker =
          getLocalResourcesTracker(e.getKey(), c.getUser(),
              c.getContainerId().getApplicationAttemptId()
                  .getApplicationId());
      for (LocalResourceRequest req : e.getValue()) {
        tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
      }
    }
  }
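
The tracker lookup in the listing above takes a visibility, a user, and an application id. Below is a minimal sketch of how such a lookup could be scoped. It assumes one shared tracker for PUBLIC, one per user for PRIVATE, and one per application for APPLICATION; `TrackerLookupSketch`, `Tracker`, and the lazy creation of trackers are hypothetical simplifications, not the YARN implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; Tracker and the map names are illustrative, not YARN's API.
class TrackerLookupSketch {
    enum Visibility { PUBLIC, PRIVATE, APPLICATION }

    static class Tracker {
        final String scope;
        Tracker(String scope) { this.scope = scope; }
    }

    private final Tracker publicTracker = new Tracker("public");
    private final Map<String, Tracker> perUser = new HashMap<>();
    private final Map<String, Tracker> perApp = new HashMap<>();

    // Assumed scoping: shared for PUBLIC, per user for PRIVATE, per app for APPLICATION.
    // Trackers are created lazily here only to keep the sketch short.
    Tracker lookup(Visibility vis, String user, String appId) {
        switch (vis) {
            case PRIVATE:
                return perUser.computeIfAbsent(user, u -> new Tracker("user:" + u));
            case APPLICATION:
                return perApp.computeIfAbsent(appId, a -> new Tracker("app:" + a));
            default:
                return publicTracker;
        }
    }
}
```

Under this assumption, two applications run by the same user share their PRIVATE tracker but get separate APPLICATION trackers.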

Handling ResourceRequestEvent

ResourceRequestEvent is first received by LocalResourcesTrackerImpl#handle, which then processes it.

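  1. For each LocalResourceRequest, create a LocalizedResource and store the pair,
    request as key and resource as value, in LocalResourcesTrackerImpl#localrsrc
    (a map). Note that this describes only the REQUEST event type;
  2. The new LocalizedResource then handles the ResourceRequestEvent in
    LocalizedResource$FetchResourceTransition. The logic is simple: wrap the
    LocalizedResource in a LocalizerResourceRequestEvent and dispatch it.
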
// LocalResourcesTrackerImpl#handle
public synchronized void handle(ResourceEvent event) {
    LocalResourceRequest req = event.getLocalResourceRequest();
    LocalizedResource rsrc = localrsrc.get(req);
    switch (event.getType()) {
    case LOCALIZED:
      if (useLocalCacheDirectoryManager) {
        inProgressLocalResourcesMap.remove(req);
      }
      break;
    case REQUEST:
      if (rsrc != null && (!isResourcePresent(rsrc))) {
        LOG.info("Resource " + rsrc.getLocalPath()
            + " is missing, localizing it again");
        removeResource(req);
        rsrc = null;
      }
      if (null == rsrc) {
        rsrc = new LocalizedResource(req, dispatcher);
        localrsrc.put(req, rsrc);
      }
      break;
    case RELEASE:
      if (null == rsrc) {
        // The container sent a release event on a resource which 
        // 1) Failed
        // 2) Removed for some reason (ex. disk is no longer accessible)
        ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
        LOG.info("Container " + relEvent.getContainer()
            + " sent RELEASE event on a resource request " + req
            + " not present in cache.");
        return;
      }
      break;
    case LOCALIZATION_FAILED:
      /*
       * If resource localization fails then Localized resource will be
       * removed from local cache.
       */
      removeResource(req);
      break;
    case RECOVERED:
      if (rsrc != null) {
        LOG.warn("Ignoring attempt to recover existing resource " + rsrc);
        return;
      }
      rsrc = recoverResource(req, (ResourceRecoveredEvent) event);
      localrsrc.put(req, rsrc);
      break;
    }

    rsrc.handle(event);

    if (event.getType() == ResourceEventType.LOCALIZED) {
      if (rsrc.getLocalPath() != null) {
        try {
          stateStore.finishResourceLocalization(user, appId,
              buildLocalizedResourceProto(rsrc));
        } catch (IOException ioe) {
          LOG.error("Error storing resource state for " + rsrc, ioe);
        }
      } else {
        LOG.warn("Resource " + rsrc + " localized without a location");
      }
    }
  }

Handling LocalizerResourceRequestEvent

The handling logic is in ResourceLocalizationService#handle, and this is where the resources actually get downloaded.

  • PUBLIC resources are downloaded by a dedicated thread inside ResourceLocalizationService;
  • PRIVATE and APPLICATION resources are downloaded by a LocalizerRunner, one per container;
  • This article focuses on LocalizerRunner.

// Handling logic in ResourceLocalizationService#handle
public void handle(LocalizerEvent event) {
      String locId = event.getLocalizerId();
      switch (event.getType()) {
      case REQUEST_RESOURCE_LOCALIZATION:
        // 0) find running localizer or start new thread
        LocalizerResourceRequestEvent req =
          (LocalizerResourceRequestEvent)event;
        switch (req.getVisibility()) {
        case PUBLIC:
          publicLocalizer.addResource(req);
          break;
        case PRIVATE:
        case APPLICATION:
          synchronized (privLocalizers) {
            LocalizerRunner localizer = privLocalizers.get(locId);
            if (null == localizer) {
              LOG.info("Created localizer for " + locId);
              localizer = new LocalizerRunner(req.getContext(), locId);
              privLocalizers.put(locId, localizer);
              localizer.start();
            }
            // 1) propagate event
            localizer.addResource(req);
          }
          break;
        }
        break;
      }
    }
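
The PRIVATE/APPLICATION branch above is a get-or-create under a lock: the first request for a localizer id creates and starts a runner, and later requests reuse it. A minimal sketch of that pattern follows; `LocalizerRegistrySketch`, `Runner`, and the `created` counter are hypothetical names introduced for illustration, and no thread is actually started.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the one-runner-per-localizer-id bookkeeping.
class LocalizerRegistrySketch {
    static class Runner {
        final String id;
        final List<String> queue = new ArrayList<>();
        Runner(String id) { this.id = id; }
    }

    private final Map<String, Runner> privLocalizers = new HashMap<>();
    int created = 0;

    // Finds the runner for this id, creating it on first use, then queues the resource.
    Runner addResource(String localizerId, String resource) {
        synchronized (privLocalizers) {
            Runner runner = privLocalizers.get(localizerId);
            if (runner == null) {
                runner = new Runner(localizerId);
                privLocalizers.put(localizerId, runner);
                created++; // the real service would also start the runner thread here
            }
            runner.queue.add(resource);
            return runner;
        }
    }
}
```

Holding the lock across both the lookup and the enqueue keeps a second request from creating a duplicate runner for the same id.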

The download process

Client side

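LocalizerRunner#run delegates the actual download work to the ContainerExecutor (DefaultContainerExecutor by default).
DefaultContainerExecutor creates a ContainerLocalizer, which acts as an RPC client and talks to
ResourceLocalizationService (the server) over LocalizationProtocol, fetching one after another the information about the resources that must be downloaded to launch the container.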

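  • Both components live on the NodeManager side, and I do not know for sure why they talk over RPC. One likely reason is that other executors, such as LinuxContainerExecutor, run the ContainerLocalizer in a separate process, so a protocol that crosses the process boundary is needed.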

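ContainerLocalizer#runLocalization simply hands all download tasks to a thread pool. Note that the pool is wrapped in a CompletionService, which places the Future of each task on a BlockingQueue as the task completes. Callers can take finished Futures from that queue, blocking or with a timeout, without polling each task individually.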

// ContainerLocalizer#runLocalization
public int runLocalization(final InetSocketAddress nmAddr)
      throws IOException, InterruptedException {
    // ... unrelated code omitted
    ExecutorService exec = null;
    try {
      exec = createDownloadThreadPool();
      CompletionService<Path> ecs = createCompletionService(exec);
      localizeFiles(nodeManager, ecs, ugi);
      return 0;
    } catch (Throwable e) {
      // Print traces to stdout so that they can be logged by the NM address
      // space.
      e.printStackTrace(System.out);
      return -1;
    } finally {
      try {
        if (exec != null) {
          exec.shutdownNow();
        }
        LocalDirAllocator.removeContext(appCacheDirContextName);
      } finally {
        closeFileSystems(ugi);
      }
    }
  }
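
The CompletionService behavior described above can be tried on its own with the JDK classes. The sketch below is illustrative only: `CompletionServiceSketch`, the `/local/cache/` prefix, and the fake "download" that just builds a path string are assumptions, not part of ContainerLocalizer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CompletionServiceSketch {
    // Submits one fake download per name and collects results in completion order.
    static List<String> downloadAll(List<String> names) {
        ExecutorService exec = Executors.newFixedThreadPool(2);
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(exec);
            for (String name : names) {
                // Stand-in for a real download: just compute the local path.
                cs.submit(() -> "/local/cache/" + name);
            }
            List<String> finished = new ArrayList<>();
            for (int i = 0; i < names.size(); i++) {
                Future<String> done = cs.take(); // blocks until some task completes
                finished.add(done.get());
            }
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("download task failed", e);
        } finally {
            exec.shutdownNow();
        }
    }
}
```

The results come back in the order the tasks finish, which need not be the order they were submitted. ContainerLocalizer uses the timed `poll` instead of `take`, so its loop wakes up at least once per second to send the next heartbeat.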

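The actual download logic is in ContainerLocalizer#localizeFiles, which is also where the client and the server interact over RPC.

  1. The method is an infinite loop; it exits only on a DIE response or an exception;
  2. ContainerLocalizer#createStatus is called before every RPC request;
  3. ContainerLocalizer#createStatus refreshes the status of every resource in ContainerLocalizer#pendingResources.
    Every resource submitted to the thread pool for download is recorded in ContainerLocalizer#pendingResources;
  4. The server uses these statuses to tell whether a resource was downloaded successfully, failed, or is still
    being downloaded. The statuses are derived from the Future of each task;
  5. The response to nodemanager.heartbeat(status) names the resource to download next. In other words, which resource to download is always decided by the server.
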
// ContainerLocalizer#localizeFiles
protected void localizeFiles(LocalizationProtocol nodemanager,
      CompletionService<Path> cs, UserGroupInformation ugi)
      throws IOException {
    while (true) {
      try {
        LocalizerStatus status = createStatus();
        LocalizerHeartbeatResponse response = nodemanager.heartbeat(status);
        switch (response.getLocalizerAction()) {
        case LIVE:
          List<ResourceLocalizationSpec> newRsrcs = response.getResourceSpecs();
          for (ResourceLocalizationSpec newRsrc : newRsrcs) {
            if (!pendingResources.containsKey(newRsrc.getResource())) {
              pendingResources.put(newRsrc.getResource(), cs.submit(download(
                new Path(newRsrc.getDestinationDirectory().getFile()),
                newRsrc.getResource(), ugi)));
            }
          }
          break;
        case DIE:
          // killall running localizations
          for (Future<Path> pending : pendingResources.values()) {
            pending.cancel(true);
          }
          status = createStatus();
          // ignore response
          try {
            nodemanager.heartbeat(status);
          } catch (YarnException e) { }
          return;
        }
        cs.poll(1000, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        return;
      } catch (YarnException e) {
        // TODO cleanup
        return;
      }
    }
  }
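
The heartbeat exchange can be simulated without any RPC. The sketch below is a toy model under stated assumptions: `HeartbeatLoopSketch`, `FakeServer`, and `Response` are hypothetical, every download is treated as finished by the next heartbeat, and the fake server answers DIE once everything is localized purely so that the loop terminates. The real service decides when to send DIE differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical simulation; FakeServer and Response are illustrative, not YARN classes.
class HeartbeatLoopSketch {
    enum Action { LIVE, DIE }
    enum Status { FETCH_PENDING, FETCH_SUCCESS }

    static class Response {
        final Action action;
        final String resource; // next resource to fetch, or null
        Response(Action action, String resource) {
            this.action = action;
            this.resource = resource;
        }
    }

    static class FakeServer {
        private final Queue<String> toSchedule = new ArrayDeque<>();
        private final int expected;
        final List<String> localized = new ArrayList<>();

        FakeServer(List<String> resources) {
            Set<String> distinct = new LinkedHashSet<>(resources);
            toSchedule.addAll(distinct);
            expected = distinct.size();
        }

        // Records reported successes, then hands out at most one new resource.
        Response heartbeat(Map<String, Status> statuses) {
            for (Map.Entry<String, Status> e : statuses.entrySet()) {
                if (e.getValue() == Status.FETCH_SUCCESS && !localized.contains(e.getKey())) {
                    localized.add(e.getKey());
                }
            }
            if (localized.size() == expected) {
                return new Response(Action.DIE, null); // simplification, see lead-in
            }
            return new Response(Action.LIVE, toSchedule.poll());
        }
    }

    // Same shape as localizeFiles: report status, then act on the response.
    // Returns the number of heartbeats sent before the server answered DIE.
    static int run(FakeServer server) {
        Map<String, Status> pending = new LinkedHashMap<>();
        int heartbeats = 0;
        while (true) {
            heartbeats++;
            Response response = server.heartbeat(pending);
            if (response.action == Action.DIE) {
                return heartbeats;
            }
            if (response.resource != null && !pending.containsKey(response.resource)) {
                // A real client submits a download task; here it "finishes" at once.
                pending.put(response.resource, Status.FETCH_SUCCESS);
            }
        }
    }
}
```

With two resources the client needs three heartbeats: one to receive each resource and a final one to report the last success. This shows the cost of handing out a single resource per heartbeat.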

Server side

ResourceLocalizationService is the server, so the handling logic starts in ResourceLocalizationService#heartbeat, which in the end delegates to
LocalizerRunner#processHeartbeat.

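  1. Check the status of every resource in the request. For a resource that was downloaded successfully (FETCH_SUCCESS), fire a ResourceLocalizedEvent. This matters: a resource is removed from ContainerImpl#pendingResources only after it has been downloaded successfully, so the container can be launched once ContainerImpl#pendingResources becomes empty;
  2. Find the next resource to download and return it in the response for the client to download.
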
// LocalizerRunner#processHeartbeat
LocalizerHeartbeatResponse processHeartbeat(
        List<LocalResourceStatus> remoteResourceStatuses) {
      LocalizerHeartbeatResponse response =
        recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);

      String user = context.getUser();
      ApplicationId applicationId =
          context.getContainerId().getApplicationAttemptId().getApplicationId();

      boolean fetchFailed = false;
      // Update resource statuses.
      for (LocalResourceStatus stat : remoteResourceStatuses) {
        LocalResource rsrc = stat.getResource();
        LocalResourceRequest req = null;
        try {
          req = new LocalResourceRequest(rsrc);
        } catch (URISyntaxException e) {
          // TODO fail? Already translated several times...
        }
        LocalizerResourceRequestEvent assoc = scheduled.get(req);
        if (assoc == null) {
          // internal error
          LOG.error("Unknown resource reported: " + req);
          continue;
        }
        switch (stat.getStatus()) {
          case FETCH_SUCCESS:
            // notify resource
            try {
            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
              .handle(
                new ResourceLocalizedEvent(req, ConverterUtils
                  .getPathFromYarnURL(stat.getLocalPath()), stat.getLocalSize()));
            } catch (URISyntaxException e) { }

            // unlocking the resource and removing it from scheduled resource
            // list
            assoc.getResource().unlock();
            scheduled.remove(req);
            break;
          case FETCH_PENDING:
            break;
          case FETCH_FAILURE:
            final String diagnostics = stat.getException().toString();
            LOG.warn(req + " failed: " + diagnostics);
            fetchFailed = true;
            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
              .handle(new ResourceFailedLocalizationEvent(
                  req, diagnostics));

            // unlocking the resource and removing it from scheduled resource
            // list
            assoc.getResource().unlock();
            scheduled.remove(req);
            break;
          default:
            LOG.info("Unknown status: " + stat.getStatus());
            fetchFailed = true;
            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
              .handle(new ResourceFailedLocalizationEvent(
                  req, stat.getException().getMessage()));
            break;
        }
      }
      if (fetchFailed || killContainerLocalizer.get()) {
        response.setLocalizerAction(LocalizerAction.DIE);
        return response;
      }

      // Give the localizer resources for remote-fetching.
      List<ResourceLocalizationSpec> rsrcs =
          new ArrayList<ResourceLocalizationSpec>();

      /*
       * TODO : It doesn't support multiple downloads per ContainerLocalizer
       * at the same time. We need to think whether we should support this.
       */
      LocalResource next = findNextResource();
      if (next != null) {
        try {
          ResourceLocalizationSpec resource =
              NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
                getPathForLocalization(next));
          rsrcs.add(resource);
        } catch (IOException e) {
          LOG.error("local path for PRIVATE localization could not be " +
            "found. Disks might have failed.", e);
        } catch (IllegalArgumentException e) {
          LOG.error("Inorrect path for PRIVATE localization."
              + next.getResource().getFile(), e);
        } catch (URISyntaxException e) {
            //TODO fail? Already translated several times...
        }
      }

      response.setLocalizerAction(LocalizerAction.LIVE);
      response.setResourceSpecs(rsrcs);
      return response;
}