Presto Technical Notes — split into two parts because of its length
2.4.5 The stage scheduler starts scheduling
There are three stage schedulers, grouped by the kind of stage they serve:
(1) Source stages
- SourcePartitionedScheduler
(2) Fixed stages
- FixedCountScheduler
- FixedSourcePartitionedScheduler
There are two split placement policies:
(1) DynamicSplitPlacementPolicy
(2) FixedSplitPlacementPolicy
The query scheduler invokes each stage's scheduler, as sketched below; the call is:
stageSchedulers.get(stage.getStageId()).schedule();
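For context, this is roughly how the scheduling loop inside SqlQueryScheduler drives the per-stage schedulers. The code below is my own simplified sketch, not Presto source; names such as stagesToSchedule and completedStages are placeholders:
// Simplified sketch of the SqlQueryScheduler scheduling loop (not actual Presto source).
List<ListenableFuture<?>> blockedFutures = new ArrayList<>();
for (SqlStageExecution stage : stagesToSchedule) {
    // the call shown above: pick the scheduler registered for this stage and let it run once
    ScheduleResult result = stageSchedulers.get(stage.getStageId()).schedule();
    if (result.isFinished()) {
        // this stage has placed all of its splits
        completedStages.add(stage.getStageId());
    }
    else if (result.getBlockedReason().isPresent()) {
        // the stage could not make progress; remember the future that will unblock it
        blockedFutures.add(result.getBlocked());
    }
}
if (!blockedFutures.isEmpty()) {
    // park until any blocked stage can make progress, then run the loop again
    tryGetFutureValue(whenAnyComplete(blockedFutures), 1, SECONDS);
}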
The first one is SourcePartitionedScheduler:
public synchronized ScheduleResult schedule()
{
int overallSplitAssignmentCount = 0;
ImmutableSet.Builder<RemoteTask> overallNewTasks = ImmutableSet.builder();
List<ListenableFuture<?>> overallBlockedFutures = new ArrayList<>();
boolean anyBlockedOnPlacements = false;
boolean anyBlockedOnNextSplitBatch = false;
boolean anyNotBlocked = false;
for (Entry<Lifespan, ScheduleGroup> entry : scheduleGroups.entrySet()) {
Lifespan lifespan = entry.getKey();
ScheduleGroup scheduleGroup = entry.getValue();
Set<Split> pendingSplits = scheduleGroup.pendingSplits;
if (scheduleGroup.state != ScheduleGroupState.DISCOVERING_SPLITS) {
verify(scheduleGroup.nextSplitBatchFuture == null);
}
else if (pendingSplits.isEmpty()) {
// try to get the next batch: if there are no pending splits, fetch the next batch from the split source
if (scheduleGroup.nextSplitBatchFuture == null) {
scheduleGroup.nextSplitBatchFuture = splitSource.getNextBatch(scheduleGroup.partitionHandle, lifespan, splitBatchSize - pendingSplits.size());
long start = System.nanoTime();
Futures.addCallback(scheduleGroup.nextSplitBatchFuture, new FutureCallback<SplitBatch>()
{
@Override
public void onSuccess(SplitBatch result)
{
stage.recordGetSplitTime(start);
}
@Override
public void onFailure(Throwable t)
{
}
});
}
if (scheduleGroup.nextSplitBatchFuture.isDone()) {
SplitBatch nextSplits = getFutureValue(scheduleGroup.nextSplitBatchFuture);
scheduleGroup.nextSplitBatchFuture = null;
pendingSplits.addAll(nextSplits.getSplits());
if (nextSplits.isLastBatch() && scheduleGroup.state == ScheduleGroupState.DISCOVERING_SPLITS) {
//if this is the last batch and the schedule group is still discovering splits, mark the group as having no more splits
scheduleGroup.state = ScheduleGroupState.NO_MORE_SPLITS;
}
}
else {
overallBlockedFutures.add(scheduleGroup.nextSplitBatchFuture);
anyBlockedOnNextSplitBatch = true;
continue;
}
}
Multimap<Node, Split> splitAssignment = ImmutableMultimap.of();
if (!pendingSplits.isEmpty()) {
if (!scheduleGroup.placementFuture.isDone()) {
continue;
}
if (state == State.INITIALIZED) {
state = State.SPLITS_ADDED;
}
// compute split placements: using the placement policy chosen earlier, this step assigns the pending splits to nodes
SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);
splitAssignment = splitPlacementResult.getAssignments();
// remove splits with successful placements
splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here.
overallSplitAssignmentCount += splitAssignment.size();
// if not completed placed, mark scheduleGroup as blocked on placement
if (!pendingSplits.isEmpty()) {
scheduleGroup.placementFuture = splitPlacementResult.getBlocked();
overallBlockedFutures.add(scheduleGroup.placementFuture);
anyBlockedOnPlacements = true;
}
}
// if no new splits will be assigned, update state and attach completion event
Multimap<Node, Lifespan> noMoreSplitsNotification = ImmutableMultimap.of();
if (pendingSplits.isEmpty() && scheduleGroup.state == ScheduleGroupState.NO_MORE_SPLITS) {
scheduleGroup.state = ScheduleGroupState.DONE;
if (!lifespan.isTaskWide()) {
Node node = ((FixedSplitPlacementPolicy) splitPlacementPolicy).getNodeForBucket(lifespan.getId());
noMoreSplitsNotification = ImmutableMultimap.of(node, lifespan);
}
}
//hand the splits to the chosen nodes for execution: given the node-to-split assignment, assignSplits returns the RemoteTasks that will run them
overallNewTasks.addAll(assignSplits(splitAssignment, noMoreSplitsNotification));
// Assert that "placement future is not done" implies "pendingSplits is not empty".
// The other way around is not true. One obvious reason is (un)lucky timing, where the placement is unblocked between `computeAssignments` and this line.
// However, there are other reasons that could lead to this.
// Note that `computeAssignments` is quite broken:
// 1. It always returns a completed future when there are no tasks, regardless of whether all nodes are blocked.
// 2. The returned future will only be completed when a node with an assigned task becomes unblocked. Other nodes don't trigger future completion.
// As a result, to avoid busy loops caused by 1, we check pendingSplits.isEmpty() instead of placementFuture.isDone() here.
if (scheduleGroup.nextSplitBatchFuture == null && scheduleGroup.pendingSplits.isEmpty() && scheduleGroup.state != ScheduleGroupState.DONE) {
anyNotBlocked = true;
}
}
if (autoDropCompletedLifespans) {
drainCompletedLifespans();
}
// * `splitSource.isFinished` invocation may fail after `splitSource.close` has been invoked.
// If state is NO_MORE_SPLITS/FINISHED, splitSource.isFinished has previously returned true, and splitSource is closed now.
// * Even if `splitSource.isFinished()` return true, it is not necessarily safe to tear down the split source.
// * If anyBlockedOnNextSplitBatch is true, it means we have not checked out the recently completed nextSplitBatch futures,
// which may contain recently published splits. We must not ignore those.
// * If any scheduleGroup is still in DISCOVERING_SPLITS state, it means it hasn't realized that there will be no more splits.
// Next time it invokes getNextBatch, it will realize that. However, the invocation will fail if we tear down splitSource now.
if ((state == State.NO_MORE_SPLITS || state == State.FINISHED) || (scheduleGroups.isEmpty() && splitSource.isFinished())) {
switch (state) {
case INITIALIZED:
// we have not scheduled a single split so far
state = State.SPLITS_ADDED;
ScheduleResult emptySplitScheduleResult = scheduleEmptySplit();
overallNewTasks.addAll(emptySplitScheduleResult.getNewTasks());
overallSplitAssignmentCount++;
// fall through
case SPLITS_ADDED:
state = State.NO_MORE_SPLITS;
splitSource.close();
// fall through
case NO_MORE_SPLITS:
if (!scheduleGroups.isEmpty()) {
// we are blocked on split assignment
break;
}
state = State.FINISHED;
whenFinishedOrNewLifespanAdded.set(null);
// fall through
case FINISHED:
return new ScheduleResult(
true,
overallNewTasks.build(),
overallSplitAssignmentCount);
default:
throw new IllegalStateException("Unknown state");
}
}
if (anyNotBlocked) {
return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
}
// Only try to finalize task creation when scheduling would block
overallNewTasks.addAll(finalizeTaskCreationIfNecessary());
ScheduleResult.BlockedReason blockedReason;
if (anyBlockedOnNextSplitBatch) {
blockedReason = anyBlockedOnPlacements ? MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE : WAITING_FOR_SOURCE;
}
else {
blockedReason = anyBlockedOnPlacements ? SPLIT_QUEUES_FULL : NO_ACTIVE_DRIVER_GROUP;
}
overallBlockedFutures.add(whenFinishedOrNewLifespanAdded);
return new ScheduleResult(
false,
overallNewTasks.build(),
nonCancellationPropagating(whenAnyComplete(overallBlockedFutures)),
blockedReason,
overallSplitAssignmentCount);
}
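For reference, the ScheduleResult returned above is essentially a small value object. The following is a simplified sketch of what it carries, inferred from the constructors and getters used in this article (not the full class):
// Simplified sketch of ScheduleResult (field names inferred from the calls in this article).
class ScheduleResult
{
    boolean finished;                      // true once the scheduler has placed all of its splits
    Set<RemoteTask> newTasks;              // tasks created during this schedule() call
    ListenableFuture<?> blocked;           // completes when scheduling can make progress again
    Optional<BlockedReason> blockedReason; // e.g. WAITING_FOR_SOURCE, SPLIT_QUEUES_FULL
    int splitsScheduled;                   // number of splits assigned in this call
}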
The splitPlacementPolicy.computeAssignments() method
DynamicSplitPlacementPolicy implements the dynamic placement logic:
@Override
public SplitPlacementResult computeAssignments(Set<Split> splits)
{
//delegate to the nodeSelector's assignment computation
return nodeSelector.computeAssignments(splits, remoteTasks.get());
}
//As mentioned earlier, the NodeSelector implementation is in most cases TopologyAwareNodeSelector; below is how TopologyAwareNodeSelector assigns splits
@Override
public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
{
//fetch the NodeMap held by the selector
NodeMap nodeMap = this.nodeMap.get().get();
//the assignment multimap to fill in
Multimap<Node, Split> assignment = HashMultimap.create();
NodeAssignmentStats assignmentStats = new NodeAssignmentStats(nodeTaskMap, nodeMap, existingTasks);
int[] topologicCounters = new int[topologicalSplitCounters.size()];
Set<NetworkLocation> filledLocations = new HashSet<>();
Set<Node> blockedExactNodes = new HashSet<>();
boolean splitWaitingForAnyNode = false;
for (Split split : splits) {
//first check whether the split can be read remotely; if not, look up the nodes (possibly including the coordinator) that match the split's addresses and use them as candidates; if none can be found, throw an exception
if (!split.isRemotelyAccessible()) {
List<Node> candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);
if (candidateNodes.isEmpty()) {
log.debug("No nodes available to schedule %s. Available nodes %s", split, nodeMap.getNodesByHost().keys());
throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}
//the selection strategy here weighs the minimum number of candidate nodes against the maximum pending splits allowed per task (see the bestNodeSplitCount sketch below)
Node chosenNode = bestNodeSplitCount(candidateNodes.iterator(), minCandidates, maxPendingSplitsPerTask, assignmentStats);
//if a node could be chosen
if (chosenNode != null) {
//record the chosen node and its split
assignment.put(chosenNode, split);
assignmentStats.addAssignedSplit(chosenNode);
}
// Exact node set won't matter, if a split is waiting for any node
else if (!splitWaitingForAnyNode) {
//if the strategy could not pick a node, remember all candidate nodes as blocked exact nodes
blockedExactNodes.addAll(candidateNodes);
}
continue;
}
//for remotely accessible splits, choose a node based on network topology as follows
Node chosenNode = null;
//depth of the network topology; we retry at progressively shallower levels below
int depth = networkLocationSegmentNames.size();
int chosenDepth = 0;
Set<NetworkLocation> locations = new HashSet<>();
//collect the network locations of all of the split's addresses
for (HostAddress host : split.getAddresses()) {
locations.add(networkLocationCache.get(host));
}
//if no location could be resolved from the cache, fall back to the root location and a depth of 0
if (locations.isEmpty()) {
// Add the root location
locations.add(ROOT_LOCATION);
depth = 0;
}
// Try each address at progressively shallower network locations
for (int i = depth; i >= 0 && chosenNode == null; i--) {
for (NetworkLocation location : locations) {
// Skip locations which are only shallower than this level
// For example, locations which couldn't be located will be at the "root" location
if (location.getSegments().size() < i) {
continue;
}
location = location.subLocation(0, i);
if (filledLocations.contains(location)) {
continue;
}
Set<Node> nodes = nodeMap.getWorkersByNetworkPath().get(location);
chosenNode = bestNodeSplitCount(new ResettableRandomizedIterator<>(nodes), minCandidates, calculateMaxPendingSplits(i, depth), assignmentStats);
if (chosenNode != null) {
chosenDepth = i;
break;
}
filledLocations.add(location);
}
}
if (chosenNode != null) {
//record the chosen node and its split
assignment.put(chosenNode, split);
assignmentStats.addAssignedSplit(chosenNode);
topologicCounters[chosenDepth]++;
}
else {
splitWaitingForAnyNode = true;
}
}
for (int i = 0; i < topologicCounters.length; i++) {
if (topologicCounters[i] > 0) {
topologicalSplitCounters.get(i).update(topologicCounters[i]);
}
}
ListenableFuture<?> blocked;
int maxPendingForWildcardNetworkAffinity = calculateMaxPendingSplits(0, networkLocationSegmentNames.size());
if (splitWaitingForAnyNode) {
blocked = toWhenHasSplitQueueSpaceFuture(existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
}
else {
blocked = toWhenHasSplitQueueSpaceFuture(blockedExactNodes, existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
}
return new SplitPlacementResult(blocked, assignment);
}
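The bestNodeSplitCount helper called above is not reproduced in this article. Roughly, it scans the candidate nodes and returns the first one that still has room below the per-node split cap, otherwise the least-loaded candidate whose queue is still below maxPendingSplitsPerTask. The sketch below only illustrates that idea; names such as maxSplitsPerNode and getQueuedSplitCountForStage are assumptions, not verbatim Presto code:
// Simplified sketch of the node-selection idea behind bestNodeSplitCount (assumed, not verbatim).
private Node bestNodeSplitCount(Iterator<Node> candidates, int minCandidates, int maxPendingSplitsPerTask, NodeAssignmentStats stats)
{
    Node bestQueueNotFull = null;
    long minQueuedSplits = Long.MAX_VALUE;
    int fullCandidatesConsidered = 0;
    while (candidates.hasNext() && (fullCandidatesConsidered < minCandidates || bestQueueNotFull == null)) {
        Node node = candidates.next();
        // a node still below the per-node split cap wins immediately
        if (stats.getTotalSplitCount(node) < maxSplitsPerNode) {
            return node;
        }
        fullCandidatesConsidered++;
        // otherwise remember the node with the shortest pending queue that is still under the per-task cap
        int queuedSplits = stats.getQueuedSplitCountForStage(node);
        if (queuedSplits < minQueuedSplits && queuedSplits < maxPendingSplitsPerTask) {
            minQueuedSplits = queuedSplits;
            bestQueueNotFull = node;
        }
    }
    // may be null; the caller then treats the split as blocked (blockedExactNodes / splitWaitingForAnyNode above)
    return bestQueueNotFull;
}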
The assignSplits method of SourcePartitionedScheduler
//iterate over the splitAssignment entries; for each node,
//look up the task already running on it: if there is none, schedule a new task, otherwise hand the node's splits to the existing task
private Set<RemoteTask> assignSplits(Multimap<Node, Split> splitAssignment, Multimap<Node, Lifespan> noMoreSplitsNotification)
{
ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
ImmutableSet<Node> nodes = ImmutableSet.<Node>builder()
.addAll(splitAssignment.keySet())
.addAll(noMoreSplitsNotification.keySet())
.build();
for (Node node : nodes) {
// source partitioned tasks can only receive broadcast data; otherwise it would have a different distribution
ImmutableMultimap<PlanNodeId, Split> splits = ImmutableMultimap.<PlanNodeId, Split>builder()
.putAll(partitionedNode, splitAssignment.get(node))
.build();
ImmutableMultimap.Builder<PlanNodeId, Lifespan> noMoreSplits = ImmutableMultimap.builder();
if (noMoreSplitsNotification.containsKey(node)) {
noMoreSplits.putAll(partitionedNode, noMoreSplitsNotification.get(node));
}
newTasks.addAll(stage.scheduleSplits(
node,
splits,
noMoreSplits.build()));
}
return newTasks.build();
}
The scheduleSplits method of SqlStageExecution:
public synchronized Set<RemoteTask> scheduleSplits(Node node, Multimap<PlanNodeId, Split> splits, Multimap<PlanNodeId, Lifespan> noMoreSplitsNotification)
{
requireNonNull(node, "node is null");
requireNonNull(splits, "splits is null");
splitsScheduled.set(true);
checkArgument(stateMachine.getFragment().getPartitionedSources().containsAll(splits.keySet()), "Invalid splits");
ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
Collection<RemoteTask> tasks = this.tasks.get(node);
RemoteTask task;
if (tasks == null) {
// The output buffer depends on the task id starting from 0 and being sequential, since each
// task is assigned a private buffer based on task id.
TaskId taskId = new TaskId(stateMachine.getStageId(), nextTaskId.getAndIncrement());
task = scheduleTask(node, taskId, splits);
newTasks.add(task);
}
else {
task = tasks.iterator().next();
task.addSplits(splits);
}
if (noMoreSplitsNotification.size() > 1) {
// The assumption that `noMoreSplitsNotification.size() <= 1` currently holds.
// If this assumption no longer holds, we should consider calling task.noMoreSplits with multiple entries in one shot.
// These kind of methods can be expensive since they are grabbing locks and/or sending HTTP requests on change.
throw new UnsupportedOperationException("This assumption no longer holds: noMoreSplitsNotification.size() < 1");
}
for (Entry<PlanNodeId, Lifespan> entry : noMoreSplitsNotification.entries()) {
task.noMoreSplits(entry.getKey(), entry.getValue());
}
return newTasks.build();
}
The second one is FixedSourcePartitionedScheduler:
public ScheduleResult schedule()
{
// schedule a task on every node in the distribution
List<RemoteTask> newTasks = ImmutableList.of();
if (!scheduledTasks) {
newTasks = partitioning.getPartitionToNode().entrySet().stream()
.map(entry -> stage.scheduleTask(entry.getValue(), entry.getKey()))
.collect(toImmutableList());
scheduledTasks = true;
}
boolean allBlocked = true;
List<ListenableFuture<?>> blocked = new ArrayList<>();
BlockedReason blockedReason = BlockedReason.NO_ACTIVE_DRIVER_GROUP;
int splitsScheduled = 0;
Iterator<SourcePartitionedScheduler> schedulerIterator = sourcePartitionedSchedulers.iterator();
List<Lifespan> driverGroupsToStart = ImmutableList.of();
while (schedulerIterator.hasNext()) {
SourcePartitionedScheduler sourcePartitionedScheduler = schedulerIterator.next();
for (Lifespan lifespan : driverGroupsToStart) {
sourcePartitionedScheduler.startLifespan(lifespan, partitionHandleFor(lifespan));
}
ScheduleResult schedule = sourcePartitionedScheduler.schedule();
splitsScheduled += schedule.getSplitsScheduled();
if (schedule.getBlockedReason().isPresent()) {
blocked.add(schedule.getBlocked());
blockedReason = blockedReason.combineWith(schedule.getBlockedReason().get());
}
else {
verify(schedule.getBlocked().isDone(), "blockedReason not provided when scheduler is blocked");
allBlocked = false;
}
driverGroupsToStart = sourcePartitionedScheduler.drainCompletedLifespans();
if (schedule.isFinished()) {
schedulerIterator.remove();
sourcePartitionedScheduler.close();
}
}
if (allBlocked) {
return new ScheduleResult(sourcePartitionedSchedulers.isEmpty(), newTasks, whenAnyComplete(blocked), blockedReason, splitsScheduled);
}
else {
return new ScheduleResult(sourcePartitionedSchedulers.isEmpty(), newTasks, splitsScheduled);
}
}
The third one is FixedCountScheduler:
public ScheduleResult schedule()
{
List<RemoteTask> newTasks = partitionToNode.entrySet().stream()
.map(entry -> taskScheduler.apply(entry.getValue(), entry.getKey()))
.collect(toImmutableList());
return new ScheduleResult(true, newTasks, 0);
}
2.5 Generating the RemoteTask
In Presto's architecture, stage scheduling produces tasks that are dispatched to the workers for execution.
The scheduleTask method of SqlStageExecution:
private synchronized RemoteTask scheduleTask(Node node, TaskId taskId, Multimap<PlanNodeId, Split> sourceSplits)
{
ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
//collect all sourceSplits; for a source stage this parameter carries the splits, whereas for fixed and single stages it is empty
initialSplits.putAll(sourceSplits);
sourceTasks.forEach((planNodeId, task) -> {
TaskStatus status = task.getTaskStatus();
if (status.getState() != TaskState.FINISHED) {
initialSplits.put(planNodeId, createRemoteSplitFor(taskId, status.getSelf()));
}
});
OutputBuffers outputBuffers = this.outputBuffers.get();
checkState(outputBuffers != null, "Initial output buffers must be set before a task can be scheduled");
//create the remote task
RemoteTask task = remoteTaskFactory.createRemoteTask(
stateMachine.getSession(),
taskId,
node,
stateMachine.getFragment(),
initialSplits.build(),
outputBuffers,
nodeTaskMap.createPartitionedSplitCountTracker(node, taskId),
summarizeTaskInfo);
completeSources.forEach(task::noMoreSplits);
allTasks.add(taskId);
tasks.computeIfAbsent(node, key -> newConcurrentHashSet()).add(task);
nodeTaskMap.addTask(node, task);
task.addStateChangeListener(new StageTaskListener());
if (!stateMachine.getState().isDone()) {
task.start();
}
else {
task.abort();
}
return task;
}
The RemoteTask interface is implemented by HttpRemoteTask:
public HttpRemoteTask(Session session,
TaskId taskId,
String nodeId,
URI location,
PlanFragment planFragment,
Multimap<PlanNodeId, Split> initialSplits,
OutputBuffers outputBuffers,
HttpClient httpClient,
Executor executor,
ScheduledExecutorService updateScheduledExecutor,
ScheduledExecutorService errorScheduledExecutor,
Duration minErrorDuration,
Duration maxErrorDuration,
Duration taskStatusRefreshMaxWait,
Duration taskInfoUpdateInterval,
boolean summarizeTaskInfo,
JsonCodec<TaskStatus> taskStatusCodec,
JsonCodec<TaskInfo> taskInfoCodec,
JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec,
PartitionedSplitCountTracker partitionedSplitCountTracker,
RemoteTaskStats stats)
{
requireNonNull(session, "session is null");
requireNonNull(taskId, "taskId is null");
requireNonNull(nodeId, "nodeId is null");
requireNonNull(location, "location is null");
requireNonNull(planFragment, "planFragment is null");
requireNonNull(outputBuffers, "outputBuffers is null");
requireNonNull(httpClient, "httpClient is null");
requireNonNull(executor, "executor is null");
requireNonNull(taskStatusCodec, "taskStatusCodec is null");
requireNonNull(taskInfoCodec, "taskInfoCodec is null");
requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
requireNonNull(stats, "stats is null");
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
this.taskId = taskId;
this.session = session;
this.nodeId = nodeId;
this.planFragment = planFragment;
this.outputBuffers.set(outputBuffers);
this.httpClient = httpClient;
this.executor = executor;
this.errorScheduledExecutor = errorScheduledExecutor;
this.summarizeTaskInfo = summarizeTaskInfo;
this.taskInfoCodec = taskInfoCodec;
this.taskUpdateRequestCodec = taskUpdateRequestCodec;
this.updateErrorTracker = new RequestErrorTracker(taskId, location, minErrorDuration, maxErrorDuration, errorScheduledExecutor, "updating task");
this.partitionedSplitCountTracker = requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
this.stats = stats;
for (Entry<PlanNodeId, Split> entry : requireNonNull(initialSplits, "initialSplits is null").entries()) {
ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getKey(), entry.getValue());
pendingSplits.put(entry.getKey(), scheduledSplit);
}
pendingSourceSplitCount = planFragment.getPartitionedSources().stream()
.filter(initialSplits::containsKey)
.mapToInt(partitionedSource -> initialSplits.get(partitionedSource).size())
.sum();
List<BufferInfo> bufferStates = outputBuffers.getBuffers()
.keySet().stream()
.map(outputId -> new BufferInfo(outputId, false, 0, 0, PageBufferInfo.empty()))
.collect(toImmutableList());
TaskInfo initialTask = createInitialTask(taskId, location, nodeId, bufferStates, new TaskStats(DateTime.now(), null));
this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
this::failTask,
initialTask.getTaskStatus(),
taskStatusRefreshMaxWait,
taskStatusCodec,
executor,
httpClient,
minErrorDuration,
maxErrorDuration,
errorScheduledExecutor,
stats);
this.taskInfoFetcher = new TaskInfoFetcher(
this::failTask,
initialTask,
httpClient,
taskInfoUpdateInterval,
taskInfoCodec,
minErrorDuration,
maxErrorDuration,
summarizeTaskInfo,
executor,
updateScheduledExecutor,
errorScheduledExecutor,
stats);
taskStatusFetcher.addStateChangeListener(newStatus -> {
TaskState state = newStatus.getState();
if (state.isDone()) {
cleanUpTask();
}
else {
partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
updateSplitQueueSpace();
}
});
long timeout = minErrorDuration.toMillis() / MIN_RETRIES;
this.requestTimeout = new Duration(timeout + taskStatusRefreshMaxWait.toMillis(), MILLISECONDS);
partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
updateSplitQueueSpace();
}
}
The task's start method kicks off polling of the task status:
public void start()
{
try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
// to start we just need to trigger an update
scheduleUpdate();
taskStatusFetcher.start();
taskInfoFetcher.start();
}
}
The exchange pulls data from the upstream stages, while the outputBuffer hands the current stage's output to the downstream stage.
2.6 Task execution
2.6.1 The worker receives the task
After the RemoteTask is created, the task is delivered to the target worker through an HTTP REST request; the worker-side endpoint is TaskResource.createOrUpdateTask (the coordinator-side request is sketched after it).
@Path("/v1/task")
@POST
@Path("{taskId}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
{
requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");
Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager);
TaskInfo taskInfo = taskManager.updateTask(session,
taskId,
taskUpdateRequest.getFragment(),
taskUpdateRequest.getSources(),
taskUpdateRequest.getOutputIds());
if (shouldSummarize(uriInfo)) {
taskInfo = taskInfo.summarize();
}
return Response.ok().entity(taskInfo).build();
}
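On the coordinator side, HttpRemoteTask periodically packs the queued splits into a TaskUpdateRequest and POSTs it to this endpoint. The sketch below is my simplification of that update path; the builder and response-handler helpers in the real sendUpdate differ in detail:
// Simplified sketch of the coordinator-side update that ends up in createOrUpdateTask above.
private void sendUpdate()
{
    // the splits queued for this task, grouped into TaskSource objects per plan node
    List<TaskSource> sources = getSources();
    TaskUpdateRequest updateRequest = new TaskUpdateRequest(
            session.toSessionRepresentation(),
            Optional.of(planFragment),    // the fragment only needs to be sent until the worker has planned the task
            sources,
            outputBuffers.get());

    // POST http://<worker>/v1/task/{taskId} with the JSON-encoded request as the body
    Request request = preparePost()
            .setUri(taskStatusFetcher.getTaskStatus().getSelf())
            .setHeader("Content-Type", "application/json")
            .setBodyGenerator(jsonBodyGenerator(taskUpdateRequestCodec, updateRequest))
            .build();
    // the async response handler records the returned TaskInfo; failures are retried through updateErrorTracker
    httpClient.executeAsync(request, createFullJsonResponseHandler(taskInfoCodec));
}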
The updateTask method of SqlTaskManager:
@Override
public TaskInfo updateTask(Session session, TaskId taskId, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
requireNonNull(session, "session is null");
requireNonNull(taskId, "taskId is null");
requireNonNull(fragment, "fragment is null");
requireNonNull(sources, "sources is null");
requireNonNull(outputBuffers, "outputBuffers is null");
if (resourceOvercommit(session)) {
// TODO: This should have been done when the QueryContext was created. However, the session isn't available at that point.
queryContexts.getUnchecked(taskId.getQueryId()).setResourceOvercommit();
}
SqlTask sqlTask = tasks.getUnchecked(taskId);
sqlTask.recordHeartbeat();
return sqlTask.updateTask(session, fragment, sources, outputBuffers);
}
The updateTask method of SqlTask:
public TaskInfo updateTask(Session session, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
try {
// The LazyOutput buffer does not support write methods, so the actual
// output buffer must be established before drivers are created (e.g.
// a VALUES query).
outputBuffer.setOutputBuffers(outputBuffers);
// assure the task execution is only created once
SqlTaskExecution taskExecution;
synchronized (this) {
// is task already complete?
TaskHolder taskHolder = taskHolderReference.get();
if (taskHolder.isFinished()) {
return taskHolder.getFinalTaskInfo();
}
taskExecution = taskHolder.getTaskExecution();
if (taskExecution == null) {
checkState(fragment.isPresent(), "fragment must be present");
//on the first update, create the task execution
taskExecution = sqlTaskExecutionFactory.create(session, queryContext, taskStateMachine, outputBuffer, fragment.get(), sources);
taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
needsPlan.set(false);
}
}
if (taskExecution != null) {
taskExecution.addSources(sources);
}
}
catch (Error e) {
failed(e);
throw e;
}
catch (RuntimeException e) {
failed(e);
}
return getTaskInfo();
}
The create method of SqlTaskExecutionFactory:
public SqlTaskExecution create(Session session, QueryContext queryContext, TaskStateMachine taskStateMachine, OutputBuffer outputBuffer, PlanFragment fragment, List<TaskSource> sources)
{
boolean verboseStats = getVerboseStats(session);
TaskContext taskContext = queryContext.addTaskContext(
taskStateMachine,
session,
verboseStats,
cpuTimerEnabled);
LocalExecutionPlan localExecutionPlan;
try (SetThreadName ignored = new SetThreadName("Task-%s", taskStateMachine.getTaskId())) {
try {
localExecutionPlan = planner.plan(
taskContext,
fragment.getRoot(),
fragment.getSymbols(),
fragment.getPartitioningScheme(),
fragment.getPipelineExecutionStrategy() == GROUPED_EXECUTION,
fragment.getPartitionedSources(),
outputBuffer);
for (DriverFactory driverFactory : localExecutionPlan.getDriverFactories()) {
Optional<PlanNodeId> sourceId = driverFactory.getSourceId();
if (sourceId.isPresent() && fragment.isPartitionedSources(sourceId.get())) {
checkArgument(fragment.getPipelineExecutionStrategy() == driverFactory.getPipelineExecutionStrategy(),
"Partitioned pipelines are expected to have the same execution strategy as the fragment");
}
else {
checkArgument(fragment.getPipelineExecutionStrategy() != UNGROUPED_EXECUTION || driverFactory.getPipelineExecutionStrategy() == UNGROUPED_EXECUTION,
"When fragment execution strategy is ungrouped, all pipelines should have ungrouped execution strategy");
}
}
}
catch (Throwable e) {
// planning failed
taskStateMachine.failed(e);
throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
return createSqlTaskExecution(
taskStateMachine,
taskContext,
outputBuffer,
sources,
localExecutionPlan,
taskExecutor,
taskNotificationExecutor,
queryMonitor);
}
The createSqlTaskExecution method of SqlTaskExecution:
static SqlTaskExecution createSqlTaskExecution(
TaskStateMachine taskStateMachine,
TaskContext taskContext,
OutputBuffer outputBuffer,
List<TaskSource> sources,
LocalExecutionPlan localExecutionPlan,
TaskExecutor taskExecutor,
Executor notificationExecutor,
QueryMonitor queryMonitor)
{
SqlTaskExecution task = new SqlTaskExecution(
taskStateMachine,
taskContext,
outputBuffer,
localExecutionPlan,
taskExecutor,
queryMonitor,
notificationExecutor);
try (SetThreadName ignored = new SetThreadName("Task-%s", task.getTaskId())) {
// The scheduleDriversForTaskLifeCycle method calls enqueueDriverSplitRunner, which registers a callback with access to this object.
// The call back is accessed from another thread, so this code can not be placed in the constructor.
//schedule the drivers for the task-wide lifecycle, then return the new SqlTaskExecution
task.scheduleDriversForTaskLifeCycle();
return task;
}
}
2.6.2 The worker starts execution
When a worker starts, it calls TaskExecutor.start, whose main job is to process the splits of all tasks running on that worker.
@PostConstruct
public synchronized void start()
{
//runnerThreads is configured via task.max-worker-threads and defaults to 4 times the number of CPU cores
checkState(!closed, "TaskExecutor is closed");
for (int i = 0; i < runnerThreads; i++) {
addRunnerThread();
}
splitMonitorExecutor.scheduleWithFixedDelay(this::monitorActiveSplits, 1, 1, TimeUnit.MINUTES);
}
The addRunnerThread method of TaskExecutor:
private synchronized void addRunnerThread()
{
try {
//TaskRunner is an inner class of TaskExecutor
executor.execute(new TaskRunner());
}
catch (RejectedExecutionException ignored) {
}
}
The TaskRunner class:
private class TaskRunner
implements Runnable
{
private final long runnerId = NEXT_RUNNER_ID.getAndIncrement();
@Override
public void run()
{
try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
while (!closed && !Thread.currentThread().isInterrupted()) {
// select next worker
//take the next PrioritizedSplitRunner to process; a PrioritizedSplitRunner wraps all the work on one split,
//i.e. the chain of operators that is applied to that split
final PrioritizedSplitRunner split;
try {
//take a split from the waiting queue (blocks until one is available)
split = waitingSplits.take();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId();
try (SetThreadName splitName = new SetThreadName(threadId)) {
RunningSplitInfo splitInfo = new RunningSplitInfo(ticker.read(), threadId, Thread.currentThread());
runningSplitInfos.add(splitInfo);
//add the split to runningSplits, which holds every split currently being processed
runningSplits.add(split);
ListenableFuture<?> blocked;
try {
//run the split for one time slice via its process() method
blocked = split.process();
}
finally {
runningSplitInfos.remove(splitInfo);
//once the slice is done, remove the split from runningSplits
runningSplits.remove(split);
}
//finished means the whole split has been fully processed
if (split.isFinished()) {
log.debug("%s is finished", split.getInfo());
splitFinished(split);
}
else {
//blocked tells whether this time slice ended because the split is waiting on something
if (blocked.isDone()) {
//the slice completed but the split still has work left, so put it back on the waiting queue
waitingSplits.offer(split);
}
else {
//the split is blocked; park it in blockedSplits
blockedSplits.put(split, blocked);
blocked.addListener(() -> {
//once the blocking future completes, remove the split from blockedSplits
blockedSplits.remove(split);
//reset its level priority
split.resetLevelPriority();
//and put it back on the waiting queue
waitingSplits.offer(split);
}, executor);
}
}
}
catch (Throwable t) {
// ignore random errors due to driver thread interruption
if (!split.isDestroyed()) {
if (t instanceof PrestoException) {
PrestoException e = (PrestoException) t;
log.error("Error processing %s: %s: %s", split.getInfo(), e.getErrorCode().getName(), e.getMessage());
}
else {
log.error(t, "Error processing %s", split.getInfo());
}
}
splitFinished(split);
}
}
}
finally {
//reached when the thread is interrupted or the TaskExecutor is closed
if (!closed) {
//if the thread was interrupted but the TaskExecutor is not closed yet, start a replacement runner thread
addRunnerThread();
}
}
}
}
All processing of a split goes through split.process(); here split is an instance of PrioritizedSplitRunner.
public ListenableFuture<?> process()
throws Exception
{
try {
long startNanos = ticker.read();
start.compareAndSet(0, startNanos);
lastReady.compareAndSet(0, startNanos);
processCalls.incrementAndGet();
waitNanos.getAndAdd(startNanos - lastReady.get());
CpuTimer timer = new CpuTimer();
//delegate the actual work to split.processFor(Duration); this inner split is a SplitRunner, usually a DriverSplitRunner; SPLIT_RUN_QUANTA is the time slice, one second by default
ListenableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA);
CpuTimer.CpuDuration elapsed = timer.elapsedTime();
long quantaScheduledNanos = ticker.read() - startNanos;
scheduledNanos.addAndGet(quantaScheduledNanos);
priority.set(taskHandle.addScheduledNanos(quantaScheduledNanos));
lastRun.set(ticker.read());
if (blocked == NOT_BLOCKED) {
unblockedQuantaWallTime.add(elapsed.getWall());
}
else {
blockedQuantaWallTime.add(elapsed.getWall());
}
long quantaCpuNanos = elapsed.getCpu().roundTo(NANOSECONDS);
cpuTimeNanos.addAndGet(quantaCpuNanos);
globalCpuTimeMicros.update(quantaCpuNanos / 1000);
globalScheduledTimeMicros.update(quantaScheduledNanos / 1000);
return blocked;
}
catch (Throwable e) {
finishedFuture.setException(e);
throw e;
}
}
2.6.3 Creating the Driver
The processFor method of DriverSplitRunner (an inner class of SqlTaskExecution):
@Override
public ListenableFuture<?> processFor(Duration duration)
{
//a Driver wraps the chain of operators applied to a split; the splits a driver still has to process are stored in newSources
Driver driver;
synchronized (this) {
//if the DriverSplitRunner was closed before this call, there is nothing left to do; return an already-completed (null-valued) future
if (closed) {
return Futures.immediateFuture(null);
}
//if the driver has not been created yet, build one for the assigned split; partitionedSplit is a field of DriverSplitRunner of type ScheduledSplit, a wrapper around Split
if (this.driver == null) {
this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
}
//copy the field into a local so processFor below runs outside the synchronized block
driver = this.driver;
}
return driver.processFor(duration);
}
2.6.4 Driver execution
The processFor method of Driver:
public ListenableFuture<?> processFor(Duration duration)
{
checkLockNotHeld("Can not process for a duration while holding the driver lock");
requireNonNull(duration, "duration is null");
// if the driver is blocked we don't need to continue
SettableFuture<?> blockedFuture = driverBlockedFuture.get();
if (!blockedFuture.isDone()) {
return blockedFuture;
}
//the maximum time this call may run
long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);
//acquire the driver lock, waiting at most 100 ms if another thread holds it
Optional<ListenableFuture<?>> result = tryWithLock(100, TimeUnit.MILLISECONDS, () -> {
driverContext.startProcessTimer();
driverContext.getYieldSignal().setWithDelay(maxRuntime, driverContext.getYieldExecutor());
try {
long start = System.nanoTime();
do {
//the actual work on the split happens in processInternal
ListenableFuture<?> future = processInternal();
if (!future.isDone()) {
return updateDriverBlockedFuture(future);
}
}
while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
}
finally {
driverContext.getYieldSignal().reset();
driverContext.recordProcessed();
}
return NOT_BLOCKED;
});
return result.orElse(NOT_BLOCKED);
}
The processInternal method of Driver:
@GuardedBy("exclusiveLock")
private ListenableFuture<?> processInternal()
{
checkLockHeld("Lock must be held to call processInternal");
handleMemoryRevoke();
try {
//if there are newly arrived splits, feed them to the source operator
processNewSources();
//special-case a single-operator pipeline
if (operators.size() == 1) {
//if the driver is already done, return NOT_BLOCKED
if (driverContext.isDone()) {
return NOT_BLOCKED;
}
//get the single operator
Operator current = operators.get(0);
//check whether the operator is blocked
Optional<ListenableFuture<?>> blocked = getBlockedFuture(current);
if (blocked.isPresent()) {
current.getOperatorContext().recordBlocked(blocked.get());
return blocked.get();
}
//not blocked, so just finish the operator
// there is only one operator so just finish it
current.getOperatorContext().startIntervalTimer();
current.finish();
current.getOperatorContext().recordFinish();
return NOT_BLOCKED;
}
boolean movedPage = false;
//with more than one operator, walk adjacent pairs in the loop below: take the output page of the upstream operator and feed it to the downstream operator as input
for (int i = 0; i < operators.size() - 1 && !driverContext.isDone(); i++) {
//the current operator and its downstream neighbour
Operator current = operators.get(i);
Operator next = operators.get(i + 1);
// skip blocked operator
if (getBlockedFuture(current).isPresent()) {
continue;
}
//if the current operator is not finished and the next one is unblocked and needs input
if (!current.isFinished() && !getBlockedFuture(next).isPresent() && next.needsInput()) {
//pull a page from the current operator and pass it as input to the next one
current.getOperatorContext().startIntervalTimer();
//getOutput is where each operator's page-processing logic lives; it differs per operator (LimitOperator is shown later as an example)
Page page = current.getOutput();
current.getOperatorContext().recordGetOutput(page);
//hand the page to the next operator
if (page != null && page.getPositionCount() != 0) {
next.getOperatorContext().startIntervalTimer();
//addInput is the other half of the per-operator page-processing logic
next.addInput(page);
next.getOperatorContext().recordAddInput(page);
//record that a page moved in this pass
movedPage = true;
}
if (current instanceof SourceOperator) {
movedPage = true;
}
}
//if the current operator has finished, tell the next one there will be no more input so it can finish processing and flush its results
if (current.isFinished()) {
// let next operator know there will be no more data
next.getOperatorContext().startIntervalTimer();
next.finish();
next.getOperatorContext().recordFinish();
}
}
//if we walked all operators without moving a single page, check whether any operator is blocked
if (!movedPage) {
List<Operator> blockedOperators = new ArrayList<>();
List<ListenableFuture<?>> blockedFutures = new ArrayList<>();
//collect the blocked future of every operator (a finished operator may still be waiting for additional memory)
for (Operator operator : operators) {
Optional<ListenableFuture<?>> blocked = getBlockedFuture(operator);
if (blocked.isPresent()) {
blockedOperators.add(operator);
blockedFutures.add(blocked.get());
}
}
//at least one operator really is blocked
if (!blockedFutures.isEmpty()) {
// unblock when the first future is complete
//any one of these futures completing unblocks the driver
ListenableFuture<?> blocked = firstFinishedFuture(blockedFutures);
// driver records serial blocked time
//the driver context also tracks this blocked future (serial blocked time)
driverContext.recordBlocked(blocked);
// each blocked operator is responsible for blocking the execution
// until one of the operators can continue
//record the same blocked future on each blocked operator so its stats reflect the wait
for (Operator operator : blockedOperators) {
operator.getOperatorContext().recordBlocked(blocked);
}
return blocked;
}
}
return NOT_BLOCKED;
}
catch (Throwable t) {
List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
if (interrupterStack == null) {
driverContext.failed(t);
throw t;
}
// Driver thread was interrupted which should only happen if the task is already finished.
// If this becomes the actual cause of a failed query there is a bug in the task state machine.
Exception exception = new Exception("Interrupted By");
exception.setStackTrace(interrupterStack.stream().toArray(StackTraceElement[]::new));
PrestoException newException = new PrestoException(GENERIC_INTERNAL_ERROR, "Driver was interrupted", exception);
newException.addSuppressed(t);
driverContext.failed(newException);
throw newException;
}
}
Driver's getBlockedFuture method determines whether a given operator is blocked:
private Optional<ListenableFuture<?>> getBlockedFuture(Operator operator)
{
ListenableFuture<?> blocked = revokingOperators.get(operator);
if (blocked != null) {
// We mark operator as blocked regardless of blocked.isDone(), because finishMemoryRevoke has not been called yet.
return Optional.of(blocked);
}
blocked = operator.isBlocked();
if (!blocked.isDone()) {
return Optional.of(blocked);
}
blocked = operator.getOperatorContext().isWaitingForMemory();
if (!blocked.isDone()) {
return Optional.of(blocked);
}
blocked = operator.getOperatorContext().isWaitingForRevocableMemory();
if (!blocked.isDone()) {
return Optional.of(blocked);
}
return Optional.empty();
}
2.6.5 Operator execution
The getOutput() and addInput() methods of the Operator interface are the core of an operator's page processing; LimitOperator is used as the example here.
@Override
public void addInput(Page page)
{
checkState(needsInput());
if (page.getPositionCount() <= remainingLimit) {
remainingLimit -= page.getPositionCount();
nextPage = page;
}
else {
Block[] blocks = new Block[page.getChannelCount()];
for (int channel = 0; channel < page.getChannelCount(); channel++) {
Block block = page.getBlock(channel);
blocks[channel] = block.getRegion(0, (int) remainingLimit);
}
nextPage = new Page((int) remainingLimit, blocks);
remainingLimit = 0;
}
}
@Override
public Page getOutput()
{
Page page = nextPage;
nextPage = null;
return page;
}
3. Technical enhancements
3.1 Support for Hive views
3.2 Custom connectors
3.3 Implicit type conversion
3.4 UDF support
3.5 Performance tuning
Not yet complete; to be continued…
If you spot any mistakes, please point them out so we can improve together~
Updated every evening~
If you repost this article, please include a link back; original writing takes effort, thanks~