ApplicationMaster提交startContainers的请求给NodeManager之后,NodeManager很重要的一个步骤是将启动容器所需要的Resource从HDFS上面下载到NodeManager本地磁盘。
ContainerInitEvent处理
文件下载的入口是NodeManager端的ContainerImpl收到ContainerInitEvent事件,该事件由ContainerImpl$RequestResourcesTransition进行处理。
- 获取本次Container启动所需参数的上下文ContainerLaunchContext;
- 通过ContainerLaunchContext#getLocalResources获取所需的资源;
- 将每个LocalResource封装成ContainerImpl可用的LocalResourceRequest对象;
- 将所有的LocalResourceRequest加入ContainerImpl#pendingResources(一个map),从名字可以看出这里保存的是正在等待下载或正在下载中的resource,如果某个resource已经下载完成便会从这个map中删除,因此这个map可以作为最后判断所有resource是否下载完成的依据;
- 将资源按可见性分为PUBLIC、PRIVATE和APPLICATION三类,分别存进ContainerImpl#publicRsrcs、ContainerImpl#privateRsrcs和ContainerImpl#appRsrcs中;
- 最后触发ContainerLocalizationRequestEvent,该事件包含所有的resource请求和当前的ContainerImpl。
// ContainerImpl$RequestResourcesTransition
/**
 * Transition taken when a container is initialised.
 *
 * <p>When the launch context names no local resources the container is
 * launched immediately and the transition ends in {@code LOCALIZED}.
 * Otherwise every resource is recorded in the container's pending map,
 * sorted into the public / private / application collections, and a single
 * {@code ContainerLocalizationRequestEvent} is dispatched; the transition
 * then ends in {@code LOCALIZING}. A resource whose request cannot be built
 * ({@code URISyntaxException}) aborts the launch with
 * {@code LOCALIZATION_FAILED}.
 */
static class RequestResourcesTransition implements
    MultipleArcTransition<ContainerImpl,ContainerEvent,ContainerState> {
  @Override
  public ContainerState transition(ContainerImpl container,
      ContainerEvent event) {
    // ... other logic omitted; only the normal flow is shown here
    final ContainerLaunchContext launchCtx = container.launchContext;
    final Map<String,LocalResource> declared = launchCtx.getLocalResources();

    // Nothing to localize: launch straight away.
    if (declared.isEmpty()) {
      container.sendLaunchEvent();
      container.metrics.endInitingContainer();
      return ContainerState.LOCALIZED;
    }

    // Send requests for public, private resources
    try {
      for (Map.Entry<String,LocalResource> entry : declared.entrySet()) {
        final String linkName = entry.getKey();
        final LocalResource resource = entry.getValue();
        try {
          final LocalResourceRequest request =
              new LocalResourceRequest(resource);
          // Several link names may point at the same request; collect them
          // under one pending entry.
          List<String> linkNames = container.pendingResources.get(request);
          if (linkNames == null) {
            linkNames = new ArrayList<String>();
            container.pendingResources.put(request, linkNames);
          }
          linkNames.add(linkName);

          switch (resource.getVisibility()) {
          case PUBLIC:
            container.publicRsrcs.add(request);
            break;
          case PRIVATE:
            container.privateRsrcs.add(request);
            break;
          case APPLICATION:
            container.appRsrcs.add(request);
            break;
          }
        } catch (URISyntaxException e) {
          // Record which entry was malformed, then let the outer handler
          // abort the launch.
          LOG.info("Got exception parsing " + linkName
              + " and value " + resource);
          throw e;
        }
      }
    } catch (URISyntaxException e) {
      // malformed resource; abort container launch
      LOG.warn("Failed to parse resource-request", e);
      container.cleanup();
      container.metrics.endInitingContainer();
      return ContainerState.LOCALIZATION_FAILED;
    }

    // Only visibilities that actually have requests are put in the event.
    final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
        byVisibility = new HashMap<LocalResourceVisibility,
            Collection<LocalResourceRequest>>();
    if (!container.publicRsrcs.isEmpty()) {
      byVisibility.put(LocalResourceVisibility.PUBLIC, container.publicRsrcs);
    }
    if (!container.privateRsrcs.isEmpty()) {
      byVisibility.put(LocalResourceVisibility.PRIVATE,
          container.privateRsrcs);
    }
    if (!container.appRsrcs.isEmpty()) {
      byVisibility.put(LocalResourceVisibility.APPLICATION,
          container.appRsrcs);
    }

    container.dispatcher.getEventHandler().handle(
        new ContainerLocalizationRequestEvent(container, byVisibility));
    return ContainerState.LOCALIZING;
  }
}
ContainerLocalizationRequestEvent处理
ContainerLocalizationRequestEvent由ResourceLocalizationService#handle接受,具体处理逻辑是在ResourceLocalizationService#handleInitContainerResources.
对于上一步的每一个LocalResourceRequest,找到与之对应的LocalResourcesTracker。一个LocalResourceRequest在具体下载的时候会被封装成一个LocalizedResource,LocalizedResource是具有状态的resource;LocalResourcesTracker负责管理一组LocalizedResource,例如一个应用对应的所有LocalizedResource会由同一个LocalResourcesTracker进行处理。对于每一个LocalResourceRequest都会触发ResourceRequestEvent。
// ResourceLocalizationService#handleInitContainerResources
/**
 * Hands every resource requested by a container to the tracker that is
 * looked up for the resource's visibility, the container's user and its
 * application id.
 *
 * @param rsrcReqs the container's localization request, carrying the
 *        container and its requested resources grouped by visibility
 */
