Flink source code analysis series
For the full list of articles, see the Flink source code analysis series table of contents.
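Slot concepts
In Flink, a slot is a bundle of resources: CPU cores, task heap memory, task off-heap memory, managed memory, and network memory. The slot is also the unit in which Flink allocates resources.
A TaskManager contains one or more slots. Depending on the slot sharing configuration, several tasks can run in one slot at the same time. These tasks run as worker threads inside the slot.
The relationship between TaskManager, Slot, Task, and parallelism is shown in the figure below (taken from the official documentation):
Slot-related classes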
SchedulerImpl
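SchedulerImpl allocates slots for executing the tasks of Execution nodes.
The two SchedulerImpl methods that matter most for the analysis below are allocateSlot and allocateBatchSlot. Their logic is essentially the same; the only difference is that allocateSlot takes an extra parameter, the slot allocation timeout.
The slot allocation process itself is fairly complex and is explained later, when we walk through the slot request flow.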
SlotSharingManager
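SlotSharingManager is responsible for slot sharing, which means running different tasks in the same slot.
SlotSharingManager maintains a slot hierarchy whose root and intermediate nodes are MultiTaskSlots. A MultiTaskSlot can belong to another MultiTaskSlot and can itself contain several MultiTaskSlots or SingleTaskSlots, which is what forms the hierarchy. A SingleTaskSlot is a leaf of the hierarchy and has exactly one parent.
Slot sharing is expressed through this hierarchy: a slot shared by several tasks is represented as one MultiTaskSlot that contains several SingleTaskSlots.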
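To make the hierarchy concrete, here is a minimal Java sketch. `SlotNode`, `MultiSlot`, and `SingleSlot` are hypothetical simplifications of MultiTaskSlot and SingleTaskSlot. They leave out futures, resource profiles, and locality. The sketch only shows how a root node, which stands for one physical slot, can hold nested inner nodes and leaves, with each leaf being one task.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of SlotSharingManager's slot tree (not Flink code).
abstract class SlotNode {
    final String name;
    final MultiSlot parent; // null only for the root (the physical slot)

    SlotNode(String name, MultiSlot parent) {
        this.name = name;
        this.parent = parent;
    }

    abstract int countLeaves();
}

// Inner node, like MultiTaskSlot: may contain MultiSlots and SingleSlots.
class MultiSlot extends SlotNode {
    final List<SlotNode> children = new ArrayList<>();

    MultiSlot(String name, MultiSlot parent) {
        super(name, parent);
    }

    MultiSlot addMulti(String childName) {
        MultiSlot child = new MultiSlot(childName, this);
        children.add(child);
        return child;
    }

    SingleSlot addSingle(String taskName) {
        SingleSlot leaf = new SingleSlot(taskName, this);
        children.add(leaf);
        return leaf;
    }

    @Override
    int countLeaves() {
        int sum = 0;
        for (SlotNode child : children) {
            sum += child.countLeaves();
        }
        return sum;
    }
}

// Leaf node, like SingleTaskSlot: one task, exactly one parent, no children.
class SingleSlot extends SlotNode {
    SingleSlot(String taskName, MultiSlot parent) {
        super(taskName, parent);
    }

    @Override
    int countLeaves() {
        return 1;
    }

    // Walk up to the root, i.e. the node that represents the shared physical slot.
    MultiSlot root() {
        MultiSlot node = parent;
        while (node.parent != null) {
            node = node.parent;
        }
        return node;
    }
}
```

In this model, counting the leaves under a root gives the number of tasks that share that physical slot.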
Below we analyze one of its key methods.
createRootSlot
Creates a root slot of type MultiTaskSlot.
@Nonnull
MultiTaskSlot createRootSlot(
SlotRequestId slotRequestId,
CompletableFuture<? extends SlotContext> slotContextFuture,
SlotRequestId allocatedSlotRequestId) {
LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId);
final CompletableFuture<SlotContext> slotContextFutureAfterRootSlotResolution = new CompletableFuture<>();
// Create the root node
// This method also stores the new MultiTaskSlot in the allTaskSlots and unresolvedRootSlots collections
final MultiTaskSlot rootMultiTaskSlot = createAndRegisterRootSlot(
slotRequestId,
allocatedSlotRequestId,
slotContextFutureAfterRootSlotResolution);
// Runs once slotContextFuture completes
// slotContextFuture represents the slot request to the SlotPool
// This future is completed in SlotPoolImpl's tryFulfillSlotRequestOrMakeAvailable method
FutureUtils.forward(
slotContextFuture.thenApply(
(SlotContext slotContext) -> {
// add the root node to the set of resolved root nodes once the SlotContext future has
// been completed and we know the slot's TaskManagerLocation
// The slot has been allocated at this point: remove it from unresolvedRootSlots
// and add it to resolvedRootSlots
tryMarkSlotAsResolved(slotRequestId, slotContext);
return slotContext;
}),
slotContextFutureAfterRootSlotResolution);
return rootMultiTaskSlot;
}
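The key idea of createRootSlot is that a root slot starts out unresolved. It moves to the resolved set once the slot future from the SlotPool completes. The following sketch uses Strings in place of SlotRequestId and SlotContext. It also uses plain CompletableFuture calls instead of Flink's FutureUtils.forward. `RootSlotRegistry` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the root-slot bookkeeping in createRootSlot.
// Strings stand in for SlotRequestId; the TaskManager location string stands in for SlotContext.
class RootSlotRegistry {
    final Map<String, CompletableFuture<String>> unresolvedRootSlots = new HashMap<>();
    final Map<String, String> resolvedRootSlots = new HashMap<>(); // requestId -> location

    CompletableFuture<String> createRootSlot(String slotRequestId, CompletableFuture<String> slotContextFuture) {
        // The future exposed by the root slot; it completes only after the bookkeeping below has run.
        CompletableFuture<String> afterResolution = new CompletableFuture<>();
        unresolvedRootSlots.put(slotRequestId, afterResolution);

        slotContextFuture
            .thenApply(location -> {
                // Like tryMarkSlotAsResolved: move the root from unresolved to resolved.
                unresolvedRootSlots.remove(slotRequestId);
                resolvedRootSlots.put(slotRequestId, location);
                return location;
            })
            // Like FutureUtils.forward: copy the result (or the failure) into afterResolution.
            .whenComplete((location, error) -> {
                if (error != null) {
                    afterResolution.completeExceptionally(error);
                } else {
                    afterResolution.complete(location);
                }
            });
        return afterResolution;
    }
}
```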
SlotPool
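SlotPool caches slots. It receives slot requests issued by the ExecutionGraph and allocates the slots needed to execute tasks. If the SlotPool cannot serve a slot request, it tries to connect to the ResourceManager to obtain new slots. The slot request fails if the ResourceManager is currently unavailable, rejects the request, or the request times out. Because the SlotPool caches some slots, it can still hand out registered idle slots while the ResourceManager is unavailable. These slots are only released when they are no longer in use, for example when the job is running but some slots are still idle.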
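Starting the SlotPool
The SlotPool is started in JobMaster's startJobMasterServices method. Its start method registers two periodic tasks: checking for idle slots and checking for timed-out batch slot requests. It also registers a periodic status log when debug logging is enabled.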
public void start(
@Nonnull JobMasterId jobMasterId,
@Nonnull String newJobManagerAddress,
@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
this.jobMasterId = jobMasterId;
this.jobManagerAddress = newJobManagerAddress;
this.componentMainThreadExecutor = componentMainThreadExecutor;
scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);
if (log.isDebugEnabled()) {
scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
}
}
checkIdleSlot
This method is one of the SlotPool's periodic tasks and regularly checks for idle slots.
protected void checkIdleSlot() {
// The timestamp in SlotAndTimestamp is relative
// Get the current (relative) time
final long currentRelativeTimeMillis = clock.relativeTimeMillis();
// Collection for the expired idle slots
final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size());
// Find all slots that have been idle for longer than idleSlotTimeout
for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) {
if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) {
expiredSlots.add(slotAndTimestamp.slot);
}
}
final FlinkException cause = new FlinkException("Releasing idle slot.");
for (AllocatedSlot expiredSlot : expiredSlots) {
// Get the allocation ID of each expired slot
final AllocationID allocationID = expiredSlot.getAllocationId();
// Remove the slot with this allocation ID from availableSlots
if (availableSlots.tryRemove(allocationID) != null) {
log.info("Releasing idle slot [{}].", allocationID);
// Call the TaskManager that holds the idle slot via RPC and ask it to free the slot
final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
allocationID,
cause,
rpcTimeout);
// Runs when the RPC call completes
// If freeing the slot fails, discard the slot; its status is synced with the TaskManager on the next heartbeat
FutureUtils.whenCompleteAsyncIfNotDone(
freeSlotFuture,
componentMainThreadExecutor,
(Acknowledge ignored, Throwable throwable) -> {
if (throwable != null) {
// The slot status will be synced to task manager in next heartbeat.
log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. Discarding slot.",
allocationID, expiredSlot.getTaskManagerId(), throwable);
}
});
}
}
// Schedule the next run, which makes the check periodic
scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
}
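The idle check boils down to comparing each slot's "available since" timestamp with idleSlotTimeout. Below is a hypothetical sketch that assumes a map from allocation ID to a relative timestamp in milliseconds. Unlike checkIdleSlot, it removes expired entries in the same pass instead of collecting them first. It also leaves out the RPC to the TaskManager.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the idle-slot scan in checkIdleSlot.
// availableSince maps allocationId -> relative timestamp (ms) at which the slot became available.
class IdleSlotChecker {
    final Map<String, Long> availableSince = new LinkedHashMap<>();
    final long idleSlotTimeoutMillis;

    IdleSlotChecker(long idleSlotTimeoutMillis) {
        this.idleSlotTimeoutMillis = idleSlotTimeoutMillis;
    }

    // Removes and returns the allocation IDs that were idle for longer than the timeout.
    // In SlotPoolImpl, each of these would then be freed on its TaskManager via RPC.
    List<String> releaseIdleSlots(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = availableSince.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            // Same strict comparison as checkIdleSlot: the idle time must exceed the timeout.
            if (nowMillis - entry.getValue() > idleSlotTimeoutMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

The real method reschedules itself with scheduleRunAsync at the end of each run. Because of that, a slow run delays the next check instead of overlapping with it.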
checkBatchSlotTimeout
protected void checkBatchSlotTimeout() {
// Return immediately if timeout checking for batch slot requests is disabled
if (!batchSlotRequestTimeoutCheckEnabled) {
return;
}
final Collection<PendingRequest> pendingBatchRequests = getPendingBatchRequests();
if (!pendingBatchRequests.isEmpty()) {
// Collect the resource profiles of the slots that are already allocated
final Set<ResourceProfile> allocatedResourceProfiles = getAllocatedResourceProfiles();
// Partition the pending requests by resource requirement: requests an allocated slot's profile can fulfill go into one group, the rest into the other
final Map<Boolean, List<PendingRequest>> fulfillableAndUnfulfillableRequests = pendingBatchRequests
.stream()
.collect(Collectors.partitioningBy(canBeFulfilledWithAllocatedSlot(allocatedResourceProfiles)));
// Extract the fulfillable and the unfulfillable pending requests
final List<PendingRequest> fulfillableRequests = fulfillableAndUnfulfillableRequests.get(true);
final List<PendingRequest> unfulfillableRequests = fulfillableAndUnfulfillableRequests.get(false);
final long currentTimestamp = clock.relativeTimeMillis();
// Mark these requests as fulfillable
for (PendingRequest fulfillableRequest : fulfillableRequests) {
fulfillableRequest.markFulfillable();
}
for (PendingRequest unfulfillableRequest : unfulfillableRequests) {
// Mark the request as unfulfillable, passing the current timestamp
unfulfillableRequest.markUnfulfillable(currentTimestamp);
if (unfulfillableRequest.getUnfulfillableSince() + batchSlotTimeout.toMilliseconds() <= currentTimestamp) {
// The request has timed out: run the timeout handling (analyzed below)
timeoutPendingSlotRequest(unfulfillableRequest.getSlotRequestId());
}
}
}
// Schedule the next run, which makes the check periodic
scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);
}
// Handles a pending request that has timed out
protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
log.info("Pending slot request [{}] timed out.", slotRequestId);
// Remove the request from waitingForResourceManager and pendingRequests
final PendingRequest pendingRequest = removePendingRequest(slotRequestId);
// Complete the request's future exceptionally with a TimeoutException
if (pendingRequest != null) {
pendingRequest
.getAllocatedSlotFuture()
.completeExceptionally(new TimeoutException("Pending slot request timed out in SlotPool."));
}
}
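The partition-then-timeout logic of checkBatchSlotTimeout can be reproduced with Collectors.partitioningBy. In this hypothetical sketch, a resource profile is just a String, and a request is fulfillable if some allocated slot has the same profile. The real code matches ResourceProfiles instead. The sketch also assumes that markUnfulfillable keeps the first timestamp, so the timeout counts from the moment the request first became unfulfillable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical pending batch request; the profile is a plain String here.
class BatchRequest {
    final String id;
    final String profile;
    long unfulfillableSince = Long.MAX_VALUE; // "currently fulfillable" marker in this sketch

    BatchRequest(String id, String profile) {
        this.id = id;
        this.profile = profile;
    }

    void markFulfillable() {
        unfulfillableSince = Long.MAX_VALUE;
    }

    void markUnfulfillable(long now) {
        // Assumption: keep the first timestamp so repeated checks do not reset the timeout.
        if (unfulfillableSince == Long.MAX_VALUE) {
            unfulfillableSince = now;
        }
    }
}

class BatchTimeoutChecker {
    // Returns the IDs of the requests that would be passed to timeoutPendingSlotRequest.
    static List<String> timedOutRequests(
            List<BatchRequest> pending, Set<String> allocatedProfiles, long now, long batchSlotTimeout) {
        Map<Boolean, List<BatchRequest>> split = pending.stream()
                .collect(Collectors.partitioningBy(r -> allocatedProfiles.contains(r.profile)));
        split.get(true).forEach(BatchRequest::markFulfillable);
        List<String> timedOut = new ArrayList<>();
        for (BatchRequest r : split.get(false)) {
            r.markUnfulfillable(now);
            if (r.unfulfillableSince + batchSlotTimeout <= now) {
                timedOut.add(r.id);
            }
        }
        return timedOut;
    }
}
```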
ResourceManager
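Like the JobManager and the TaskManager, the ResourceManager is an important component in Flink. It allocates and reclaims resources and keeps track of them. The ResourceManager supports HA and can take part in leader election. It also holds connections to JobManagers. TaskManagers created later register with the ResourceManager through its registerTaskExecutor method.
The most important member of ResourceManager is the SlotManager, which maintains the available slots.
ResourceManager itself is an abstract class with two subclasses:
- StandaloneResourceManager: used in standalone deployments.
- ActiveResourceManager: used in non-standalone deployments. It has a ResourceManagerDriver member variable. ResourceManagerDriver has several implementations, for Kubernetes, Mesos, and YARN.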
SlotManager
SlotManager maintains all registered slots. It keeps track of all registered slots, free slots, pending slots waiting to be allocated (pendingSlot), pending slot requests (pendingSlotRequest), and fulfilled slot requests.
TaskSlotTable
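The table that assigns slots and tasks on a TaskManager; it is the container for TaskSlots. It maintains several indexes for fast access to tasks and to the slots allocated to them.
Below we analyze its main methods.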
The start method
TaskSlotTable must be started before it is used. Its start method is shown below:
@Override
public void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
// Check the state: it must be CREATED
Preconditions.checkState(
state == State.CREATED,
"The %s has to be just created before starting",
TaskSlotTableImpl.class.getSimpleName());
// Set the slot actions (analyzed below)
this.slotActions = Preconditions.checkNotNull(initialSlotActions);
// Set the main thread executor
this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
// Start the timer service: it can schedule multiple timeouts and notifies the timeout listener (this table) when one expires
timerService.start(this);
// Switch the state to RUNNING so that the table cannot be started twice
state = State.RUNNING;
}
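SlotActions
SlotActions contains the callback logic for slot allocation actions. The interface has two callbacks:
- freeSlot: called when a slot is freed
- timeoutSlot: called when a slot times out
The interface code is as follows: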
public interface SlotActions {
/**
* Free the task slot with the given allocation id.
*
* @param allocationId to identify the slot to be freed
*/
void freeSlot(AllocationID allocationId);
/**
* Timeout the task slot for the given allocation id. The timeout is identified by the given
* ticket to filter invalid timeouts out.
*
* @param allocationId identifying the task slot to be timed out
* @param ticket allowing to filter invalid timeouts out
*/
void timeoutSlot(AllocationID allocationId, UUID ticket);
}
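AllocationID uniquely identifies a physical slot that the JobManager allocates through the ResourceManager. It is assigned when the JobManager first requests the slot and stays the same across retries. TaskManager and ResourceManager use this ID to track and synchronize the slot's allocation state. SlotRequestId is different: a task uses a SlotRequestId when it requests a logical slot from the SlotPool. Because of slot sharing, several logical slot requests can map to the same physical slot request.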
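The difference between the two IDs can be shown with a small hypothetical model. Strings stand in for Flink's ID classes. One physical request keeps its AllocationID across retries, and several logical requests (SlotRequestIds) map to that single AllocationID.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical model of the two kinds of IDs; plain Strings stand in for Flink's ID classes.
class PhysicalSlotRequest {
    // Generated once; every retry towards the ResourceManager reuses it.
    final String allocationId = UUID.randomUUID().toString();
    int attempts;

    String send() {
        attempts++;
        return allocationId;
    }
}

class LogicalSlotRequests {
    // SlotRequestId -> AllocationID of the physical slot that serves it.
    final Map<String, String> logicalToPhysical = new HashMap<>();

    void assign(String slotRequestId, PhysicalSlotRequest physical) {
        logicalToPhysical.put(slotRequestId, physical.allocationId);
    }

    Set<String> distinctPhysicalSlots() {
        return new HashSet<>(logicalToPhysical.values());
    }
}
```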
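The only implementation of SlotActions is SlotActionsImpl, located in TaskExecutor.java. We will analyze it when it comes up later.
The allocateSlot method
Allocates a slot with the given index for the given job. A negative index requests a dynamically allocated slot that uses the given resource profile instead of a fixed index. Returns true if the slot could be allocated.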
@Override
public boolean allocateSlot(
int index,
JobID jobId,
AllocationID allocationId,
ResourceProfile resourceProfile,
Time slotTimeout) {
// Check that the TaskSlotTable is in the RUNNING state
checkRunning();
Preconditions.checkArgument(index < numberSlots);
// Check whether a slot has already been allocated for this allocation ID
// If so, return false
TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
if (taskSlot != null) {
LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
return false;
}
// If taskSlots already contains a slot with this index
if (taskSlots.containsKey(index)) {
// Get the existing slot at this index
TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
index,
duplicatedTaskSlot.getResourceProfile(),
duplicatedTaskSlot.getJobId(),
duplicatedTaskSlot.getAllocationId());
// Succeed only if the existing slot has the same job ID and allocation ID
return duplicatedTaskSlot.getJobId().equals(jobId) &&
duplicatedTaskSlot.getAllocationId().equals(allocationId);
} else if (allocatedSlots.containsKey(allocationId)) {
// If a slot has already been allocated for this allocation ID, return true
// Note: this case was already checked above, so this branch is unlikely to be reached
return true;
}
// If index >= 0, use the default slot ResourceProfile; otherwise use the resourceProfile passed in
resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;
// Try to reserve the requested resources from the remaining budget
if (!budgetManager.reserve(resourceProfile)) {
LOG.info("Cannot allocate the requested resources. Trying to allocate {}, "
+ "while the currently remaining available resources are {}, total is {}.",
resourceProfile,
budgetManager.getAvailableBudget(),
budgetManager.getTotalBudget());
return false;
}
// Create a new TaskSlot
taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId, memoryVerificationExecutor);
if (index >= 0) {
// Store it in the taskSlots map (fixed-index slots only)
taskSlots.put(index, taskSlot);
}
// update the allocation id to task slot map
// Add it to the map of allocated slots
allocatedSlots.put(allocationId, taskSlot);
// register a timeout for this slot since it's in state allocated
// Register a timeout for the slot; the timeout handling runs when it expires
timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
// add this slot to the set of job slots
// Associate the slot with the job ID
Set<AllocationID> slots = slotsPerJob.get(jobId);
if (slots == null) {
slots = new HashSet<>(4);
slotsPerJob.put(jobId, slots);
}
slots.add(allocationId);
return true;
}
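A condensed, hypothetical version of this method shows the order of the checks. It uses a single resource dimension (a memory amount) instead of a ResourceProfile, and a plain counter instead of the budget manager. As in the original, index < 0 means a dynamic slot that uses the requested amount.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, single-resource sketch of TaskSlotTableImpl.allocateSlot.
// A long memory amount stands in for ResourceProfile; Strings stand in for JobID/AllocationID.
class MiniTaskSlotTable {
    static final class Slot {
        final int index;
        final String jobId;
        final String allocationId;
        final long memory;

        Slot(int index, String jobId, String allocationId, long memory) {
            this.index = index;
            this.jobId = jobId;
            this.allocationId = allocationId;
            this.memory = memory;
        }
    }

    final int numberSlots;
    final long defaultSlotMemory;
    long availableBudget;
    final Map<Integer, Slot> taskSlots = new HashMap<>();     // fixed-index slots by index
    final Map<String, Slot> allocatedSlots = new HashMap<>(); // all slots by allocation ID

    MiniTaskSlotTable(int numberSlots, long defaultSlotMemory, long totalBudget) {
        this.numberSlots = numberSlots;
        this.defaultSlotMemory = defaultSlotMemory;
        this.availableBudget = totalBudget;
    }

    boolean allocateSlot(int index, String jobId, String allocationId, long requestedMemory) {
        if (index >= numberSlots) {
            throw new IllegalArgumentException("index out of range");
        }
        if (allocatedSlots.containsKey(allocationId)) {
            return false; // allocation ID already in use
        }
        Slot existing = taskSlots.get(index);
        if (existing != null) {
            // Only an identical request for the same fixed slot counts as success.
            return existing.jobId.equals(jobId) && existing.allocationId.equals(allocationId);
        }
        // Fixed slots use the default size, dynamic slots (index < 0) the requested one.
        long memory = index >= 0 ? defaultSlotMemory : requestedMemory;
        if (memory > availableBudget) {
            return false; // corresponds to budgetManager.reserve(...) failing
        }
        availableBudget -= memory;
        Slot slot = new Slot(index, jobId, allocationId, memory);
        if (index >= 0) {
            taskSlots.put(index, slot);
        }
        allocatedSlots.put(allocationId, slot);
        return true;
    }
}
```

In this sketch, the branch that returns true for an identical (index, job, allocation) request can never fire. An identical allocation ID is already rejected by the first check.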
addTask
Adds a task to a slot, matched by allocation ID.
@Override
public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
// Check that the TaskSlotTable is running
checkRunning();
Preconditions.checkNotNull(task);
// Get the TaskSlot from allocatedSlots
TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());
if (taskSlot != null) {
// If the slot is active and its job ID and allocation ID match the task's
if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
// Add the task to the slot and record the execution ID -> (task, slot) mapping
if (taskSlot.add(task)) {
taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping<>(task, taskSlot));
return true;
} else {
return false;
}
} else {
throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
}
} else {
throw new SlotNotFoundException(task.getAllocationId());
}
}
createSlotReport
This method returns a report of the slot allocation in the current TaskManager. The returned SlotReport is a collection of the statuses of the TaskExecutor's slots.
@Override
public SlotReport createSlotReport(ResourceID resourceId) {
List<SlotStatus> slotStatuses = new ArrayList<>();
// Statuses of the fixed-index slots
for (int i = 0; i < numberSlots; i++) {
SlotID slotId = new SlotID(resourceId, i);
SlotStatus slotStatus;
if (taskSlots.containsKey(i)) {
TaskSlot<T> taskSlot = taskSlots.get(i);
slotStatus = new SlotStatus(
slotId,
taskSlot.getResourceProfile(),
taskSlot.getJobId(),
taskSlot.getAllocationId());
} else {
slotStatus = new SlotStatus(
slotId,
defaultSlotResourceProfile,
null,
null);
}
slotStatuses.add(slotStatus);
}
// Statuses of the dynamically allocated slots
for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
// A negative slot index means the slot was allocated dynamically
if (taskSlot.getIndex() < 0) {
SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
SlotStatus slotStatus = new SlotStatus(
slotID,
taskSlot.getResourceProfile(),
taskSlot.getJobId(),
taskSlot.getAllocationId());
slotStatuses.add(slotStatus);
}
}
final SlotReport slotReport = new SlotReport(slotStatuses);
return slotReport;
}
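The shape of the report is easy to miss in the code. Every fixed index appears exactly once, whether free or allocated, and dynamic slots are appended after them. Here is a hypothetical sketch with Strings in place of SlotStatus:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of createSlotReport's structure; Strings stand in for SlotStatus.
class SlotReportSketch {
    static List<String> createSlotReport(int numberSlots,
                                         Map<Integer, String> staticSlots,
                                         List<String> dynamicSlots) {
        List<String> statuses = new ArrayList<>();
        // One entry per fixed index; a free index is still reported (without job/allocation).
        for (int i = 0; i < numberSlots; i++) {
            String allocation = staticSlots.get(i);
            statuses.add("slot-" + i + ":" + (allocation == null ? "free" : allocation));
        }
        // Dynamic slots (negative index) come after the fixed ones.
        for (String allocation : dynamicSlots) {
            statuses.add("dynamic:" + allocation);
        }
        return statuses;
    }
}
```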
TaskSlot
TaskSlot is the wrapper type for the slots maintained by TaskSlotTable. A TaskSlot is a container: its tasks field holds all tasks that belong to the same slot.
Member variables of TaskSlot
/** Index of the task slot. */
// Index of the task slot
// A negative value means the slot is allocated dynamically
private final int index;
/** Resource characteristics for this slot. */
private final ResourceProfile resourceProfile;
/** Tasks running in this slot. */
private final Map<ExecutionAttemptID, T> tasks;
private final MemoryManager memoryManager;
/** State of this slot. */
private TaskSlotState state;
/** Job id to which the slot has been allocated. */
private final JobID jobId;
/** Allocation id of this slot. */
private final AllocationID allocationId;
/** The closing future is completed when the slot is freed and closed. */
private final CompletableFuture<Void> closingFuture;
ResourceProfile
ResourceProfile wraps the resource requirements of a slot. All its fields are final, so it cannot be modified after creation.
The main member variables of ResourceProfile are:
/** How many cpu cores are needed. Can be null only if it is unknown. */
// CPU cores
@Nullable
private final Resource cpuCores;
/** How much task heap memory is needed. */
// Task heap memory
@Nullable // can be null only for UNKNOWN
private final MemorySize taskHeapMemory;
/** How much task off-heap memory is needed. */
// Task off-heap memory
@Nullable // can be null only for UNKNOWN
private final MemorySize taskOffHeapMemory;
/** How much managed memory is needed. */
// Managed memory
@Nullable // can be null only for UNKNOWN
private final MemorySize managedMemory;
/** How much network memory is needed. */
// Network memory (buffers for network transfer)
@Nullable // can be null only for UNKNOWN
private final MemorySize networkMemory;
/** A extensible field for user specified resources from {@link ResourceSpec}. */
// Other resource types, each specified as a Resource
private final Map<String, Resource> extendedResources = new HashMap<>(1);
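A common question about a resource profile is whether a requirement fits into what a slot offers. The following sketch answers it dimension by dimension. `Profile` and `fitsInto` are hypothetical stand-ins, not Flink's API; ResourceProfile has its own matching methods, which are not reproduced here. CPU is a plain double and memory is reduced to whole MiB. Immutability is kept the same way as described above: final fields plus an unmodifiable copy of the extended resources.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable profile with the same dimensions as ResourceProfile.
// CPU in cores, memory sizes in MiB, extended resources as name -> amount.
final class Profile {
    final double cpuCores;
    final long taskHeapMb;
    final long taskOffHeapMb;
    final long managedMb;
    final long networkMb;
    final Map<String, Double> extended;

    Profile(double cpuCores, long taskHeapMb, long taskOffHeapMb, long managedMb, long networkMb,
            Map<String, Double> extended) {
        this.cpuCores = cpuCores;
        this.taskHeapMb = taskHeapMb;
        this.taskOffHeapMb = taskOffHeapMb;
        this.managedMb = managedMb;
        this.networkMb = networkMb;
        // Defensive, unmodifiable copy so the object stays immutable after construction.
        this.extended = Collections.unmodifiableMap(new HashMap<>(extended));
    }

    // True if no dimension of this requirement exceeds what the offered profile provides.
    boolean fitsInto(Profile offered) {
        if (cpuCores > offered.cpuCores || taskHeapMb > offered.taskHeapMb
                || taskOffHeapMb > offered.taskOffHeapMb || managedMb > offered.managedMb
                || networkMb > offered.networkMb) {
            return false;
        }
        for (Map.Entry<String, Double> e : extended.entrySet()) {
            if (e.getValue() > offered.extended.getOrDefault(e.getKey(), 0.0)) {
                return false;
            }
        }
        return true;
    }
}
```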
AllocationID
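AllocationID uniquely identifies a physical slot that the JobManager requests through the ResourceManager. It is created in SlotPoolImpl's requestSlotFromResourceManager method. After that it does not change, even if the request is retried.
Call flow
The figure below shows the call flow between the key classes involved in the whole Flink slot allocation process.
Slot request flow
We trace the slot request flow from resource allocation in the ExecutionGraph all the way to the creation of the slot in the TaskExecutor.
We start at the JobManager's resource allocation entry point and go through the calls one by one.
Execution's allocateAndAssignSlotForExecution
This method allocates the slot needed to execute one vertex of the ExecutionGraph.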
private CompletableFuture<LogicalSlot> allocateAndAssignSlotForExecution(
SlotProviderStrategy slotProviderStrategy,
LocationPreferenceConstraint locationPreferenceConstraint,
@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
checkNotNull(slotProviderStrategy);
// Must be running in the JobMaster's main thread
assertRunningInJobMasterMainThread();
// Get the slot sharing group of this ExecutionGraph vertex
// SlotSharingGroup is a soft constraint: tasks in the same SlotSharingGroup may run in the same slot
final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
// CoLocationConstraint controls where tasks are executed
final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
// this method only works if the execution is in the state 'CREATED'
// Switch from CREATED to SCHEDULED; returns true only if the current state is CREATED
if (transitionState(CREATED, SCHEDULED)) {
// Get the ID of the SlotSharingGroup
final SlotSharingGroupId slotSharingGroupId = sharingGroup.getSlotSharingGroupId();
// Build the scheduling unit, bundling this Execution, the slot sharing group ID and the locationConstraint
ScheduledUnit toSchedule = locationConstraint == null ?
new ScheduledUnit(this, slotSharingGroupId) :
new ScheduledUnit(this, slotSharingGroupId, locationConstraint);
// try to extract previous allocation ids, if applicable, so that we can reschedule to the same slot
ExecutionVertex executionVertex = getVertex();
// Get the allocation ID of the latest prior execution
AllocationID lastAllocation = executionVertex.getLatestPriorAllocation();
// Wrap the allocation ID in a collection
Collection<AllocationID> previousAllocationIDs =
lastAllocation != null ? Collections.singletonList(lastAllocation) : Collections.emptyList();
// calculate the preferred locations
// Compute the preferred locations of the task, i.e. which TaskManagers it should run on
final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
calculatePreferredLocations(locationPreferenceConstraint);
final SlotRequestId slotRequestId = new SlotRequestId();
final CompletableFuture<LogicalSlot> logicalSlotFuture =
preferredLocationsFuture.thenCompose(
(Collection<TaskManagerLocation> preferredLocations) -> {
LOG.info("Allocating slot with SlotRequestID {} for the execution attempt {}.", slotRequestId, attemptId);
// Compose with the previous CompletableFuture:
// once the preferred locations are computed, call the slot allocation logic of the SlotProviderStrategy
return slotProviderStrategy.allocateSlot(
slotRequestId,
toSchedule,
SlotProfile.priorAllocation(
vertex.getResourceProfile(),
getPhysicalSlotResourceProfile(vertex),
preferredLocations,
previousAllocationIDs,
allPreviousExecutionGraphAllocationIds));
});
// register call back to cancel slot request in case that the execution gets canceled
// Called when the execution is released (releaseFuture completes)
releaseFuture.whenComplete(
(Object ignored, Throwable throwable) -> {
// If the slot request was still pending and could be cancelled,
// run the slot request cancellation logic
if (logicalSlotFuture.cancel(false)) {
slotProviderStrategy.cancelSlotRequest(
slotRequestId,
slotSharingGroupId,
new FlinkException("Execution " + this + " was released."));
}
});
// This forces calls to the slot pool back into the main thread, for normal and exceptional completion
// Return the future that carries the LogicalSlot
return logicalSlotFuture.handle(
(LogicalSlot logicalSlot, Throwable failure) -> {
if (failure != null) {
throw new CompletionException(failure);
}
// tryAssignResource returns true if the logical slot could be assigned to this execution
if (tryAssignResource(logicalSlot)) {
return logicalSlot;
} else {
// release the slot
// If it cannot be assigned, release the slot
logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));
throw new CompletionException(
new FlinkException(
"Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
}
});
} else {
// call race, already deployed, or already done
throw new IllegalExecutionStateException(this, CREATED, state);
}
}
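Two patterns in this method are worth isolating. The first is the state check that only lets a CREATED execution request a slot. The second is the release callback that cancels a slot request that is still pending. The sketch below is hypothetical: `MiniExecution` uses an AtomicReference for the check-and-set, and a counter in place of slotProviderStrategy.cancelSlotRequest.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of two patterns in allocateAndAssignSlotForExecution.
class MiniExecution {
    enum State { CREATED, SCHEDULED }

    final AtomicReference<State> state = new AtomicReference<>(State.CREATED);
    final CompletableFuture<Void> releaseFuture = new CompletableFuture<>();
    int cancelledSlotRequests; // stands in for slotProviderStrategy.cancelSlotRequest(...)

    CompletableFuture<String> allocate(CompletableFuture<String> logicalSlotFuture) {
        // Only the first call can move CREATED -> SCHEDULED.
        if (!state.compareAndSet(State.CREATED, State.SCHEDULED)) {
            throw new IllegalStateException("expected CREATED but was " + state.get());
        }
        releaseFuture.whenComplete((ignored, error) -> {
            // cancel(false) returns true only if the slot future was still pending.
            if (logicalSlotFuture.cancel(false)) {
                cancelledSlotRequests++;
            }
        });
        return logicalSlotFuture;
    }
}
```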
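allocateAndAssignSlotForExecution involves two classes: SlotSharingGroup and CoLocationConstraint.
SlotSharingGroup is a soft constraint for slot sharing: execution vertices with the same group ID can be scheduled into the same slot. It has three member variables: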
// IDs of the job vertices that belong to this group
private final Set<JobVertexID> ids = new TreeSet<>();
// Group ID, made up of two longs, lowerPart and upperPart
private final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
// Resource requirements of all tasks in the group
private ResourceSpec resourceSpec = ResourceSpec.ZERO;
In contrast to SlotSharingGroup, CoLocationConstraint is a hard constraint for slot sharing. CoLocationConstraint determines where a task (execution vertex) runs. It binds a CoLocationGroup to a TaskManagerLocation, so all tasks that belong to the same CoLocationGroup run in the specified TaskManager. A CoLocationGroup holds a collection of JobVertices. The code is not shown here.
Next we focus on the allocateSlot method of SlotProviderStrategy.
SlotProviderStrategy has two subclasses:
- BatchSlotProviderStrategy: does not set a timeout for slot allocation
- NormalSlotProviderStrategy: sets a timeout for slot allocation; apart from that, its logic is identical to BatchSlotProviderStrategy
Taking NormalSlotProviderStrategy as an example, its allocateSlot method calls SchedulerImpl's allocateSlot. Following the calls: allocateSlot -> allocateSlotInternal -> internalAllocateSlot -> allocateSharedSlot.
SchedulerImpl's allocateSharedSlot
private CompletableFuture<LogicalSlot> allocateSharedSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) {
// allocate slot with slot sharing
// Get or create the SlotSharingManager for this slot sharing group
// It manages slot sharing, which lets different tasks run in the same slot
final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
scheduledUnit.getSlotSharingGroupId(),
id -> new SlotSharingManager(
id,
slotPool,
this));
// MultiTaskSlotLocality wraps a MultiTaskSlot together with its Locality
final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
try {
// Call a different multi-task slot allocation method depending on whether there is a co-location constraint
if (scheduledUnit.getCoLocationConstraint() != null) {
multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
scheduledUnit.getCoLocationConstraint(),
multiTaskSlotManager,
slotProfile,
allocationTimeout);
} else {
multiTaskSlotLocality = allocateMultiTaskSlot(
scheduledUnit.getJobVertexId(),
multiTaskSlotManager,
slotProfile,
allocationTimeout);
}
} catch (NoResourceAvailableException noResourceException) {
return FutureUtils.completedExceptionally(noResourceException);
}
// sanity check
// The MultiTaskSlot (including its children) must not already contain this JobVertex ID
Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(scheduledUnit.getJobVertexId()));
// Allocate a SingleTaskSlot under this MultiTaskSlot
final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
slotRequestId,
slotProfile.getTaskResourceProfile(),
scheduledUnit.getJobVertexId(),
multiTaskSlotLocality.getLocality());
return leaf.getLogicalSlotFuture();
}
allocateCoLocatedMultiTaskSlot
This method allocates a MultiTaskSlot that is subject to a co-location constraint.
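Before analyzing this method, we need to understand the Locality enum, which describes how a task is placed relative to its location preferences. Its values are:
- UNCONSTRAINED: no constraint on where the task is scheduled
- LOCAL: the task is placed in the same TaskManager
- HOST_LOCAL: the task is placed on the same host
- NON_LOCAL: the task is placed somewhere other than its preferred locations
- UNKNOWN: unknown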
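To illustrate what these values mean for a single candidate slot, here is a hypothetical helper. It is not Flink's code; it assumes that a location is identified by a TaskManager ID and a host name. It classifies a candidate against the preferred locations, and the enum simply mirrors the values listed above.

```java
import java.util.Collection;

// Hypothetical enum mirroring the Locality values listed above.
enum Locality { UNCONSTRAINED, LOCAL, HOST_LOCAL, NON_LOCAL, UNKNOWN }

// Hypothetical location: a TaskManager ID plus the host it runs on.
final class Location {
    final String taskManagerId;
    final String host;

    Location(String taskManagerId, String host) {
        this.taskManagerId = taskManagerId;
        this.host = host;
    }
}

final class LocalityCalculator {
    static Locality of(Location candidate, Collection<Location> preferred) {
        if (preferred.isEmpty()) {
            return Locality.UNCONSTRAINED; // no preference at all
        }
        boolean sameHost = false;
        for (Location p : preferred) {
            if (p.taskManagerId.equals(candidate.taskManagerId)) {
                return Locality.LOCAL; // same TaskManager as a preferred location
            }
            sameHost |= p.host.equals(candidate.host);
        }
        return sameHost ? Locality.HOST_LOCAL : Locality.NON_LOCAL;
    }
}
```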
Below is the code of allocateCoLocatedMultiTaskSlot with annotations:
private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(
CoLocationConstraint coLocationConstraint,
SlotSharingManager multiTaskSlotManager,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) throws NoResourceAvailableException {
final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();
if (coLocationSlotRequestId != null) {
// we have a slot assigned --> try to retrieve it
// Get the TaskSlot in the SlotSharingManager for this slot request ID
final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);
if (taskSlot != null) {
// The slot must be a MultiTaskSlot
Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
SlotSharingManager.MultiTaskSlot multiTaskSlot = (SlotSharingManager.MultiTaskSlot) taskSlot;
// If this MultiTaskSlot may have enough resources for the slotProfile, return it with locality LOCAL (same TaskManager)
if (multiTaskSlot.mayHaveEnoughResourcesToFulfill(slotProfile.getTaskResourceProfile())) {
return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
}
// Otherwise throw: not enough resources
throw new NoResourceAvailableException("Not enough resources in the slot for all co-located tasks.");
} else {
// the slot may have been cancelled in the mean time
// The slot may have been cancelled in the meantime, so reset the constraint's slot request ID
coLocationConstraint.setSlotRequestId(null);
}
}
// If the constraint's location has already been assigned
if (coLocationConstraint.isAssigned()) {
// refine the preferred locations of the slot profile
// Update the slot profile so that the constraint's TaskManager location becomes the preferred location
slotProfile = SlotProfile.priorAllocation(
slotProfile.getTaskResourceProfile(),
slotProfile.getPhysicalSlotResourceProfile(),
Collections.singleton(coLocationConstraint.getLocation()),
slotProfile.getPreferredAllocations(),
slotProfile.getPreviousExecutionGraphAllocations());
}
// get a new multi task slot
// No previously requested slot could be used (checked above), so allocate one here
SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = allocateMultiTaskSlot(
coLocationConstraint.getGroupId(),
multiTaskSlotManager,
slotProfile,
allocationTimeout);
// check whether we fulfill the co-location constraint
// Check whether the constraint is assigned and whether the slot is in the same TaskManager (LOCAL)
if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
// If not, the constraint is violated: release the slot
multiTaskSlotLocality.getMultiTaskSlot().release(
new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint."));
// Throw a NoResourceAvailableException
throw new NoResourceAvailableException("Could not allocate a local multi task slot for the " +
"co location constraint " + coLocationConstraint + '.');
}
// Allocate a child MultiTaskSlot under this MultiTaskSlot
final SlotRequestId slotRequestId = new SlotRequestId();
final SlotSharingManager.MultiTaskSlot coLocationSlot =
multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
slotRequestId,
coLocationConstraint.getGroupId());
// mark the requested slot as co-located slot for other co-located tasks
// Associate the slot request with the coLocationConstraint, marking this slot as the co-located slot
coLocationConstraint.setSlotRequestId(slotRequestId);
// lock the co-location constraint once we have obtained the allocated slot
// Runs once the slot has been allocated
coLocationSlot.getSlotContextFuture().whenComplete(
(SlotContext slotContext, Throwable throwable) -> {
if (throwable == null) {
// check whether we are still assigned to the co-location constraint
// No error: lock the constraint's location if it still refers to this slot request
if (Objects.equals(coLocationConstraint.getSlotRequestId(), slotRequestId)) {
coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
} else {
log.debug("Failed to lock colocation constraint {} because assigned slot " +
"request {} differs from fulfilled slot request {}.",
coLocationConstraint.getGroupId(),
coLocationConstraint.getSlotRequestId(),
slotRequestId);
}
} else {
log.debug("Failed to lock colocation constraint {} because the slot " +
"allocation for slot request {} failed.",
coLocationConstraint.getGroupId(),
coLocationConstraint.getSlotRequestId(),
throwable);
}
});
return SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, multiTaskSlotLocality.getLocality());
}
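The whenComplete callback at the end locks the constraint's location only if the constraint still points at the same slot request. As a result, a late completion of a superseded request cannot lock the wrong TaskManager. Here is a hypothetical sketch of just that check, with Strings in place of SlotRequestId and TaskManagerLocation:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the "lock only if still assigned" check in allocateCoLocatedMultiTaskSlot.
class MiniCoLocationConstraint {
    String slotRequestId;  // the request currently associated with the constraint
    String lockedLocation; // null until the location is locked

    void track(String requestId, CompletableFuture<String> slotLocationFuture) {
        slotRequestId = requestId;
        slotLocationFuture.whenComplete((location, error) -> {
            // Ignore failures and completions of requests that are no longer current.
            if (error == null && requestId.equals(slotRequestId)) {
                lockedLocation = location;
            }
        });
    }
}
```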
Next, let's look at how a MultiTaskSlot is allocated.
allocateMultiTaskSlot
private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
AbstractID groupId,
SlotSharingManager slotSharingManager,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) {
// Get information about all resolved root slots
Collection<SlotSelectionStrategy.SlotInfoAndResources> resolvedRootSlotsInfo =
slotSharingManager.listResolvedRootSlotInfo(groupId);
// Use the slot selection strategy to pick the best resolved root slot from the SlotSharingManager
SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotWithLocality =
slotSelectionStrategy.selectBestSlotForProfile(resolvedRootSlotsInfo, slotProfile).orElse(null);
// Wrap the selected slot in a MultiTaskSlotLocality
final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = bestResolvedRootSlotWithLocality != null ?
new SlotSharingManager.MultiTaskSlotLocality(
slotSharingManager.getResolvedRootSlot(bestResolvedRootSlotWithLocality.getSlotInfo()),
bestResolvedRootSlotWithLocality.getLocality()) :
null;
// If such a slot exists and its locality is LOCAL, return it
if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
return multiTaskSlotLocality;
}
final SlotRequestId allocatedSlotRequestId = new SlotRequestId();
final SlotRequestId multiTaskSlotRequestId = new SlotRequestId();
// Try to find the best matching slot in the SlotPool
Optional<SlotAndLocality> optionalPoolSlotAndLocality = tryAllocateFromAvailable(allocatedSlotRequestId, slotProfile);
// If one was found
if (optionalPoolSlotAndLocality.isPresent()) {
SlotAndLocality poolSlotAndLocality = optionalPoolSlotAndLocality.get();
// Use it if its locality is LOCAL, or if no suitable resolved root slot was found in the SlotSharingManager
if (poolSlotAndLocality.getLocality() == Locality.LOCAL || bestResolvedRootSlotWithLocality == null) {
final PhysicalSlot allocatedSlot = poolSlotAndLocality.getSlot();
// Create a root MultiTaskSlot for this slot in the SlotSharingManager
final SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.createRootSlot(
multiTaskSlotRequestId,
CompletableFuture.completedFuture(poolSlotAndLocality.getSlot()),
allocatedSlotRequestId);
// Assign the multiTaskSlot as the payload of allocatedSlot
if (allocatedSlot.tryAssignPayload(multiTaskSlot)) {
return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, poolSlotAndLocality.getLocality());
} else {
multiTaskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
allocatedSlot.getAllocationId() + '.'));
}
}
}
if (multiTaskSlotLocality != null) {
// prefer slot sharing group slots over unused slots
// If matching slots were found in both the SlotSharingManager and the SlotPool, prefer the SlotSharingManager's
// and release the matching slot taken from the SlotPool
if (optionalPoolSlotAndLocality.isPresent()) {
slotPool.releaseSlot(
allocatedSlotRequestId,
new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
}
return multiTaskSlotLocality;
}
// there is no slot immediately available --> check first for uncompleted slots at the slot sharing group
// No slot is immediately available: try to get a not-yet-resolved root slot from unresolvedRootSlots
SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId);
// If none was found
if (multiTaskSlot == null) {
// it seems as if we have to request a new slot from the resource manager, this is always the last resort!!!
// At this point we have to request a new slot from the ResourceManager
final CompletableFuture<PhysicalSlot> slotAllocationFuture = requestNewAllocatedSlot(
allocatedSlotRequestId,
slotProfile,
allocationTimeout);
// Create a root slot in the SlotSharingManager
multiTaskSlot = slotSharingManager.createRootSlot(
multiTaskSlotRequestId,
slotAllocationFuture,
allocatedSlotRequestId);
// Handle the completion of the slot allocation
slotAllocationFuture.whenComplete(
(PhysicalSlot allocatedSlot, Throwable throwable) -> {
final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId);
if (taskSlot != null) {
// still valid
// On failure (or if the task slot is not a MultiTaskSlot), release the slot
if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || throwable != null) {
taskSlot.release(throwable);
} else {
if (!allocatedSlot.tryAssignPayload(((SlotSharingManager.MultiTaskSlot) taskSlot))) {
taskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
allocatedSlot.getAllocationId() + '.'));
}
}
} else {
slotPool.releaseSlot(
allocatedSlotRequestId,
new FlinkException("Could not find task slot with " + multiTaskSlotRequestId + '.'));
}
});
}
return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN);
}
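The branches of allocateMultiTaskSlot amount to a preference order. The following hypothetical decision function distills that order. It ignores the path where assigning the payload to a pool slot fails. `MultiTaskSlotChoice` and its enums are illustrative names, not Flink classes.

```java
// Hypothetical decision table distilled from allocateMultiTaskSlot (not Flink code).
final class MultiTaskSlotChoice {
    enum Loc { LOCAL, HOST_LOCAL, NON_LOCAL, UNCONSTRAINED, UNKNOWN }

    enum Source { RESOLVED_ROOT, POOL_SLOT, UNRESOLVED_ROOT, NEW_REQUEST }

    /**
     * @param resolvedRoot locality of the best resolved root slot, or null if there is none
     * @param poolSlot locality of the best free slot in the SlotPool, or null if there is none
     * @param hasUnresolvedRoot whether a root slot is still waiting for its physical slot
     */
    static Source choose(Loc resolvedRoot, Loc poolSlot, boolean hasUnresolvedRoot) {
        if (resolvedRoot == Loc.LOCAL) {
            return Source.RESOLVED_ROOT;   // 1. local shared slot
        }
        if (poolSlot != null && (poolSlot == Loc.LOCAL || resolvedRoot == null)) {
            return Source.POOL_SLOT;       // 2. local free slot, or no shared slot at all
        }
        if (resolvedRoot != null) {
            return Source.RESOLVED_ROOT;   // 3. non-local shared slot beats non-local free slot
        }
        if (hasUnresolvedRoot) {
            return Source.UNRESOLVED_ROOT; // 4. join a root slot that is still being allocated
        }
        return Source.NEW_REQUEST;         // 5. last resort: ask the ResourceManager
    }
}
```

In the last two cases the original returns the root slot with Locality.UNKNOWN, presumably because the physical slot's location is not yet known at that point.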
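Let's also take a quick look at SlotSelectionStrategy, the common interface of a family of classes that pick the best matching slot. Its implementations are:
- DefaultLocationPreferenceSlotSelectionStrategy: returns the first matching slot from the candidate set.
- EvenlySpreadOutLocationPreferenceSlotSelectionStrategy: among all matching slots, returns the one whose TaskManager has the lowest resource utilization.
- PreviousAllocationSlotSelectionStrategy: first tries the preferred (previous) allocations specified in the SlotProfile; if none applies, it falls back to another slot selection strategy.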
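The difference between the first two strategies can be sketched in a few lines. This hypothetical version ignores locality, which the real strategies evaluate first. It also assumes each candidate carries its TaskManager's utilization as a number between 0 and 1.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical candidate: a free slot plus the utilization (0.0-1.0) of its TaskManager.
final class Candidate {
    final String slotId;
    final double taskManagerUtilization;

    Candidate(String slotId, double taskManagerUtilization) {
        this.slotId = slotId;
        this.taskManagerUtilization = taskManagerUtilization;
    }
}

final class SelectionSketch {
    // "Default"-style choice: simply the first candidate.
    static Optional<Candidate> first(List<Candidate> candidates) {
        return candidates.stream().findFirst();
    }

    // "Evenly spread out"-style choice: the candidate whose TaskManager is least utilized.
    static Optional<Candidate> leastUtilized(List<Candidate> candidates) {
        return candidates.stream()
                .min(Comparator.comparingDouble((Candidate c) -> c.taskManagerUtilization));
    }
}
```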
Now let's analyze SchedulerImpl's requestNewAllocatedSlot, which requests the allocation of a new slot.
@Nonnull
private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
SlotRequestId slotRequestId,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) {
if (allocationTimeout == null) {
return slotPool.requestNewAllocatedBatchSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile());
} else {
return slotPool.requestNewAllocatedSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile(), allocationTimeout);
}
}
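Depending on whether an allocation timeout was specified, this method calls the corresponding SlotPool method. requestNewAllocatedBatchSlot and requestNewAllocatedSlot are essentially the same, except that the latter adds timeout checking. We analyze the more complex of the two, requestNewAllocatedSlot.
SlotPoolImpl's requestNewAllocatedSlot method is shown below: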
@Nonnull
@Override
public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
@Nonnull SlotRequestId slotRequestId,
@Nonnull ResourceProfile resourceProfile,
@Nullable Time timeout) {
// Check that we are running in the main thread
componentMainThreadExecutor.assertRunningInMainThread();
// Create a slot request object
// The SlotPool keeps the request pending; it is fulfilled only once a matching slot is obtained from a TaskManager
final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);
// If a timeout was passed in, register the timeout handling
if (timeout != null) {
// register request timeout
FutureUtils
.orTimeout(
pendingRequest.getAllocatedSlotFuture(),
timeout.toMilliseconds(),
TimeUnit.MILLISECONDS,
componentMainThreadExecutor)
.whenComplete(
(AllocatedSlot ignored, Throwable throwable) -> {
if (throwable instanceof TimeoutException) {
timeoutPendingSlotRequest(slotRequestId);
}
});
}
// request a new slot via requestNewAllocatedSlotInternal
return requestNewAllocatedSlotInternal(pendingRequest)
.thenApply((Function.identity()));
}
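The timeout registration above can be sketched with plain JDK futures. This is an illustration under assumptions, not Flink code: PendingSlotRequests is a hypothetical class, CompletableFuture.orTimeout (Java 9+) stands in for Flink's FutureUtils.orTimeout, the main-thread executor is omitted, and a slot is represented by its id string.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical pending-request registry illustrating the timeout pattern.
final class PendingSlotRequests {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers a request; a null timeout means the request never times out (batch-style).
    CompletableFuture<String> request(String slotRequestId, Long timeoutMillis) {
        CompletableFuture<String> allocatedSlotFuture = new CompletableFuture<>();
        pending.put(slotRequestId, allocatedSlotFuture);
        if (timeoutMillis != null) {
            allocatedSlotFuture
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // JDK 9+ API
                    .whenComplete((slot, throwable) -> {
                        if (throwable instanceof TimeoutException) {
                            // analogous to timeoutPendingSlotRequest: drop the stale request
                            pending.remove(slotRequestId);
                        }
                    });
        }
        return allocatedSlotFuture;
    }

    // Called when a slot becomes available for the request.
    boolean fulfill(String slotRequestId, String slotId) {
        CompletableFuture<String> future = pending.remove(slotRequestId);
        return future != null && future.complete(slotId);
    }

    boolean isPending(String slotRequestId) {
        return pending.containsKey(slotRequestId);
    }
}
```

A null timeout behaves like the batch variant: the request simply stays pending until it is fulfilled.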
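SlotPoolImpl's requestNewAllocatedSlotInternal method is shown below. In it, the SlotPool requests a new slot from the ResourceManager, or stashes the request if no ResourceManager is connected yet.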
@Nonnull
private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {
if (resourceManagerGateway == null) {
// no ResourceManager gateway yet: stash the request in waitingForResourceManager,
// a LinkedHashMap from slot request id to slot request
stashRequestWaitingForResourceManager(pendingRequest);
} else {
// request the slot from the ResourceManager
requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
}
return pendingRequest.getAllocatedSlotFuture();
}
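The stash-then-forward behavior of requestNewAllocatedSlotInternal can be sketched as follows. RequestStash is hypothetical, the gateway is modelled as a BiConsumer, and the replay step in connect is an assumption about what happens once a ResourceManager becomes available (the method above only shows the stashing side).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical sketch: buffer slot requests until a ResourceManager "gateway" is available.
final class RequestStash {
    // Insertion-ordered like waitingForResourceManager, so stashed requests replay in FIFO order.
    private final Map<String, String> waitingForResourceManager = new LinkedHashMap<>();
    // Stand-in for ResourceManagerGateway: receives (slotRequestId, resourceProfile); null until connected.
    private BiConsumer<String, String> resourceManager;

    void request(String slotRequestId, String resourceProfile) {
        if (resourceManager == null) {
            waitingForResourceManager.put(slotRequestId, resourceProfile);
        } else {
            resourceManager.accept(slotRequestId, resourceProfile);
        }
    }

    // On connection, forward every stashed request in its original order, then clear the stash.
    void connect(BiConsumer<String, String> gateway) {
        resourceManager = gateway;
        Map<String, String> stashed = new LinkedHashMap<>(waitingForResourceManager);
        waitingForResourceManager.clear();
        stashed.forEach(gateway);
    }

    int stashedCount() {
        return waitingForResourceManager.size();
    }
}
```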
Let's focus on SlotPoolImpl's requestSlotFromResourceManager method:
private void requestSlotFromResourceManager(
final ResourceManagerGateway resourceManagerGateway,
final PendingRequest pendingRequest) {
checkNotNull(resourceManagerGateway);
checkNotNull(pendingRequest);
// create an AllocationID whose lowerPart and upperPart are random longs
final AllocationID allocationId = new AllocationID();
// assign the AllocationID to pendingRequest
pendingRequest.setAllocationId(allocationId);
// add it to pendingRequests,
// a dual-key map keyed by both slot request id and allocation id
pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);
// action to run when the slot allocation completes
pendingRequest.getAllocatedSlotFuture().whenComplete(
(AllocatedSlot allocatedSlot, Throwable throwable) -> {
if (throwable != null) {
// the allocation id can be remapped so we need to get it from the pendingRequest
// where it will be updated timely
// re-read the AllocationID, since it may change while the slot is being requested
final Optional<AllocationID> updatedAllocationId = pendingRequest.getAllocationId();
// on failure, cancel the slot request at the ResourceManager
if (updatedAllocationId.isPresent()) {
// cancel the slot request if there is a failure
resourceManagerGateway.cancelSlotRequest(updatedAllocationId.get());
}
}
});
log.info("Requesting new slot [{}] and profile {} with allocation id {} from resource manager.",
pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile(), allocationId);
// send a SlotRequest to the ResourceManager
CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
jobMasterId,
new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
rpcTimeout);
// runs once the RPC to the ResourceManager completes
FutureUtils.whenCompleteAsyncIfNotDone(
rmResponse,
componentMainThreadExecutor,
(Acknowledge ignored, Throwable failure) -> {
// on failure, fail the request future
if (failure != null) {
// on failure, run the failure handling, which completes the request future exceptionally
slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
}
});
}
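pendingRequests is described above as a map with two keys. A hypothetical minimal version (DualKeyMap, not Flink's own class) shows the idea: every value can be looked up or removed by either key, and both indexes are updated together.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal dual-key map: each value is reachable through either of its two keys.
final class DualKeyMap<A, B, V> {
    private final Map<A, Map.Entry<B, V>> byKeyA = new HashMap<>();
    private final Map<B, Map.Entry<A, V>> byKeyB = new HashMap<>();

    void put(A keyA, B keyB, V value) {
        // drop any old mappings first so both indexes stay consistent
        removeByKeyA(keyA);
        removeByKeyB(keyB);
        byKeyA.put(keyA, new SimpleImmutableEntry<>(keyB, value));
        byKeyB.put(keyB, new SimpleImmutableEntry<>(keyA, value));
    }

    V getByKeyA(A keyA) {
        Map.Entry<B, V> entry = byKeyA.get(keyA);
        return entry == null ? null : entry.getValue();
    }

    V getByKeyB(B keyB) {
        Map.Entry<A, V> entry = byKeyB.get(keyB);
        return entry == null ? null : entry.getValue();
    }

    V removeByKeyA(A keyA) {
        Map.Entry<B, V> entry = byKeyA.remove(keyA);
        if (entry == null) {
            return null;
        }
        byKeyB.remove(entry.getKey());
        return entry.getValue();
    }

    V removeByKeyB(B keyB) {
        Map.Entry<A, V> entry = byKeyB.remove(keyB);
        if (entry == null) {
            return null;
        }
        byKeyA.remove(entry.getKey());
        return entry.getValue();
    }

    int size() {
        return byKeyA.size();
    }
}
```

With such a structure, code that only knows the allocation id and code that only knows the slot request id can both reach the same pending request.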
The ResourceManager handles the slot request as follows:
@Override
public CompletableFuture<Acknowledge> requestSlot(
JobMasterId jobMasterId,
SlotRequest slotRequest,
final Time timeout) {
JobID jobId = slotRequest.getJobId();
// look up the JobManager registration for this job ID
JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
if (null != jobManagerRegistration) {
if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
log.info("Request slot with profile {} for job {} with allocation id {}.",
slotRequest.getResourceProfile(),
slotRequest.getJobId(),
slotRequest.getAllocationId());
try {
// register the slot request with the SlotManager
slotManager.registerSlotRequest(slotRequest);
} catch (ResourceManagerException e) {
return FutureUtils.completedExceptionally(e);
}
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException("The job leader's id " +
jobManagerRegistration.getJobMasterId() + " does not match the received id " + jobMasterId + '.'));
}
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException("Could not find registered job manager for job " + jobId + '.'));
}
}
So the ResourceManager ultimately hands the SlotRequest over to its internal SlotManager, whose registerSlotRequest method is shown below.
@Override
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
// check that the SlotManager has been started
checkInit();
// check whether a fulfilled or pending slot request already has this allocation id;
// if so, it is a duplicate and false is returned
if (checkDuplicateRequest(slotRequest.getAllocationId())) {
LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
return false;
} else {
PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
// wrap the request and store it in pendingSlotRequests
pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
try {
// call the internal slot request method (analyzed below)
internalRequestSlot(pendingSlotRequest);
} catch (ResourceManagerException e) {
// requesting the slot failed --> remove pending slot request
pendingSlotRequests.remove(slotRequest.getAllocationId());
throw new ResourceManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
}
return true;
}
}
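The bookkeeping in registerSlotRequest, deduplication by allocation id and rollback when the internal request fails, can be sketched like this. SlotRequestRegistry, InternalRequester and RequestFailedException are hypothetical names; the exception is unchecked here only to keep the sketch short.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of registerSlotRequest's bookkeeping.
final class SlotRequestRegistry {
    // Unchecked here for brevity; Flink's ResourceManagerException is a checked exception.
    static final class RequestFailedException extends RuntimeException {
        RequestFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for internalRequestSlot; may fail with any exception.
    interface InternalRequester {
        void request(String allocationId) throws Exception;
    }

    private final Map<String, String> pendingSlotRequests = new HashMap<>();   // allocationId -> profile
    private final Map<String, String> fulfilledSlotRequests = new HashMap<>(); // allocationId -> slotId
    private final InternalRequester requester;

    SlotRequestRegistry(InternalRequester requester) {
        this.requester = requester;
    }

    boolean register(String allocationId, String resourceProfile) {
        if (pendingSlotRequests.containsKey(allocationId) || fulfilledSlotRequests.containsKey(allocationId)) {
            return false; // duplicate request: ignore it
        }
        pendingSlotRequests.put(allocationId, resourceProfile);
        try {
            requester.request(allocationId);
        } catch (Exception e) {
            pendingSlotRequests.remove(allocationId); // roll back the registration
            throw new RequestFailedException("Could not fulfill slot request " + allocationId + '.', e);
        }
        return true;
    }

    void markFulfilled(String allocationId, String slotId) {
        pendingSlotRequests.remove(allocationId);
        fulfilledSlotRequests.put(allocationId, slotId);
    }

    boolean isPending(String allocationId) {
        return pendingSlotRequests.containsKey(allocationId);
    }
}
```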
Let's continue with the internalRequestSlot method.
internalRequestSlot
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
// get the resource requirement of the slot request
final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
OptionalConsumer.of(findMatchingSlot(resourceProfile))
.ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
.ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
}
Here findMatchingSlot uses slotMatchingStrategy to search freeSlots for a slot that satisfies the resource requirement, verifies that the matched slot is in state SlotState.FREE, removes it from freeSlots, and returns it.
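Flink encapsulates the lookup of a matching slot in slotMatchingStrategy, which has two implementations:
- AnyMatchingSlotMatchingStrategy: returns the first matching slot, i.e. as soon as it finds a slot whose resources are at least the required ones.
- LeastUtilizationSlotMatchingStrategy: in addition to matching the resources, computes the slot utilization of each TaskExecutor and returns a slot on the least-utilized TaskExecutor.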
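The two matching strategies can be illustrated with a simplified, hypothetical model: Profile has only CPU and memory (Flink's ResourceProfile has more dimensions), utilization is assumed to be allocated slots divided by total slots, and SlotMatching is not a Flink class.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;

// Hypothetical two-dimensional resource profile.
final class Profile {
    final double cpuCores;
    final int memoryMb;

    Profile(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    // True if this offered profile is at least as large as the required one in every dimension.
    boolean covers(Profile required) {
        return cpuCores >= required.cpuCores && memoryMb >= required.memoryMb;
    }
}

final class FreeSlot {
    final String slotId;
    final String taskExecutorId;
    final Profile profile;

    FreeSlot(String slotId, String taskExecutorId, Profile profile) {
        this.slotId = slotId;
        this.taskExecutorId = taskExecutorId;
        this.profile = profile;
    }
}

final class SlotMatching {
    // AnyMatching-style: the first free slot whose profile covers the requirement.
    static Optional<FreeSlot> anyMatching(Collection<FreeSlot> freeSlots, Profile required) {
        return freeSlots.stream().filter(s -> s.profile.covers(required)).findFirst();
    }

    // LeastUtilization-style: among covering slots, pick one on the TaskExecutor with the
    // lowest utilization; executors missing from the map count as idle.
    static Optional<FreeSlot> leastUtilization(
            Collection<FreeSlot> freeSlots, Profile required, Map<String, Double> utilizationByExecutor) {
        return freeSlots.stream()
                .filter(s -> s.profile.covers(required))
                .min(Comparator.comparingDouble(
                        (FreeSlot s) -> utilizationByExecutor.getOrDefault(s.taskExecutorId, 0.0)));
    }

    // Assumed utilization measure: allocated slots divided by total slots.
    static double utilization(int allocatedSlots, int totalSlots) {
        return totalSlots == 0 ? 0.0 : (double) allocatedSlots / totalSlots;
    }
}
```

With a requirement of 1.5 cores and 1024 MB, anyMatching returns the first covering slot, while leastUtilization prefers a covering slot on the TaskExecutor that has fewer of its slots allocated, which spreads load across TaskExecutors.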
If findMatchingSlot finds a matching slot, allocateSlot is called to ask the TaskExecutor to allocate it. Otherwise, fulfillPendingSlotRequestWithPendingTaskManagerSlot is called.
fulfillPendingSlotRequestWithPendingTaskManagerSlot
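This method first looks for a free PendingTaskManagerSlot matching the required resources (pendingSlotRequest.getResourceProfile()); if there is none, it requests new resources, which creates a PendingTaskManagerSlot that is kept in pendingSlots. These pending slots are matched when the TaskManager registers (registerTaskManager, which calls registerSlot for each reported slot). Only then is a pending slot actually allocated, in the corresponding TaskExecutor.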
private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
// get the resource requirement of the PendingSlotRequest
ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
// look for a free pending slot that matches the resource requirement
Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);
// if no matching pending slot was found,
// allocate the resources described by resourceProfile, creating a new pending slot
if (!pendingTaskManagerSlotOptional.isPresent()) {
pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
}
// if a pending slot is available, call assignPendingTaskManagerSlot,
// which links pendingSlotRequest and pendingTaskManagerSlot
OptionalConsumer.of(pendingTaskManagerSlotOptional)
.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot))
.ifNotPresent(() -> {
// request can not be fulfilled by any free slot or pending slot that can be allocated,
// check whether it can be fulfilled by allocated slots
if (failUnfulfillableRequest && !isFulfillableByRegisteredOrPendingSlots(pendingSlotRequest.getResourceProfile())) {
throw new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile());
}
});
}
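The decision made by fulfillPendingSlotRequestWithPendingTaskManagerSlot can be sketched as: reuse an unassigned pending slot that fits, otherwise try to allocate new resources, otherwise report that nothing is available. PendingSlots is hypothetical and a memory budget stands in for whether new workers may be started. The real method additionally throws UnfulfillableSlotRequestException, but only when failUnfulfillableRequest is set and no registered or pending slot could ever satisfy the request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the pending-slot decision.
final class PendingSlots {
    static final class PendingSlot {
        final int memoryMb;
        String assignedRequest; // null while no request is assigned

        PendingSlot(int memoryMb) {
            this.memoryMb = memoryMb;
        }
    }

    private final List<PendingSlot> pendingSlots = new ArrayList<>();
    private int remainingBudgetMb; // stand-in for "may another worker be started?"

    PendingSlots(int budgetMb) {
        this.remainingBudgetMb = budgetMb;
    }

    // Reuse a free matching pending slot, else allocate a new one; empty if neither is possible.
    Optional<PendingSlot> fulfill(String requestId, int requiredMemoryMb) {
        Optional<PendingSlot> slot = pendingSlots.stream()
                .filter(p -> p.assignedRequest == null && p.memoryMb >= requiredMemoryMb)
                .findFirst();
        if (!slot.isPresent()) {
            slot = allocateResource(requiredMemoryMb);
        }
        slot.ifPresent(p -> p.assignedRequest = requestId); // cf. assignPendingTaskManagerSlot
        return slot;
    }

    private Optional<PendingSlot> allocateResource(int memoryMb) {
        if (memoryMb > remainingBudgetMb) {
            return Optional.empty();
        }
        remainingBudgetMb -= memoryMb;
        PendingSlot created = new PendingSlot(memoryMb);
        pendingSlots.add(created);
        return Optional.of(created);
    }

    int size() {
        return pendingSlots.size();
    }
}
```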
allocateSlot
Now let's look at the other branch of internalRequestSlot, the call to allocateSlot. SlotManagerImpl's allocateSlot method is as follows:
private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
Preconditions.checkState(taskManagerSlot.getState() == SlotState.FREE);
// get the TaskManager connection from the slot
TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
// get the RPC gateway
TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();
final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
final AllocationID allocationId = pendingSlotRequest.getAllocationId();
final SlotID slotId = taskManagerSlot.getSlotId();
final InstanceID instanceID = taskManagerSlot.getInstanceId();
// assign the PendingSlotRequest to the slot
taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
pendingSlotRequest.setRequestFuture(completableFuture);
// if this PendingSlotRequest was assigned a pending TaskManager slot, return it first
returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
// look up the registered TaskManager
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
if (taskManagerRegistration == null) {
throw new IllegalStateException("Could not find a registered task manager for instance id " +
instanceID + '.');
}
// mark this TaskManager as in use
taskManagerRegistration.markUsed();
// RPC call to the task manager
// invoke the TaskExecutor's requestSlot method remotely
CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
slotId,
pendingSlotRequest.getJobId(),
allocationId,
pendingSlotRequest.getResourceProfile(),
pendingSlotRequest.getTargetAddress(),
resourceManagerId,
taskManagerRequestTimeout);
// when the RPC completes, complete completableFuture with its result
requestFuture.whenComplete(
(Acknowledge acknowledge, Throwable throwable) -> {
if (acknowledge != null) {
completableFuture.complete(acknowledge);
} else {
completableFuture.completeExceptionally(throwable);
}
});
completableFuture.whenCompleteAsync(
(Acknowledge acknowledge, Throwable throwable) -> {
try {
if (acknowledge != null) {
// on success, update the slot information
updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
} else {
if (throwable instanceof SlotOccupiedException) {
SlotOccupiedException exception = (SlotOccupiedException) throwable;
// the slot is occupied: update it with the occupying allocation
updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
} else {
// otherwise, remove the slot request from the slot
removeSlotRequestFromSlot(slotId, allocationId);
}
if (!(throwable instanceof CancellationException)) {
// unless the allocation was cancelled, handle the failed slot request
handleFailedSlotRequest(slotId, allocationId, throwable);
} else {
LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable);
}
}
} catch (Exception e) {
LOG.error("Error while completing the slot allocation.", e);
}
},
mainThreadExecutor);
}
Above, the ResourceManager (through its SlotManager) invokes the TaskExecutor's requestSlot method via RPC.
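The result handling at the end of SlotManagerImpl.allocateSlot reduces to a small decision table. The sketch below records which action would run instead of performing it; AllocationOutcome and SlotOccupied are hypothetical stand-ins (the latter for SlotOccupiedException), and the main-thread hop of whenCompleteAsync is omitted. Note that handleFailedSlotRequest runs for every failure except a cancellation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for SlotOccupiedException, carrying the occupying allocation id.
final class SlotOccupied extends RuntimeException {
    final String occupyingAllocationId;

    SlotOccupied(String occupyingAllocationId) {
        super("Slot is occupied by " + occupyingAllocationId);
        this.occupyingAllocationId = occupyingAllocationId;
    }
}

// Records the bookkeeping steps the completion callback would take, in order.
final class AllocationOutcome {
    final List<String> actions = new ArrayList<>();

    static AllocationOutcome track(CompletableFuture<Object> rpcResult, String allocationId) {
        AllocationOutcome outcome = new AllocationOutcome();
        rpcResult.whenComplete((ack, failure) -> outcome.onResult(allocationId, ack, failure));
        return outcome;
    }

    private void onResult(String allocationId, Object ack, Throwable failure) {
        if (ack != null) {
            actions.add("updateSlot:" + allocationId);
            return;
        }
        if (failure instanceof SlotOccupied) {
            actions.add("updateSlot:" + ((SlotOccupied) failure).occupyingAllocationId);
        } else {
            actions.add("removeSlotRequestFromSlot:" + allocationId);
        }
        // a cancellation is only logged; every other failure is handled as a failed request
        if (!(failure instanceof CancellationException)) {
            actions.add("handleFailedSlotRequest:" + allocationId);
        }
    }
}
```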
requestSlot
TaskExecutor.requestSlot looks like this:
@Override
public CompletableFuture<Acknowledge> requestSlot(
final SlotID slotId,
final JobID jobId,
final AllocationID allocationId,
final ResourceProfile resourceProfile,
final String targetAddress,
final ResourceManagerId resourceManagerId,
final Time timeout) {
// TODO: Filter invalid requests from the resource manager by using the instance/registration Id
log.info("Receive slot request {} for job {} from resource manager with leader id {}.",
allocationId, jobId, resourceManagerId);
// check that we are connected to the ResourceManager
if (!isConnectedToResourceManager(resourceManagerId)) {
final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
log.debug(message);
return FutureUtils.completedExceptionally(new TaskManagerException(message));
}
try {
// call the TaskExecutor's allocateSlot method
allocateSlot(
slotId,
jobId,
allocationId,
resourceProfile);
} catch (SlotAllocationException sae) {
return FutureUtils.completedExceptionally(sae);
}
final JobTable.Job job;
try {
// get or create the job entry in the job table
job = jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
} catch (Exception e) {
// free the allocated slot
try {
taskSlotTable.freeSlot(allocationId);
} catch (SlotNotFoundException slotNotFoundException) {
// slot no longer existent, this should actually never happen, because we've
// just allocated the slot. So let's fail hard in this case!
onFatalError(slotNotFoundException);
}
// release local state under the allocation id.
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
// sanity check
if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
onFatalError(new Exception("Could not free slot " + slotId));
}
return FutureUtils.completedExceptionally(new SlotAllocationException("Could not create new job.", e));
}
if (job.isConnected()) {
offerSlotsToJobManager(jobId);
}
return CompletableFuture.completedFuture(Acknowledge.get());
}
Finally we arrive at the TaskExecutor's allocateSlot method. It is short and ultimately delegates to TaskSlotTable's allocateSlot method.
TaskExecutor's allocateSlot method
private void allocateSlot(
SlotID slotId,
JobID jobId,
AllocationID allocationId,
ResourceProfile resourceProfile) throws SlotAllocationException {
// if the slot is free
if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
// allocate the slot in taskSlotTable
if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, resourceProfile, taskManagerConfiguration.getTimeout())) {
log.info("Allocated slot for {}.", allocationId);
} else {
log.info("Could not allocate slot for {}.", allocationId);
throw new SlotAllocationException("Could not allocate slot.");
}
} else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
// reaching this branch means the slot is already allocated to a different job
final String message = "The slot " + slotId + " has already been allocated for a different job.";
log.info(message);
final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
}
}
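The three branches of TaskExecutor's allocateSlot (free slot, repeated request for the same allocation, slot taken by another allocation) can be modelled with a toy slot table. SimpleSlotTable and Occupied are hypothetical; Flink's TaskSlotTable.allocateSlot additionally takes a resource profile and a slot timeout, as the call above shows, which this sketch leaves out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy slot table mirroring the branches of TaskExecutor#allocateSlot.
final class SimpleSlotTable {
    static final class Occupied extends RuntimeException {
        final String currentAllocationId;

        Occupied(int slotIndex, String currentAllocationId) {
            super("The slot " + slotIndex + " has already been allocated for a different job.");
            this.currentAllocationId = currentAllocationId;
        }
    }

    private static final class Allocation {
        final String jobId;
        final String allocationId;

        Allocation(String jobId, String allocationId) {
            this.jobId = jobId;
            this.allocationId = allocationId;
        }
    }

    private final int numberOfSlots;
    private final Map<Integer, Allocation> allocations = new HashMap<>();

    SimpleSlotTable(int numberOfSlots) {
        this.numberOfSlots = numberOfSlots;
    }

    void allocate(int slotIndex, String jobId, String allocationId) {
        if (slotIndex < 0 || slotIndex >= numberOfSlots) {
            throw new IllegalArgumentException("No such slot: " + slotIndex);
        }
        Allocation current = allocations.get(slotIndex);
        if (current == null) {
            allocations.put(slotIndex, new Allocation(jobId, allocationId)); // free: allocate it
        } else if (!current.jobId.equals(jobId) || !current.allocationId.equals(allocationId)) {
            throw new Occupied(slotIndex, current.allocationId); // taken by another allocation
        }
        // otherwise the same job and allocation asked again: nothing to do
    }

    boolean isFree(int slotIndex) {
        return !allocations.containsKey(slotIndex);
    }
}
```

A repeated request for the same job and allocation id is a no-op here, which would keep a retried request from the ResourceManager harmless.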
At this point we have followed the whole path from an ExecutionVertex to the TaskManager finally creating a TaskSlot.
Slot allocation logic when the TaskManager starts
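Consider the TaskExecutor's startTaskExecutorServices method. It starts the service that retrieves the ResourceManager leader information and registers a listener that tracks changes of the ResourceManager leader in real time. It then starts the TaskSlotTable and the job leader service, and creates the file cache.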
startTaskExecutorServices
private void startTaskExecutorServices() throws Exception {
try {
// start by connecting to the ResourceManager
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
// tell the task slot table who's responsible for the task slot actions
taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
// start the job leader service
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
} catch (Exception e) {
handleStartTaskExecutorServicesException(e);
}
}
Once the ResourceManager leader has been elected, ResourceManagerLeaderListener is notified through its notifyLeaderAddress method.
notifyLeaderAddress
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
runAsync(
() -> notifyOfNewResourceManagerLeader(
leaderAddress,
ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
This asynchronously calls notifyOfNewResourceManagerLeader, which we follow next.
notifyOfNewResourceManagerLeader
private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}
This method first stores the address of the elected ResourceManager leader, then connects to the ResourceManager.
Let's follow the connection method.
reconnectToResourceManager
private void reconnectToResourceManager(Exception cause) {
closeResourceManagerConnection(cause);
startRegistrationTimeout();
tryConnectToResourceManager();
}
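For uniformity, the reconnection logic is reused here: it first closes the existing ResourceManager connection, then starts a registration timeout task (the timeout is read from the configuration), and finally tries to connect to the ResourceManager.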
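The registration timeout can be sketched with one token per attempt: a newer attempt, or a successful registration, invalidates older timeouts. This is an assumption-laden illustration: RegistrationTimeout is hypothetical, and a ScheduledExecutorService replaces the TaskExecutor's main-thread scheduling.

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a timeout fires only if its token is still current when the delay elapses.
final class RegistrationTimeout {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
        Thread thread = new Thread(runnable, "registration-timeout");
        thread.setDaemon(true); // do not keep the JVM alive
        return thread;
    });
    private volatile UUID currentTimeoutId;
    // Completed with the id of the timeout that fired; a TaskExecutor would treat this as fatal.
    final CompletableFuture<UUID> timedOut = new CompletableFuture<>();

    // Starts a new timeout and implicitly invalidates any earlier one.
    UUID start(long delayMillis) {
        UUID timeoutId = UUID.randomUUID();
        currentTimeoutId = timeoutId;
        scheduler.schedule(() -> {
            if (timeoutId.equals(currentTimeoutId)) {
                timedOut.complete(timeoutId);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
        return timeoutId;
    }

    // Called once registration succeeded (cf. stopRegistrationTimeout).
    void stop() {
        currentTimeoutId = null;
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```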
Next, the tryConnectToResourceManager method.
tryConnectToResourceManager
private void tryConnectToResourceManager() {
if (resourceManagerAddress != null) {
connectToResourceManager();
}
}
private void connectToResourceManager() {
assert(resourceManagerAddress != null);
assert(establishedResourceManagerConnection == null);
assert(resourceManagerConnection == null);
log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
// create the TaskExecutor registration object,
// containing the TaskExecutor's address, ports, hardware description, resource configuration, etc.
final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
getAddress(),
getResourceID(),
unresolvedTaskManagerLocation.getDataPort(),
JMXService.getPort().orElse(-1),
hardwareDescription,
memoryConfiguration,
taskManagerConfiguration.getDefaultSlotResourceProfile(),
taskManagerConfiguration.getTotalResourceProfile()
);
// create the connection to the ResourceManager and start it
resourceManagerConnection =
new TaskExecutorToResourceManagerConnection(
log,
getRpcService(),
taskManagerConfiguration.getRetryingRegistrationConfiguration(),
resourceManagerAddress.getAddress(),
resourceManagerAddress.getResourceManagerId(),
getMainThreadExecutor(),
new ResourceManagerRegistrationListener(),
taskExecutorRegistration);
resourceManagerConnection.start();
}
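The TaskExecutor registers with the ResourceManager and opens the connection. Note that a registration listener, ResourceManagerRegistrationListener, is passed in here; once registration succeeds, its onRegistrationSuccess method is called.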
onRegistrationSuccess
@Override
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
final ResourceID resourceManagerId = success.getResourceManagerId();
final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
final ClusterInformation clusterInformation = success.getClusterInformation();
final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();
runAsync(
() -> {
// filter out outdated connections
//noinspection ObjectEquality
if (resourceManagerConnection == connection) {
try {
establishResourceManagerConnection(
resourceManagerGateway,
resourceManagerId,
taskExecutorRegistrationId,
clusterInformation);
} catch (Throwable t) {
log.error("Establishing Resource Manager connection in Task Executor failed", t);
}
}
});
}
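The "filter out outdated connections" check in onRegistrationSuccess relies on object identity. A minimal, hypothetical sketch (ConnectionHolder is not a Flink class, and the runAsync hop to the main thread is omitted) shows why a success callback from a superseded connection attempt is ignored.

```java
// Hypothetical sketch: only the connection object currently held may complete registration.
final class ConnectionHolder {
    static final class Connection {
        final String target;

        Connection(String target) {
            this.target = target;
        }
    }

    private Connection current;
    private String established; // target of the established connection, if any

    Connection reconnect(String target) {
        current = new Connection(target); // any previous connection is now outdated
        established = null;
        return current;
    }

    void onRegistrationSuccess(Connection connection) {
        // identity, not equals: each attempt is a distinct object
        if (connection == current) {
            established = connection.target;
        }
    }

    String established() {
        return established;
    }
}
```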
The callback asynchronously calls the TaskExecutor's establishResourceManagerConnection, which runs the logic that follows a successful connection.
establishResourceManagerConnection
private void establishResourceManagerConnection(
ResourceManagerGateway resourceManagerGateway,
ResourceID resourceManagerResourceId,
InstanceID taskExecutorRegistrationId,
ClusterInformation clusterInformation) {
// asynchronously send a slot report to the ResourceManager
final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
getResourceID(),
taskExecutorRegistrationId,
// create the slot report from the TaskSlotTable
taskSlotTable.createSlotReport(getResourceID()),
taskManagerConfiguration.getTimeout());
slotReportResponseFuture.whenCompleteAsync(
(acknowledge, throwable) -> {
if (throwable != null) {
// on failure, try to reconnect to the ResourceManager
reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.", throwable));
}
}, getMainThreadExecutor());
// monitor the resource manager as heartbeat target
// i.e. track the heartbeats exchanged with the ResourceManager
resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<TaskExecutorHeartbeatPayload>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
// on a heartbeat request, reply to the ResourceManager with this TaskManager's heartbeat payload
resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);
}
@Override
public void requestHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
// the TaskManager won't send heartbeat requests to the ResourceManager
}
});
// set the propagated blob server address
// i.e. the BlobServer host and port taken from the cluster information
final InetSocketAddress blobServerAddress = new InetSocketAddress(
clusterInformation.getBlobServerHostname(),
clusterInformation.getBlobServerPort());
// point the blob cache at it
blobCacheService.setBlobServerAddress(blobServerAddress);
// store the established ResourceManager connection
establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
resourceManagerGateway,
resourceManagerResourceId,
taskExecutorRegistrationId);
// stop the registration timeout timer
stopRegistrationTimeout();
}
The slot report is received by the ResourceManager's sendSlotReport method.
sendSlotReport
@Override
public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
final WorkerRegistration<WorkerType> workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);
if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
// register the TaskManager through the SlotManager
if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
}
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
}
}
This method registers the TaskManager with the SlotManager. Let's continue.
registerTaskManager
SlotManagerImpl.registerTaskManager is as follows:
@Override
public boolean registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
// make sure the SlotManager has been started
checkInit();
LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID().getStringWithMetadata(), taskExecutorConnection.getInstanceID());
// we identify task managers by their instance id
// if the instance id is already known, this TaskExecutor has registered before:
// update the slot status kept in slots and return false
if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
return false;
} else {
// check whether registering these slots would exceed the maximum number of slots,
// configured via slotmanager.number-of-slots.max
if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum);
// release the resource
resourceActions.releaseResource(taskExecutorConnection.getInstanceID(), new FlinkException("The total number of slots exceeds the max limitation."));
return false;
}
// first register the TaskManager
ArrayList<SlotID> reportedSlots = new ArrayList<>();
// collect the slot ids from the slot report into reportedSlots
for (SlotStatus slotStatus : initialSlotReport) {
reportedSlots.add(slotStatus.getSlotID());
}
// create and store the TaskManager registration
TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(
taskExecutorConnection,
reportedSlots);
taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
// next register the new slots
// register the slots one by one
for (SlotStatus slotStatus : initialSlotReport) {
registerSlot(
slotStatus.getSlotID(),
slotStatus.getAllocationID(),
slotStatus.getJobID(),
slotStatus.getResourceProfile(),
taskExecutorConnection);
}
return true;
}
}
The method returns a boolean: true if the TaskManager had not been registered before and its registration succeeds, false otherwise.
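The control flow of registerTaskManager can be condensed into a sketch with three outcomes: known instance (re-report, false), slot limit exceeded (release, false), or new registration (true). TaskManagerRegistry is hypothetical, and the limit check is a plain count; Flink's isMaxSlotNumExceededAfterRegistration may account for pending slots as well.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical condensed version of registerTaskManager's decisions.
final class TaskManagerRegistry {
    private final int maxSlotNum;
    private final Map<String, List<String>> slotsByInstanceId = new HashMap<>();
    private int registeredSlots;
    int slotStatusReports;                           // re-reports from already known TaskManagers
    final List<String> released = new ArrayList<>(); // TaskManagers rejected for exceeding the limit

    TaskManagerRegistry(int maxSlotNum) {
        this.maxSlotNum = maxSlotNum;
    }

    boolean register(String instanceId, List<String> reportedSlotIds) {
        if (slotsByInstanceId.containsKey(instanceId)) {
            slotStatusReports++; // cf. reportSlotStatus
            return false;
        }
        if (registeredSlots + reportedSlotIds.size() > maxSlotNum) {
            released.add(instanceId); // cf. resourceActions.releaseResource
            return false;
        }
        slotsByInstanceId.put(instanceId, new ArrayList<>(reportedSlotIds));
        registeredSlots += reportedSlotIds.size();
        return true;
    }
}
```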
Next, let's look closely at SlotManagerImpl.registerSlot, which the SlotManager uses to register each slot of the TaskManager.
registerSlot
private void registerSlot(
SlotID slotId,
AllocationID allocationId,
JobID jobId,
ResourceProfile resourceProfile,
TaskExecutorConnection taskManagerConnection) {
// if a slot with the same id already exists, remove it first
if (slots.containsKey(slotId)) {
// remove the old slot first
removeSlot(
slotId,
new SlotManagerException(
String.format(
"Re-registration of slot %s. This indicates that the TaskExecutor has re-connected.",
slotId)));
}
// create a new TaskManagerSlot and register it (store it in slots)
final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
final PendingTaskManagerSlot pendingTaskManagerSlot;
if (allocationId == null) {
// without an allocationId, look for a pending slot with exactly matching resources
pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
} else {
pendingTaskManagerSlot = null;
}
// no matching pending slot found: update the slot information
if (pendingTaskManagerSlot == null) {
updateSlot(slotId, allocationId, jobId);
} else {
// remove it from pendingSlots
pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
// get the slot request assigned to the pending slot
final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();
if (assignedPendingSlotRequest == null) {
// nobody requested this slot: add it to the free slots
handleFreeSlot(slot);
} else {
// allocate the slot for the assigned request
assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
allocateSlot(slot, assignedPendingSlotRequest);
}
}
}
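At this point the ResourceManager starts the actual slot allocation, in SlotManagerImpl.allocateSlot. The rest of the process is the same as the slot request flow described above and is not repeated here.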
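The branches of registerSlot can be condensed into a sketch. SlotRegistration is hypothetical and resources are reduced to a memory size. It illustrates that a new slot without an allocation id is matched against pending slots by exact resource equality, and is then either allocated to the request assigned to that pending slot or marked free.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical condensed version of registerSlot's branches.
final class SlotRegistration {
    static final class PendingSlot {
        final int memoryMb;
        final String assignedRequest; // null if no request is waiting for this pending slot

        PendingSlot(int memoryMb, String assignedRequest) {
            this.memoryMb = memoryMb;
            this.assignedRequest = assignedRequest;
        }
    }

    final List<PendingSlot> pendingSlots = new ArrayList<>();
    final List<String> freeSlots = new ArrayList<>();
    final List<String> allocations = new ArrayList<>(); // entries of the form "slotId->requestId"

    // Returns which branch was taken: "allocate", "free" or "already-allocated".
    String registerSlot(String slotId, int memoryMb, String allocationId) {
        PendingSlot match = null;
        if (allocationId == null) {
            Iterator<PendingSlot> it = pendingSlots.iterator();
            while (it.hasNext()) {
                PendingSlot candidate = it.next();
                if (candidate.memoryMb == memoryMb) { // exact match, not merely "large enough"
                    it.remove(); // cf. pendingSlots.remove(...)
                    match = candidate;
                    break;
                }
            }
        }
        if (match == null) {
            // cf. updateSlot: the slot is either reported as already allocated or simply free
            if (allocationId != null) {
                return "already-allocated";
            }
            freeSlots.add(slotId);
            return "free";
        }
        if (match.assignedRequest == null) {
            freeSlots.add(slotId); // cf. handleFreeSlot
            return "free";
        }
        allocations.add(slotId + "->" + match.assignedRequest); // cf. allocateSlot
        return "allocate";
    }
}
```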
This post is the author's original work. Discussion and corrections are welcome; please credit the source when reposting.