private void handleInitContainerResources(
    ContainerLocalizationRequestEvent rsrcReqs) {
  final Container container = rsrcReqs.getContainer();

  // create a loading cache for the file statuses
  final LoadingCache<Path,Future<FileStatus>> statusCache =
      CacheBuilder.newBuilder()
          .build(FSDownload.createStatusCacheLoader(getConfig()));

  // One context is shared by all request events of this container.
  final LocalizerContext localizerCtx = new LocalizerContext(
      container.getUser(), container.getContainerId(),
      container.getCredentials(), statusCache);

  final Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
      requested = rsrcReqs.getRequestedResources();

  for (Map.Entry<LocalResourceVisibility, Collection<LocalResourceRequest>>
      group : requested.entrySet()) {
    final LocalResourceVisibility visibility = group.getKey();
    final LocalResourcesTracker tracker = getLocalResourcesTracker(
        visibility, container.getUser(),
        container.getContainerId().getApplicationAttemptId()
            .getApplicationId());
    for (LocalResourceRequest request : group.getValue()) {
      tracker.handle(
          new ResourceRequestEvent(request, visibility, localizerCtx));
    }
  }
}
ResourceRequestEvent的处理
ResourceRequestEvent首先由LocalResourcesTrackerImpl#handle接受,然后进行事件处理。
- 对于每一个LocalResourceRequest生成一个LocalizedResource,并以前者作为键、后者作为值保存进LocalResourcesTrackerImpl#localrsrc(一个map)中。注意这里只关注事件类型REQUEST;
- 由生成的LocalizedResource处理ResourceRequestEvent,处理是在LocalizedResource#FetchResourceTransition中,逻辑很简单:将每一个生成的LocalizedResource封装进LocalizerResourceRequestEvent并触发之。
// LocalResourcesTrackerImpl#handle
/**
 * Dispatches a resource event to the {@code LocalizedResource} tracked for
 * the event's request, first adjusting the tracked entry as the event type
 * requires:
 * <ul>
 *   <li>{@code LOCALIZED}: clears the in-progress entry when the local cache
 *       directory manager is in use;</li>
 *   <li>{@code REQUEST}: drops an entry whose resource is no longer present
 *       and creates a new entry when none is tracked;</li>
 *   <li>{@code RELEASE}: ignored (with a log line) when nothing is tracked;</li>
 *   <li>{@code LOCALIZATION_FAILED}: removes the entry from the cache;</li>
 *   <li>{@code RECOVERED}: registers the recovered resource unless one is
 *       already tracked.</li>
 * </ul>
 * After a {@code LOCALIZED} event has been handled the resource is persisted
 * in the state store if it has a local path.
 *
 * <p>An event that arrives for a request with no tracked resource (possible
 * for {@code LOCALIZED} and {@code LOCALIZATION_FAILED}) is logged and
 * dropped instead of failing with a {@code NullPointerException}.
 *
 * @param event the resource event to process
 */
public synchronized void handle(ResourceEvent event) {
  LocalResourceRequest req = event.getLocalResourceRequest();
  LocalizedResource rsrc = localrsrc.get(req);
  switch (event.getType()) {
  case LOCALIZED:
    if (useLocalCacheDirectoryManager) {
      inProgressLocalResourcesMap.remove(req);
    }
    break;
  case REQUEST:
    if (rsrc != null && (!isResourcePresent(rsrc))) {
      LOG.info("Resource " + rsrc.getLocalPath()
          + " is missing, localizing it again");
      removeResource(req);
      rsrc = null;
    }
    if (null == rsrc) {
      rsrc = new LocalizedResource(req, dispatcher);
      localrsrc.put(req, rsrc);
    }
    break;
  case RELEASE:
    if (null == rsrc) {
      // The container sent a release event on a resource which
      // 1) Failed
      // 2) Removed for some reason (ex. disk is no longer accessible)
      ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
      LOG.info("Container " + relEvent.getContainer()
          + " sent RELEASE event on a resource request " + req
          + " not present in cache.");
      return;
    }
    break;
  case LOCALIZATION_FAILED:
    /*
     * If resource localization fails then Localized resource will be
     * removed from local cache.
     */
    removeResource(req);
    break;
  case RECOVERED:
    if (rsrc != null) {
      LOG.warn("Ignoring attempt to recover existing resource " + rsrc);
      return;
    }
    rsrc = recoverResource(req, (ResourceRecoveredEvent) event);
    localrsrc.put(req, rsrc);
    break;
  }

  // The lookup above may have found nothing for event types that do not
  // create an entry; there is then no state machine to drive.
  if (rsrc == null) {
    LOG.warn("Received " + event.getType() + " event for request " + req
        + " but localized resource is missing");
    return;
  }
  rsrc.handle(event);

  if (event.getType() == ResourceEventType.LOCALIZED) {
    if (rsrc.getLocalPath() != null) {
      try {
        stateStore.finishResourceLocalization(user, appId,
            buildLocalizedResourceProto(rsrc));
      } catch (IOException ioe) {
        // A failed state-store write is logged only; the event itself has
        // already been handled.
        LOG.error("Error storing resource state for " + rsrc, ioe);
      }
    } else {
      LOG.warn("Resource " + rsrc + " localized without a location");
    }
  }
}
LocalizerResourceRequestEvent的处理
处理逻辑是在ResourceLocalizationService#handle中,这里正是对resource进行下载。
- 对于PUBLIC的资源,在ResourceLocalizationService中有一个专门的线程对其进行下载;
- 对于PRIVATE和APPLICATION的资源,每一个Container对应一个LocalizerRunner进行下载;
- 本文主要针对LocalizerRunner进行说明。
// 处理逻辑是在ResourceLocalizationService#handle
/**
 * Routes a localization request by the visibility of the requested
 * resource: {@code PUBLIC} resources are added to the public localizer,
 * while {@code PRIVATE} and {@code APPLICATION} resources are added to the
 * {@code LocalizerRunner} registered under the event's localizer id, which
 * is created, registered and started on first use.
 *
 * @param event the localizer event to process
 */
public void handle(LocalizerEvent event) {
  final String localizerId = event.getLocalizerId();
  switch (event.getType()) {
  case REQUEST_RESOURCE_LOCALIZATION: {
    // 0) find running localizer or start new thread
    final LocalizerResourceRequestEvent request =
        (LocalizerResourceRequestEvent) event;
    switch (request.getVisibility()) {
    case PUBLIC:
      publicLocalizer.addResource(request);
      break;
    case PRIVATE:
    case APPLICATION:
      // Lookup, creation and hand-over happen under the same lock.
      synchronized (privLocalizers) {
        LocalizerRunner runner = privLocalizers.get(localizerId);
        if (runner == null) {
          LOG.info("Created localizer for " + localizerId);
          runner = new LocalizerRunner(request.getContext(), localizerId);
          privLocalizers.put(localizerId, runner);
          runner.start();
        }
        // 1) propagate event
        runner.addResource(request);
      }
      break;
    }
    break;
  }
  }
}
下载过程
客户端的处理
LocalizerRunner#run会将具体下载任务委托给ContainerExecutor(默认实现为DefaultContainerExecutor),
而DefaultContainerExecutor则会创建一个RPC客户端ContainerLocalizer,通过LocalizationProtocol与
ResourceLocalizationService(作为服务端)进行通讯,按顺序获取启动容器所需下载的资源信息。
- 上面两个组件都是NodeManager端的,为什么利用RPC进行通讯,咱也不知道。
ContainerLocalizer#runLocalization将所有的下载任务交给一个线程池执行就完事了。注意这是一个由CompletionService封装的线程池,CompletionService具有这样的功能:将所有任务的Future封装进一个BlockingQueue,可以通过这个队列获取所有任务的Future,完成一些阻塞操作。
// ContainerLocalizer#runLocalization
/**
 * Creates the download thread pool, runs the localization loop on it and
 * always tears the pool, the directory-allocator context and the file
 * systems down again.
 *
 * @param nmAddr not referenced in the portion of the method shown here
 * @return {@code 0} when the loop ends normally, {@code -1} when anything
 *         is thrown while setting up or running it
 */
public int runLocalization(final InetSocketAddress nmAddr)
    throws IOException, InterruptedException {
  // ... unrelated code omitted
  ExecutorService downloadPool = null;
  try {
    downloadPool = createDownloadThreadPool();
    final CompletionService<Path> completions =
        createCompletionService(downloadPool);
    localizeFiles(nodeManager, completions, ugi);
    return 0;
  } catch (Throwable failure) {
    // Print traces to stdout so that they can be logged by the NM address
    // space.
    failure.printStackTrace(System.out);
    return -1;
  } finally {
    // Nested finally: the file systems are closed even if the pool
    // shutdown or the context removal throws.
    try {
      if (null != downloadPool) {
        downloadPool.shutdownNow();
      }
      LocalDirAllocator.removeContext(appCacheDirContextName);
    } finally {
      closeFileSystems(ugi);
    }
  }
}
具体下载逻辑是在ContainerLocalizer#localizeFiles中,这也是Client与Server通过RPC交互的地方
- 这个方法是一个死循环;
- 每次发送RPC请求之前都会调用ContainerLocalizer#createStatus;
- ContainerLocalizer#createStatus会更新ContainerLocalizer#pendingResources中所有资源的状态,每一个提交给线程池下载的资源都会保存在ContainerLocalizer#pendingResources中;
- 服务端就是根据这些状态来判断资源是下载成功、下载失败还是正在下载中,这几个状态是依据每个任务的Future得出的;
- 发送请求nodemanager.heartbeat(status),获得的响应是当前应该下载的资源,即应该下载哪一个资源是由服务端决定并发送过来的。
// ContainerLocalizer#localizeFiles
/**
 * Heartbeat loop of the localizer. Each round reports the current status,
 * then acts on the answer: {@code LIVE} submits a download for every
 * returned resource that is not already pending, {@code DIE} cancels all
 * pending downloads, sends one last status and returns. Between rounds the
 * completion service is polled for up to one second.
 *
 * <p>The loop also ends, silently, when the thread is interrupted or a
 * {@code YarnException} is raised by the heartbeat.
 *
 * @param nodemanager protocol endpoint the heartbeats are sent to
 * @param cs completion service the downloads are submitted to
 * @param ugi user information passed on to each download
 * @throws IOException propagated from the calls made inside the loop
 */
protected void localizeFiles(LocalizationProtocol nodemanager,
    CompletionService<Path> cs, UserGroupInformation ugi)
    throws IOException {
  for (;;) {
    try {
      final LocalizerStatus report = createStatus();
      final LocalizerHeartbeatResponse reply = nodemanager.heartbeat(report);
      switch (reply.getLocalizerAction()) {
      case LIVE:
        for (ResourceLocalizationSpec spec : reply.getResourceSpecs()) {
          if (pendingResources.containsKey(spec.getResource())) {
            // Already submitted in an earlier round.
            continue;
          }
          final Path destination =
              new Path(spec.getDestinationDirectory().getFile());
          final Future<Path> submitted =
              cs.submit(download(destination, spec.getResource(), ugi));
          pendingResources.put(spec.getResource(), submitted);
        }
        break;
      case DIE:
        // killall running localizations
        for (Future<Path> inFlight : pendingResources.values()) {
          inFlight.cancel(true);
        }
        final LocalizerStatus lastReport = createStatus();
        // ignore response
        try {
          nodemanager.heartbeat(lastReport);
        } catch (YarnException ignored) { }
        return;
      }
      cs.poll(1000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      return;
    } catch (YarnException e) {
      // TODO cleanup
      return;
    }
  }
}
服务端的处理
ResourceLocalizationService作为服务端,处理逻辑当然是在ResourceLocalizationService#heartbeat中,最终会委托给
LocalizerRunner#processHeartbeat进行处理。
- 判断请求中所有resource的状态,对于下载成功(FETCH_SUCCESS)的资源触发ResourceLocalizedEvent。这很重要:只有resource下载成功之后才会从ContainerImpl#pendingResources中移除,这样当ContainerImpl#pendingResources为空之时就是容器启动之日;
- 找出下一个需要下载的资源,并作为响应返回给客户端进行下载。
// LocalizerRunner#processHeartbeat
/**
 * Processes one heartbeat from the container localizer.
 *
 * <p>First every reported status is applied: a successful fetch notifies
 * the resource's tracker, a failed or unrecognised status notifies it of
 * the failure; successful and failed resources are unlocked and removed
 * from the scheduled set. Statuses for resources that cannot be parsed or
 * are not scheduled are logged and skipped.
 *
 * <p>If any fetch failed, or the localizer has been asked to stop, the
 * response tells it to {@code DIE}. Otherwise the response is {@code LIVE}
 * and carries at most one further resource to download.
 *
 * @param remoteResourceStatuses statuses reported by the localizer
 * @return the response to send back to the localizer
 */
LocalizerHeartbeatResponse processHeartbeat(
    List<LocalResourceStatus> remoteResourceStatuses) {
  LocalizerHeartbeatResponse response =
      recordFactory.newRecordInstance(LocalizerHeartbeatResponse.class);
  String user = context.getUser();
  ApplicationId applicationId =
      context.getContainerId().getApplicationAttemptId().getApplicationId();
  boolean fetchFailed = false;

  // Update resource statuses.
  for (LocalResourceStatus stat : remoteResourceStatuses) {
    LocalResource rsrc = stat.getResource();
    LocalResourceRequest req;
    try {
      req = new LocalResourceRequest(rsrc);
    } catch (URISyntaxException e) {
      // Without a request there is no key to look the resource up by;
      // say why it is skipped instead of continuing with a null key.
      LOG.error("Unparseable resource reported: " + rsrc, e);
      continue;
    }
    LocalizerResourceRequestEvent assoc = scheduled.get(req);
    if (assoc == null) {
      // internal error
      LOG.error("Unknown resource reported: " + req);
      continue;
    }
    switch (stat.getStatus()) {
    case FETCH_SUCCESS:
      // notify resource
      try {
        getLocalResourcesTracker(req.getVisibility(), user, applicationId)
            .handle(
                new ResourceLocalizedEvent(req, ConverterUtils
                    .getPathFromYarnURL(stat.getLocalPath()),
                    stat.getLocalSize()));
      } catch (URISyntaxException e) {
        // The tracker was not notified; leave a trace of that.
        LOG.error("Failed to notify localization of " + req, e);
      }
      // unlocking the resource and removing it from scheduled resource
      // list
      assoc.getResource().unlock();
      scheduled.remove(req);
      break;
    case FETCH_PENDING:
      break;
    case FETCH_FAILURE:
      // String.valueOf tolerates a status that carries no exception.
      final String diagnostics = String.valueOf(stat.getException());
      LOG.warn(req + " failed: " + diagnostics);
      fetchFailed = true;
      getLocalResourcesTracker(req.getVisibility(), user, applicationId)
          .handle(new ResourceFailedLocalizationEvent(
              req, diagnostics));
      // unlocking the resource and removing it from scheduled resource
      // list
      assoc.getResource().unlock();
      scheduled.remove(req);
      break;
    default:
      LOG.info("Unknown status: " + stat.getStatus());
      fetchFailed = true;
      // An unrecognised status may come without an exception; fall back
      // to describing the status itself.
      final String unknownDiagnostics = stat.getException() == null
          ? "Unknown status: " + stat.getStatus()
          : stat.getException().getMessage();
      getLocalResourcesTracker(req.getVisibility(), user, applicationId)
          .handle(new ResourceFailedLocalizationEvent(
              req, unknownDiagnostics));
      break;
    }
  }

  if (fetchFailed || killContainerLocalizer.get()) {
    response.setLocalizerAction(LocalizerAction.DIE);
    return response;
  }

  // Give the localizer resources for remote-fetching.
  List<ResourceLocalizationSpec> rsrcs =
      new ArrayList<ResourceLocalizationSpec>();

  /*
   * TODO : It doesn't support multiple downloads per ContainerLocalizer
   * at the same time. We need to think whether we should support this.
   */
  LocalResource next = findNextResource();
  if (next != null) {
    try {
      ResourceLocalizationSpec resource =
          NodeManagerBuilderUtils.newResourceLocalizationSpec(next,
              getPathForLocalization(next));
      rsrcs.add(resource);
    } catch (IOException e) {
      LOG.error("local path for PRIVATE localization could not be " +
          "found. Disks might have failed.", e);
    } catch (IllegalArgumentException e) {
      LOG.error("Inorrect path for PRIVATE localization."
          + next.getResource().getFile(), e);
    } catch (URISyntaxException e) {
      // The resource is not handed out this round; record the reason.
      LOG.error("Could not build localization spec for " + next, e);
    }
  }

  response.setLocalizerAction(LocalizerAction.LIVE);
  response.setResourceSpecs(rsrcs);
  return response;
}