Presto Source Code Analysis: The Fantastic Journey of a SQL Query (Part 2)

This Presto write-up is split into two parts because of its length; this is the second part.

2.4.5 Stage schedulers start scheduling

There are three stage schedulers, grouped by the kind of task they create:

(1) Source task

  • SourcePartitionedScheduler

(2) Fixed task

  • FixedCountScheduler
  • FixedSourcePartitionedScheduler

There are two split placement policies:
(1) DynamicSplitPlacementPolicy

(2) FixedSplitPlacementPolicy
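
The difference between the two policies can be sketched as follows. This is a minimal illustration, not Presto's code: `PlacementSketch`, `FixedPlacementSketch` and `DynamicPlacementSketch` are hypothetical names, nodes are plain strings, and a split is reduced to its bucket number. The fixed policy looks a bucket up in a precomputed map, while the dynamic policy picks whichever node currently holds the fewest splits.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model: a node is a String, a split is identified by its bucket number.
interface PlacementSketch {
    String nodeFor(int bucket);
}

// Fixed placement: each bucket is pinned to a node decided up front.
class FixedPlacementSketch implements PlacementSketch {
    private final Map<Integer, String> bucketToNode;

    FixedPlacementSketch(Map<Integer, String> bucketToNode) {
        this.bucketToNode = new LinkedHashMap<>(bucketToNode);
    }

    @Override
    public String nodeFor(int bucket) {
        String node = bucketToNode.get(bucket);
        if (node == null) {
            throw new IllegalArgumentException("No node for bucket " + bucket);
        }
        return node;
    }
}

// Dynamic placement: the bucket is ignored and the least-loaded node wins.
class DynamicPlacementSketch implements PlacementSketch {
    private final Map<String, Integer> assignedSplits = new LinkedHashMap<>();

    DynamicPlacementSketch(List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("At least one node is required");
        }
        for (String node : nodes) {
            assignedSplits.put(node, 0);
        }
    }

    @Override
    public String nodeFor(int bucket) {
        String best = null;
        for (Map.Entry<String, Integer> entry : assignedSplits.entrySet()) {
            // Strict comparison: on a tie the node registered first is kept.
            if (best == null || entry.getValue() < assignedSplits.get(best)) {
                best = entry.getKey();
            }
        }
        assignedSplits.merge(best, 1, Integer::sum);
        return best;
    }
}
```

The real policies work on sets of splits and return a future that signals when blocked nodes have room again; the sketch leaves that out.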

The query scheduler invokes the scheduler of each stage.

The call is: stageSchedulers.get(stage.getStageId()).schedule();

The first scheduler is SourcePartitionedScheduler:

public synchronized ScheduleResult schedule()
{
    int overallSplitAssignmentCount = 0;
    ImmutableSet.Builder<RemoteTask> overallNewTasks = ImmutableSet.builder();
    List<ListenableFuture<?>> overallBlockedFutures = new ArrayList<>();
    boolean anyBlockedOnPlacements = false;
    boolean anyBlockedOnNextSplitBatch = false;
    boolean anyNotBlocked = false;

    for (Entry<Lifespan, ScheduleGroup> entry : scheduleGroups.entrySet()) {
        Lifespan lifespan = entry.getKey();
        ScheduleGroup scheduleGroup = entry.getValue();
        Set<Split> pendingSplits = scheduleGroup.pendingSplits;

        if (scheduleGroup.state != ScheduleGroupState.DISCOVERING_SPLITS) {
            verify(scheduleGroup.nextSplitBatchFuture == null);
        }
        else if (pendingSplits.isEmpty()) {
            // no pending splits: try to get the next batch of splits
            if (scheduleGroup.nextSplitBatchFuture == null) {
                scheduleGroup.nextSplitBatchFuture = splitSource.getNextBatch(scheduleGroup.partitionHandle, lifespan, splitBatchSize - pendingSplits.size());

                long start = System.nanoTime();
                Futures.addCallback(scheduleGroup.nextSplitBatchFuture, new FutureCallback<SplitBatch>()
                {
                    @Override
                    public void onSuccess(SplitBatch result)
                    {
                        stage.recordGetSplitTime(start);
                    }

                    @Override
                    public void onFailure(Throwable t)
                    {
                    }
                });
            }

            if (scheduleGroup.nextSplitBatchFuture.isDone()) {
                SplitBatch nextSplits = getFutureValue(scheduleGroup.nextSplitBatchFuture);
                scheduleGroup.nextSplitBatchFuture = null;
                pendingSplits.addAll(nextSplits.getSplits());
                if (nextSplits.isLastBatch() && scheduleGroup.state == ScheduleGroupState.DISCOVERING_SPLITS) {
                    // this was the last batch and the group was still discovering splits: mark it as having no more splits
                    scheduleGroup.state = ScheduleGroupState.NO_MORE_SPLITS;
                }
            }
            else {
                overallBlockedFutures.add(scheduleGroup.nextSplitBatchFuture);
                anyBlockedOnNextSplitBatch = true;
                continue;
            }
        }

        Multimap<Node, Split> splitAssignment = ImmutableMultimap.of();
        if (!pendingSplits.isEmpty()) {
            if (!scheduleGroup.placementFuture.isDone()) {
                continue;
            }

            if (state == State.INITIALIZED) {
                state = State.SPLITS_ADDED;
            }

            // compute split placement: using the policy created earlier, this step assigns the pending splits to nodes
            SplitPlacementResult splitPlacementResult = splitPlacementPolicy.computeAssignments(pendingSplits);
            splitAssignment = splitPlacementResult.getAssignments();

            // remove splits with successful placements
            splitAssignment.values().forEach(pendingSplits::remove); // AbstractSet.removeAll performs terribly here.
            overallSplitAssignmentCount += splitAssignment.size();

            // if not completed placed, mark scheduleGroup as blocked on placement
            if (!pendingSplits.isEmpty()) {
                scheduleGroup.placementFuture = splitPlacementResult.getBlocked();
                overallBlockedFutures.add(scheduleGroup.placementFuture);
                anyBlockedOnPlacements = true;
            }
        }

        // if no new splits will be assigned, update state and attach completion event
        Multimap<Node, Lifespan> noMoreSplitsNotification = ImmutableMultimap.of();
        if (pendingSplits.isEmpty() && scheduleGroup.state == ScheduleGroupState.NO_MORE_SPLITS) {
            scheduleGroup.state = ScheduleGroupState.DONE;
            if (!lifespan.isTaskWide()) {
                Node node = ((FixedSplitPlacementPolicy) splitPlacementPolicy).getNodeForBucket(lifespan.getId());
                noMoreSplitsNotification = ImmutableMultimap.of(node, lifespan);
            }
        }

        // hand the assigned splits to their nodes for execution; assignSplits takes the node/split pairs and returns the newly created RemoteTasks
        overallNewTasks.addAll(assignSplits(splitAssignment, noMoreSplitsNotification));

        // Assert that "placement future is not done" implies "pendingSplits is not empty".
        // The other way around is not true. One obvious reason is (un)lucky timing, where the placement is unblocked between `computeAssignments` and this line.
        // However, there are other reasons that could lead to this.
        // Note that `computeAssignments` is quite broken:
        // 1. It always returns a completed future when there are no tasks, regardless of whether all nodes are blocked.
        // 2. The returned future will only be completed when a node with an assigned task becomes unblocked. Other nodes don't trigger future completion.
        // As a result, to avoid busy loops caused by 1, we check pendingSplits.isEmpty() instead of placementFuture.isDone() here.
        if (scheduleGroup.nextSplitBatchFuture == null && scheduleGroup.pendingSplits.isEmpty() && scheduleGroup.state != ScheduleGroupState.DONE) {
            anyNotBlocked = true;
        }
    }

    if (autoDropCompletedLifespans) {
        drainCompletedLifespans();
    }

    // * `splitSource.isFinished` invocation may fail after `splitSource.close` has been invoked.
    //   If state is NO_MORE_SPLITS/FINISHED, splitSource.isFinished has previously returned true, and splitSource is closed now.
    // * Even if `splitSource.isFinished()` return true, it is not necessarily safe to tear down the split source.
    //   * If anyBlockedOnNextSplitBatch is true, it means we have not checked out the recently completed nextSplitBatch futures,
    //     which may contain recently published splits. We must not ignore those.
    //   * If any scheduleGroup is still in DISCOVERING_SPLITS state, it means it hasn't realized that there will be no more splits.
    //     Next time it invokes getNextBatch, it will realize that. However, the invocation will fail we tear down splitSource now.
    if ((state == State.NO_MORE_SPLITS || state == State.FINISHED) || (scheduleGroups.isEmpty() && splitSource.isFinished())) {
        switch (state) {
            case INITIALIZED:
                // we have not scheduled a single split so far
                state = State.SPLITS_ADDED;
                ScheduleResult emptySplitScheduleResult = scheduleEmptySplit();
                overallNewTasks.addAll(emptySplitScheduleResult.getNewTasks());
                overallSplitAssignmentCount++;
                // fall through
            case SPLITS_ADDED:
                state = State.NO_MORE_SPLITS;
                splitSource.close();
                // fall through
            case NO_MORE_SPLITS:
                if (!scheduleGroups.isEmpty()) {
                    // we are blocked on split assignment
                    break;
                }
                state = State.FINISHED;
                whenFinishedOrNewLifespanAdded.set(null);
                // fall through
            case FINISHED:
                return new ScheduleResult(
                        true,
                        overallNewTasks.build(),
                        overallSplitAssignmentCount);
            default:
                throw new IllegalStateException("Unknown state");
        }
    }

    if (anyNotBlocked) {
        return new ScheduleResult(false, overallNewTasks.build(), overallSplitAssignmentCount);
    }

    // Only try to finalize task creation when scheduling would block
    overallNewTasks.addAll(finalizeTaskCreationIfNecessary());

    ScheduleResult.BlockedReason blockedReason;
    if (anyBlockedOnNextSplitBatch) {
        blockedReason = anyBlockedOnPlacements ? MIXED_SPLIT_QUEUES_FULL_AND_WAITING_FOR_SOURCE : WAITING_FOR_SOURCE;
    }
    else {
        blockedReason = anyBlockedOnPlacements ? SPLIT_QUEUES_FULL : NO_ACTIVE_DRIVER_GROUP;
    }

    overallBlockedFutures.add(whenFinishedOrNewLifespanAdded);
    return new ScheduleResult(
            false,
            overallNewTasks.build(),
            nonCancellationPropagating(whenAnyComplete(overallBlockedFutures)),
            blockedReason,
            overallSplitAssignmentCount);
}
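
The switch near the end of schedule() relies on deliberate fall-through to move the scheduler through several states in a single call. A minimal sketch of that progression follows; `SchedulerStateSketch` and its fields are hypothetical stand-ins for the real side effects (scheduling an empty split, closing the split source), and `groupsStillPending` mirrors `!scheduleGroups.isEmpty()` in the listing.

```java
// Hypothetical sketch of the INITIALIZED -> SPLITS_ADDED -> NO_MORE_SPLITS -> FINISHED progression.
class SchedulerStateSketch {
    enum State { INITIALIZED, SPLITS_ADDED, NO_MORE_SPLITS, FINISHED }

    State state = State.INITIALIZED;
    int emptySplitsScheduled = 0;
    boolean sourceClosed = false;

    // Called once the split source has no more splits to offer.
    // Returns true when the scheduler has reached FINISHED.
    boolean finishIfPossible(boolean groupsStillPending) {
        switch (state) {
            case INITIALIZED:
                // Nothing was scheduled so far: schedule one empty split so a task still exists.
                emptySplitsScheduled++;
                state = State.SPLITS_ADDED;
                // fall through
            case SPLITS_ADDED:
                state = State.NO_MORE_SPLITS;
                sourceClosed = true;
                // fall through
            case NO_MORE_SPLITS:
                if (groupsStillPending) {
                    // Still waiting for pending splits to be placed.
                    return false;
                }
                state = State.FINISHED;
                // fall through
            case FINISHED:
                return true;
            default:
                throw new IllegalStateException("Unknown state");
        }
    }
}
```

Because each case falls into the next, a scheduler that never saw a split can go from INITIALIZED to FINISHED in one pass, while one with pending groups stops at NO_MORE_SPLITS and resumes from there on the next call.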

The splitPlacementPolicy.computeAssignments() method

The DynamicSplitPlacementPolicy class implements the dynamic placement logic

@Override
public SplitPlacementResult computeAssignments(Set<Split> splits)
{
    // delegate to the nodeSelector's computeAssignments method
    return nodeSelector.computeAssignments(splits, remoteTasks.get());
}

// As mentioned earlier, TopologyAwareNodeSelector is the NodeSelector implementation followed in this walkthrough (SimpleNodeSelector is the other one); its split assignment logic is shown below
@Override
    public SplitPlacementResult computeAssignments(Set<Split> splits, List<RemoteTask> existingTasks)
    {
        // fetch the selector's current NodeMap
        NodeMap nodeMap = this.nodeMap.get().get();
        // create the node-to-split assignment map
        Multimap<Node, Split> assignment = HashMultimap.create();
        NodeAssignmentStats assignmentStats = new NodeAssignmentStats(nodeTaskMap, nodeMap, existingTasks);

        int[] topologicCounters = new int[topologicalSplitCounters.size()];
        Set<NetworkLocation> filledLocations = new HashSet<>();
        Set<Node> blockedExactNodes = new HashSet<>();
        boolean splitWaitingForAnyNode = false;
        for (Split split : splits) {
            // if the split is not remotely accessible, look up the nodes at the split's addresses (optionally including the coordinator) as candidates; throw if none is found
            if (!split.isRemotelyAccessible()) {
                List<Node> candidateNodes = selectExactNodes(nodeMap, split.getAddresses(), includeCoordinator);
                if (candidateNodes.isEmpty()) {
                    log.debug("No nodes available to schedule %s. Available nodes %s", split, nodeMap.getNodesByHost().keys());
                    throw new PrestoException(NO_NODES_AVAILABLE, "No nodes available to run query");
                }
                // selection strategy: weigh the minimum candidate count against the per-task cap on pending splits
                Node chosenNode = bestNodeSplitCount(candidateNodes.iterator(), minCandidates, maxPendingSplitsPerTask, assignmentStats);
                // a node could be chosen
                if (chosenNode != null) {
                    // record the chosen node and its split
                    assignment.put(chosenNode, split);
                    assignmentStats.addAssignedSplit(chosenNode);
                }
                // Exact node set won't matter, if a split is waiting for any node
                else if (!splitWaitingForAnyNode) {
                    // no node qualified under the strategy: remember all candidate nodes as blocked
                    blockedExactNodes.addAll(candidateNodes);
                }
                continue;
            }
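            // the split is remotely accessible, so choose a node by network topology
            Node chosenNode = null;
            // number of network topology levels; the loop below walks them from deepest to shallowest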
            int depth = networkLocationSegmentNames.size();
            int chosenDepth = 0;
            Set<NetworkLocation> locations = new HashSet<>();
            // collect the NetworkLocation of each of the split's addresses
            for (HostAddress host : split.getAddresses()) {
                locations.add(networkLocationCache.get(host));
            }
            // no location was found for the split: fall back to the root location and set the depth to 0
            if (locations.isEmpty()) {
                // Add the root location
                locations.add(ROOT_LOCATION);
                depth = 0;
            }
            // Try each address at progressively shallower network locations
            for (int i = depth; i >= 0 && chosenNode == null; i--) {
                for (NetworkLocation location : locations) {
                    // Skip locations which are only shallower than this level
                    // For example, locations which couldn't be located will be at the "root" location
                    if (location.getSegments().size() < i) {
                        continue;
                    }
                    location = location.subLocation(0, i);
                    if (filledLocations.contains(location)) {
                        continue;
                    }
                    Set<Node> nodes = nodeMap.getWorkersByNetworkPath().get(location);
                    chosenNode = bestNodeSplitCount(new ResettableRandomizedIterator<>(nodes), minCandidates, calculateMaxPendingSplits(i, depth), assignmentStats);
                    if (chosenNode != null) {
                        chosenDepth = i;
                        break;
                    }
                    filledLocations.add(location);
                }
            }
            if (chosenNode != null) {
                // record the chosen node and its split
                assignment.put(chosenNode, split);
                assignmentStats.addAssignedSplit(chosenNode);
                topologicCounters[chosenDepth]++;
            }
            else {
                splitWaitingForAnyNode = true;
            }
        }
        for (int i = 0; i < topologicCounters.length; i++) {
            if (topologicCounters[i] > 0) {
                topologicalSplitCounters.get(i).update(topologicCounters[i]);
            }
        }

        ListenableFuture<?> blocked;
        int maxPendingForWildcardNetworkAffinity = calculateMaxPendingSplits(0, networkLocationSegmentNames.size());
        if (splitWaitingForAnyNode) {
            blocked = toWhenHasSplitQueueSpaceFuture(existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
        }
        else {
            blocked = toWhenHasSplitQueueSpaceFuture(blockedExactNodes, existingTasks, calculateLowWatermark(maxPendingForWildcardNetworkAffinity));
        }
        return new SplitPlacementResult(blocked, assignment);
    }
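
The core idea of the listing above is to try nodes close to the split first and widen the search one topology level at a time. The sketch below is a simplified illustration under stated assumptions: `TopologyPlacementSketch` is a hypothetical class, network locations are lists of path segments such as ["rack1", "hostA"], and a single per-node cap replaces the depth-dependent limit that calculateMaxPendingSplits computes in the real code.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of topology-aware placement with progressive widening.
class TopologyPlacementSketch {
    private final Map<String, List<String>> nodePaths = new LinkedHashMap<>();
    private final Map<String, Integer> pendingSplits = new HashMap<>();
    private final int maxPendingPerNode;

    TopologyPlacementSketch(int maxPendingPerNode) {
        this.maxPendingPerNode = maxPendingPerNode;
    }

    void addNode(String node, List<String> networkPath) {
        nodePaths.put(node, networkPath);
    }

    // Returns the chosen node, or null when every node is at its cap (the split has to wait).
    String place(List<String> splitPath) {
        // Start at the deepest level (exact location) and move toward the root (depth 0).
        for (int depth = splitPath.size(); depth >= 0; depth--) {
            List<String> prefix = splitPath.subList(0, depth);
            String best = null;
            for (Map.Entry<String, List<String>> entry : nodePaths.entrySet()) {
                List<String> path = entry.getValue();
                if (path.size() < depth || !path.subList(0, depth).equals(prefix)) {
                    continue; // node is outside this network location
                }
                int load = pendingSplits.getOrDefault(entry.getKey(), 0);
                if (load >= maxPendingPerNode) {
                    continue; // node's queue is full
                }
                if (best == null || load < pendingSplits.getOrDefault(best, 0)) {
                    best = entry.getKey();
                }
            }
            if (best != null) {
                pendingSplits.merge(best, 1, Integer::sum);
                return best;
            }
        }
        return null;
    }
}
```

With a cap of one split per node, repeated splits for the same host land first on that host, then on another node in the same rack, then anywhere, and finally nowhere, which corresponds to the splitWaitingForAnyNode case in the listing.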

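The assignSplits method of the SourcePartitionedScheduler class

// Iterate over every node that appears in splitAssignment or noMoreSplitsNotification and, for each node,
// pass its splits to stage.scheduleSplits, which creates a task on the node if none exists yet, or otherwise adds the splits to the task already running there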
private Set<RemoteTask> assignSplits(Multimap<Node, Split> splitAssignment, Multimap<Node, Lifespan> noMoreSplitsNotification)
{
    ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();

    ImmutableSet<Node> nodes = ImmutableSet.<Node>builder()
            .addAll(splitAssignment.keySet())
            .addAll(noMoreSplitsNotification.keySet())
            .build();
    for (Node node : nodes) {
        // source partitioned tasks can only receive broadcast data; otherwise it would have a different distribution
        ImmutableMultimap<PlanNodeId, Split> splits = ImmutableMultimap.<PlanNodeId, Split>builder()
                .putAll(partitionedNode, splitAssignment.get(node))
                .build();
        ImmutableMultimap.Builder<PlanNodeId, Lifespan> noMoreSplits = ImmutableMultimap.builder();
        if (noMoreSplitsNotification.containsKey(node)) {
            noMoreSplits.putAll(partitionedNode, noMoreSplitsNotification.get(node));
        }
        newTasks.addAll(stage.scheduleSplits(
                node,
                splits,
                noMoreSplits.build()));
    }
    return newTasks.build();
}

The scheduleSplits method of the SqlStageExecution class

public synchronized Set<RemoteTask> scheduleSplits(Node node, Multimap<PlanNodeId, Split> splits, Multimap<PlanNodeId, Lifespan> noMoreSplitsNotification)
{
    requireNonNull(node, "node is null");
    requireNonNull(splits, "splits is null");

    splitsScheduled.set(true);

    checkArgument(stateMachine.getFragment().getPartitionedSources().containsAll(splits.keySet()), "Invalid splits");

    ImmutableSet.Builder<RemoteTask> newTasks = ImmutableSet.builder();
    Collection<RemoteTask> tasks = this.tasks.get(node);
    RemoteTask task;
    if (tasks == null) {
        // The output buffer depends on the task id starting from 0 and being sequential, since each
        // task is assigned a private buffer based on task id.
        TaskId taskId = new TaskId(stateMachine.getStageId(), nextTaskId.getAndIncrement());
        task = scheduleTask(node, taskId, splits);
        newTasks.add(task);
    }
    else {
        task = tasks.iterator().next();
        task.addSplits(splits);
    }
    if (noMoreSplitsNotification.size() > 1) {
        // The assumption that `noMoreSplitsNotification.size() <= 1` currently holds.
        // If this assumption no longer holds, we should consider calling task.noMoreSplits with multiple entries in one shot.
        // These kind of methods can be expensive since they are grabbing locks and/or sending HTTP requests on change.
        throw new UnsupportedOperationException("This assumption no longer holds: noMoreSplitsNotification.size() < 1");
    }
    for (Entry<PlanNodeId, Lifespan> entry : noMoreSplitsNotification.entries()) {
        task.noMoreSplits(entry.getKey(), entry.getValue());
    }
    return newTasks.build();
}
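
The branch on `tasks == null` in scheduleSplits is what keeps a stage at one task per node: the first batch of splits for a node creates the task, and later batches are appended to it. A minimal sketch of that pattern, using hypothetical names (`TaskPerNodeSketch`) and strings in place of Node, Split and RemoteTask:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: one task per node, created lazily on the first batch of splits.
class TaskPerNodeSketch {
    private final Map<String, List<String>> splitsByNode = new HashMap<>();
    private final Map<String, Integer> taskIdByNode = new HashMap<>();
    private int nextTaskId = 0;

    // Returns true when a new task had to be created for the node.
    boolean scheduleSplits(String node, List<String> splits) {
        List<String> existing = splitsByNode.get(node);
        if (existing == null) {
            // Task ids start at 0 and are handed out sequentially, as in the listing.
            taskIdByNode.put(node, nextTaskId++);
            splitsByNode.put(node, new ArrayList<>(splits));
            return true;
        }
        existing.addAll(splits);
        return false;
    }

    int taskIdFor(String node) {
        return taskIdByNode.get(node);
    }

    int splitCountFor(String node) {
        return splitsByNode.get(node).size();
    }
}
```

Only the newly created tasks are reported back to the caller, which is why the real method returns an empty set when the splits went to an existing task.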

The second scheduler is FixedSourcePartitionedScheduler:

public ScheduleResult schedule()
{
    // schedule a task on every node in the distribution
    List<RemoteTask> newTasks = ImmutableList.of();
    if (!scheduledTasks) {
        newTasks = partitioning.getPartitionToNode().entrySet().stream()
                .map(entry -> stage.scheduleTask(entry.getValue(), entry.getKey()))
                .collect(toImmutableList());
        scheduledTasks = true;
    }

    boolean allBlocked = true;
    List<ListenableFuture<?>> blocked = new ArrayList<>();
    BlockedReason blockedReason = BlockedReason.NO_ACTIVE_DRIVER_GROUP;
    int splitsScheduled = 0;

    Iterator<SourcePartitionedScheduler> schedulerIterator = sourcePartitionedSchedulers.iterator();
    List<Lifespan> driverGroupsToStart = ImmutableList.of();
    while (schedulerIterator.hasNext()) {
        SourcePartitionedScheduler sourcePartitionedScheduler = schedulerIterator.next();

        for (Lifespan lifespan : driverGroupsToStart) {
            sourcePartitionedScheduler.startLifespan(lifespan, partitionHandleFor(lifespan));
        }

        ScheduleResult schedule = sourcePartitionedScheduler.schedule();
        splitsScheduled += schedule.getSplitsScheduled();
        if (schedule.getBlockedReason().isPresent()) {
            blocked.add(schedule.getBlocked());
            blockedReason = blockedReason.combineWith(schedule.getBlockedReason().get());
        }
        else {
            verify(schedule.getBlocked().isDone(), "blockedReason not provided when scheduler is blocked");
            allBlocked = false;
        }

        driverGroupsToStart = sourcePartitionedScheduler.drainCompletedLifespans();

        if (schedule.isFinished()) {
            schedulerIterator.remove();
            sourcePartitionedScheduler.close();
        }
    }

    if (allBlocked) {
        return new ScheduleResult(sourcePartitionedSchedulers.isEmpty(), newTasks, whenAnyComplete(blocked), blockedReason, splitsScheduled);
    }
    else {
        return new ScheduleResult(sourcePartitionedSchedulers.isEmpty(), newTasks, splitsScheduled);
    }
}
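
FixedSourcePartitionedScheduler is a composite: it runs each remaining SourcePartitionedScheduler once per pass, drops the ones that have finished, and reports itself as blocked only if every sub-scheduler was blocked. A minimal sketch of that aggregation, with hypothetical types (`SubResultSketch`, `CompositeSchedulerSketch`) and without the lifespan handling or blocked futures of the real class:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical result of one pass of a sub-scheduler.
class SubResultSketch {
    final boolean blocked;
    final boolean finished;

    SubResultSketch(boolean blocked, boolean finished) {
        this.blocked = blocked;
        this.finished = finished;
    }
}

class CompositeSchedulerSketch {
    private final List<Supplier<SubResultSketch>> subSchedulers;
    private boolean allBlocked;

    CompositeSchedulerSketch(List<Supplier<SubResultSketch>> subSchedulers) {
        this.subSchedulers = new ArrayList<>(subSchedulers);
    }

    // Runs one pass over the remaining sub-schedulers; returns true once all of them have finished.
    boolean schedule() {
        allBlocked = true;
        Iterator<Supplier<SubResultSketch>> iterator = subSchedulers.iterator();
        while (iterator.hasNext()) {
            SubResultSketch result = iterator.next().get();
            if (!result.blocked) {
                allBlocked = false;
            }
            if (result.finished) {
                iterator.remove();
            }
        }
        return subSchedulers.isEmpty();
    }

    boolean wasAllBlocked() {
        return allBlocked;
    }

    int remaining() {
        return subSchedulers.size();
    }
}
```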

The third scheduler is FixedCountScheduler:

public ScheduleResult schedule()
{
    List<RemoteTask> newTasks = partitionToNode.entrySet().stream()
            .map(entry -> taskScheduler.apply(entry.getValue(), entry.getKey()))
            .collect(toImmutableList());

    return new ScheduleResult(true, newTasks, 0);
}
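
FixedCountScheduler does all of its work in one call: it creates exactly one task per partition and reports itself finished. The sketch below shows the same shape with hypothetical simplifications: `FixedCountSketch` is an invented name, nodes are strings, and the task factory is any BiFunction taking (node, partition).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch: one task per partition, created in a single scheduling pass.
class FixedCountSketch {
    static <T> List<T> scheduleAll(Map<Integer, String> partitionToNode, BiFunction<String, Integer, T> taskScheduler) {
        List<T> newTasks = new ArrayList<>();
        for (Map.Entry<Integer, String> entry : partitionToNode.entrySet()) {
            // Same argument order as the listing: node first, then partition.
            newTasks.add(taskScheduler.apply(entry.getValue(), entry.getKey()));
        }
        return newTasks;
    }
}
```

Since no splits are involved, the real scheduler returns a ScheduleResult with finished set to true and a split count of 0.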

2.5 Creating the RemoteTask

In Presto's architecture, stage scheduling produces tasks that are dispatched to workers for execution.

The scheduleTask method of the SqlStageExecution class

private synchronized RemoteTask scheduleTask(Node node, TaskId taskId, Multimap<PlanNodeId, Split> sourceSplits)
    {
        ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
        // collect all sourceSplits: for a source stage the sourceSplits argument carries splits, whereas for fixed and single stages it is empty
        initialSplits.putAll(sourceSplits);
        
        sourceTasks.forEach((planNodeId, task) -> {
            TaskStatus status = task.getTaskStatus();
            if (status.getState() != TaskState.FINISHED) {
                initialSplits.put(planNodeId, createRemoteSplitFor(taskId, status.getSelf()));
            }
        });
        OutputBuffers outputBuffers = this.outputBuffers.get();
        checkState(outputBuffers != null, "Initial output buffers must be set before a task can be scheduled");

        // create the remote task
        RemoteTask task = remoteTaskFactory.createRemoteTask(
                stateMachine.getSession(),
                taskId,
                node,
                stateMachine.getFragment(),
                initialSplits.build(),
                outputBuffers,
                nodeTaskMap.createPartitionedSplitCountTracker(node, taskId),
                summarizeTaskInfo);

        completeSources.forEach(task::noMoreSplits);
        allTasks.add(taskId);
        tasks.computeIfAbsent(node, key -> newConcurrentHashSet()).add(task);
        nodeTaskMap.addTask(node, task);
        task.addStateChangeListener(new StageTaskListener());
        if (!stateMachine.getState().isDone()) {
            task.start();
        }
        else {
            task.abort();
        }
        return task;
    }

HttpRemoteTask is the implementation class of the RemoteTask interface

   public HttpRemoteTask(Session session,
            TaskId taskId,
            String nodeId,
            URI location,
            PlanFragment planFragment,
            Multimap<PlanNodeId, Split> initialSplits,
            OutputBuffers outputBuffers,
            HttpClient httpClient,
            Executor executor,
            ScheduledExecutorService updateScheduledExecutor,
            ScheduledExecutorService errorScheduledExecutor,
            Duration minErrorDuration,
            Duration maxErrorDuration,
            Duration taskStatusRefreshMaxWait,
            Duration taskInfoUpdateInterval,
            boolean summarizeTaskInfo,
            JsonCodec<TaskStatus> taskStatusCodec,
            JsonCodec<TaskInfo> taskInfoCodec,
            JsonCodec<TaskUpdateRequest> taskUpdateRequestCodec,
            PartitionedSplitCountTracker partitionedSplitCountTracker,
            RemoteTaskStats stats)
    {
        requireNonNull(session, "session is null");
        requireNonNull(taskId, "taskId is null");
        requireNonNull(nodeId, "nodeId is null");
        requireNonNull(location, "location is null");
        requireNonNull(planFragment, "planFragment is null");
        requireNonNull(outputBuffers, "outputBuffers is null");
        requireNonNull(httpClient, "httpClient is null");
        requireNonNull(executor, "executor is null");
        requireNonNull(taskStatusCodec, "taskStatusCodec is null");
        requireNonNull(taskInfoCodec, "taskInfoCodec is null");
        requireNonNull(taskUpdateRequestCodec, "taskUpdateRequestCodec is null");
        requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
        requireNonNull(stats, "stats is null");

        try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
            this.taskId = taskId;
            this.session = session;
            this.nodeId = nodeId;
            this.planFragment = planFragment;
            this.outputBuffers.set(outputBuffers);
            this.httpClient = httpClient;
            this.executor = executor;
            this.errorScheduledExecutor = errorScheduledExecutor;
            this.summarizeTaskInfo = summarizeTaskInfo;
            this.taskInfoCodec = taskInfoCodec;
            this.taskUpdateRequestCodec = taskUpdateRequestCodec;
            this.updateErrorTracker = new RequestErrorTracker(taskId, location, minErrorDuration, maxErrorDuration, errorScheduledExecutor, "updating task");
            this.partitionedSplitCountTracker = requireNonNull(partitionedSplitCountTracker, "partitionedSplitCountTracker is null");
            this.stats = stats;

            for (Entry<PlanNodeId, Split> entry : requireNonNull(initialSplits, "initialSplits is null").entries()) {
                ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getKey(), entry.getValue());
                pendingSplits.put(entry.getKey(), scheduledSplit);
            }
            pendingSourceSplitCount = planFragment.getPartitionedSources().stream()
                    .filter(initialSplits::containsKey)
                    .mapToInt(partitionedSource -> initialSplits.get(partitionedSource).size())
                    .sum();

            List<BufferInfo> bufferStates = outputBuffers.getBuffers()
                    .keySet().stream()
                    .map(outputId -> new BufferInfo(outputId, false, 0, 0, PageBufferInfo.empty()))
                    .collect(toImmutableList());

            TaskInfo initialTask = createInitialTask(taskId, location, nodeId, bufferStates, new TaskStats(DateTime.now(), null));

            this.taskStatusFetcher = new ContinuousTaskStatusFetcher(
                    this::failTask,
                    initialTask.getTaskStatus(),
                    taskStatusRefreshMaxWait,
                    taskStatusCodec,
                    executor,
                    httpClient,
                    minErrorDuration,
                    maxErrorDuration,
                    errorScheduledExecutor,
                    stats);

            this.taskInfoFetcher = new TaskInfoFetcher(
                    this::failTask,
                    initialTask,
                    httpClient,
                    taskInfoUpdateInterval,
                    taskInfoCodec,
                    minErrorDuration,
                    maxErrorDuration,
                    summarizeTaskInfo,
                    executor,
                    updateScheduledExecutor,
                    errorScheduledExecutor,
                    stats);

            taskStatusFetcher.addStateChangeListener(newStatus -> {
                TaskState state = newStatus.getState();
                if (state.isDone()) {
                    cleanUpTask();
                }
                else {
                    partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
                    updateSplitQueueSpace();
                }
            });

            long timeout = minErrorDuration.toMillis() / MIN_RETRIES;
            this.requestTimeout = new Duration(timeout + taskStatusRefreshMaxWait.toMillis(), MILLISECONDS);
            partitionedSplitCountTracker.setPartitionedSplitCount(getPartitionedSplitCount());
            updateSplitQueueSpace();
        }
    }

The task's start method triggers the first update and starts polling the status of the corresponding task

public void start()
{
    try (SetThreadName ignored = new SetThreadName("HttpRemoteTask-%s", taskId)) {
        // to start we just need to trigger an update
        scheduleUpdate();

        taskStatusFetcher.start();
        taskInfoFetcher.start();
    }
}

An exchange fetches data from the upstream stage, while the outputBuffer hands the current stage's output to the downstream stage.
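
As a rough illustration of that hand-off, the sketch below models an output buffer in which each downstream task owns a private queue keyed by its id, matching the remark in scheduleSplits that each task is assigned a private buffer based on its task id. Everything here is a hypothetical simplification: `OutputBufferSketch` is an invented name, pages are strings, and there is no HTTP transfer, back-pressure or acknowledgement.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch: one private buffer per downstream task, keyed by the task's id.
class OutputBufferSketch {
    private final Map<Integer, Queue<String>> buffers = new HashMap<>();

    OutputBufferSketch(int downstreamTaskCount) {
        for (int id = 0; id < downstreamTaskCount; id++) {
            buffers.put(id, new ArrayDeque<>());
        }
    }

    // Producer side: the current stage writes a page destined for one downstream task.
    void enqueue(int bufferId, String page) {
        bufferFor(bufferId).add(page);
    }

    // Consumer side: what a downstream exchange would fetch; null means nothing is buffered yet.
    String poll(int bufferId) {
        return bufferFor(bufferId).poll();
    }

    private Queue<String> bufferFor(int bufferId) {
        Queue<String> buffer = buffers.get(bufferId);
        if (buffer == null) {
            throw new IllegalArgumentException("Unknown buffer " + bufferId);
        }
        return buffer;
    }
}
```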

2.6 Task execution

2.6.1 The worker receives the task

Once the RemoteTask has been created, the task is sent to the corresponding worker through an HTTP REST request.

// @Path("/v1/task") is declared on the enclosing TaskResource class
@POST
    @Path("{taskId}")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
    {
        requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");

        Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager);
        TaskInfo taskInfo = taskManager.updateTask(session,
                taskId,
                taskUpdateRequest.getFragment(),
                taskUpdateRequest.getSources(),
                taskUpdateRequest.getOutputIds());

        if (shouldSummarize(uriInfo)) {
            taskInfo = taskInfo.summarize();
        }

        return Response.ok().entity(taskInfo).build();
    }

The updateTask method of the SqlTaskManager class

@Override
public TaskInfo updateTask(Session session, TaskId taskId, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
    requireNonNull(session, "session is null");
    requireNonNull(taskId, "taskId is null");
    requireNonNull(fragment, "fragment is null");
    requireNonNull(sources, "sources is null");
    requireNonNull(outputBuffers, "outputBuffers is null");

    if (resourceOvercommit(session)) {
        // TODO: This should have been done when the QueryContext was created. However, the session isn't available at that point.
        queryContexts.getUnchecked(taskId.getQueryId()).setResourceOvercommit();
    }

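    // tasks is a cache keyed by taskId: return the cached SqlTask, or create one if none exists yet
    SqlTask sqlTask = tasks.getUnchecked(taskId);
    sqlTask.recordHeartbeat();
    // update the SqlTask and return its task info
    return sqlTask.updateTask(session, fragment, sources, outputBuffers);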
}

The updateTask method of the SqlTask class

public TaskInfo updateTask(Session session, Optional<PlanFragment> fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
{
    try {
        // The LazyOutput buffer does not support write methods, so the actual
        // output buffer must be established before drivers are created (e.g.
        // a VALUES query).
        outputBuffer.setOutputBuffers(outputBuffers);

        // assure the task execution is only created once
        SqlTaskExecution taskExecution;
        synchronized (this) {
            // is task already complete?
            TaskHolder taskHolder = taskHolderReference.get();
            if (taskHolder.isFinished()) {
                return taskHolder.getFinalTaskInfo();
            }
            taskExecution = taskHolder.getTaskExecution();
            if (taskExecution == null) {
                checkState(fragment.isPresent(), "fragment must be present");
                // on the first call, create a new task execution
                taskExecution = sqlTaskExecutionFactory.create(session, queryContext, taskStateMachine, outputBuffer, fragment.get(), sources);
                taskHolderReference.compareAndSet(taskHolder, new TaskHolder(taskExecution));
                needsPlan.set(false);
            }
        }

        if (taskExecution != null) {
            taskExecution.addSources(sources);
        }
    }
    catch (Error e) {
        failed(e);
        throw e;
    }
    catch (RuntimeException e) {
        failed(e);
    }

    return getTaskInfo();
}
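
The synchronized block in SqlTask.updateTask exists so that the task execution is created exactly once, no matter how many update requests arrive for the same task. A minimal sketch of that create-once pattern follows; `CreateOnceSketch` is a hypothetical name, and the generic holder stands in for the TaskHolder reference of the real class.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical sketch: the first caller creates the value, later callers reuse it.
class CreateOnceSketch<T> {
    private final AtomicReference<T> holder = new AtomicReference<>();
    private int creations = 0;

    // synchronized so that two concurrent first calls cannot both run the factory.
    synchronized T getOrCreate(Supplier<T> factory) {
        T current = holder.get();
        if (current == null) {
            current = factory.get();
            holder.set(current);
            creations++;
        }
        return current;
    }

    synchronized int creations() {
        return creations;
    }
}
```

In the listing, the later calls still do useful work: once the execution exists, each update only adds the new sources to it.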

The create method of the SqlTaskExecutionFactory class

public SqlTaskExecution create(Session session, QueryContext queryContext, TaskStateMachine taskStateMachine, OutputBuffer outputBuffer, PlanFragment fragment, List<TaskSource> sources)
{
    boolean verboseStats = getVerboseStats(session);
    TaskContext taskContext = queryContext.addTaskContext(
            taskStateMachine,
            session,
            verboseStats,
            cpuTimerEnabled);

    LocalExecutionPlan localExecutionPlan;
    try (SetThreadName ignored = new SetThreadName("Task-%s", taskStateMachine.getTaskId())) {
        try {
            localExecutionPlan = planner.plan(
                    taskContext,
                    fragment.getRoot(),
                    fragment.getSymbols(),
                    fragment.getPartitioningScheme(),
                    fragment.getPipelineExecutionStrategy() == GROUPED_EXECUTION,
                    fragment.getPartitionedSources(),
                    outputBuffer);

            for (DriverFactory driverFactory : localExecutionPlan.getDriverFactories()) {
                Optional<PlanNodeId> sourceId = driverFactory.getSourceId();
                if (sourceId.isPresent() && fragment.isPartitionedSources(sourceId.get())) {
                    checkArgument(fragment.getPipelineExecutionStrategy() == driverFactory.getPipelineExecutionStrategy(),
                            "Partitioned pipelines are expected to have the same execution strategy as the fragment");
                }
                else {
                    checkArgument(fragment.getPipelineExecutionStrategy() != UNGROUPED_EXECUTION || driverFactory.getPipelineExecutionStrategy() == UNGROUPED_EXECUTION,
                            "When fragment execution strategy is ungrouped, all pipelines should have ungrouped execution strategy");
                }
            }
        }
        catch (Throwable e) {
            // planning failed
            taskStateMachine.failed(e);
            throwIfUnchecked(e);
            throw new RuntimeException(e);
        }
    }
    return createSqlTaskExecution(
            taskStateMachine,
            taskContext,
            outputBuffer,
            sources,
            localExecutionPlan,
            taskExecutor,
            taskNotificationExecutor,
            queryMonitor);
}

The createSqlTaskExecution method of the SqlTaskExecution class

static SqlTaskExecution createSqlTaskExecution(
        TaskStateMachine taskStateMachine,
        TaskContext taskContext,
        OutputBuffer outputBuffer,
        List<TaskSource> sources,
        LocalExecutionPlan localExecutionPlan,
        TaskExecutor taskExecutor,
        Executor notificationExecutor,
        QueryMonitor queryMonitor)
{
    SqlTaskExecution task = new SqlTaskExecution(
            taskStateMachine,
            taskContext,
            outputBuffer,
            localExecutionPlan,
            taskExecutor,
            queryMonitor,
            notificationExecutor);
    try (SetThreadName ignored = new SetThreadName("Task-%s", task.getTaskId())) {
        // The scheduleDriversForTaskLifeCycle method calls enqueueDriverSplitRunner, which registers a callback with access to this object.
        // The call back is accessed from another thread, so this code can not be placed in the constructor.
        task.scheduleDriversForTaskLifeCycle();
        task.addSources(sources);
        return task;
    }
}

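2.6.2 Worker startup and execution

When a Worker starts, the start method of the TaskExecutor class is called. Its main job is to process the Splits of all Tasks running on that Worker.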

@PostConstruct
public synchronized void start()
{
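    // runnerThreads comes from the config property task.max-worker-threads; its default is a multiple of the CPU core count (cores * 4 in older releases, Node CPUs * 2 in the current Presto documentation)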
    checkState(!closed, "TaskExecutor is closed");
    for (int i = 0; i < runnerThreads; i++) {
        addRunnerThread();
    }
    splitMonitorExecutor.scheduleWithFixedDelay(this::monitorActiveSplits, 1, 1, TimeUnit.MINUTES);
}

The addRunnerThread method of the TaskExecutor class

private synchronized void addRunnerThread()
{
    try {
        // TaskRunner is an inner class of TaskExecutor
        executor.execute(new TaskRunner());
    }
    catch (RejectedExecutionException ignored) {
    }
}
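
The start()/addRunnerThread() pair boils down to a fixed number of long-lived threads that all pull work from one shared queue. The following is only a minimal sketch of that pattern under simplifying assumptions: `RunnerPoolSketch`, `submit` and `runAll` are invented names, plain `Runnable`s stand in for splits, and there is no priority handling or runner replacement.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of the start()/addRunnerThread() pattern; not Presto code.
class RunnerPoolSketch {
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final BlockingQueue<Runnable> waiting = new LinkedBlockingQueue<>();
    private volatile boolean closed;

    // Starts a fixed number of long-lived runner loops, one per thread.
    void start(int runnerThreads) {
        for (int i = 0; i < runnerThreads; i++) {
            executor.execute(this::runLoop);
        }
    }

    // Each runner blocks on the shared queue and executes whatever it takes.
    private void runLoop() {
        while (!closed && !Thread.currentThread().isInterrupted()) {
            try {
                waiting.take().run();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void submit(Runnable work) {
        waiting.offer(work);
    }

    void close() {
        closed = true;
        executor.shutdownNow(); // interrupts runners parked in take()
    }

    // Runs `tasks` trivial work items on `threads` runners and reports how many completed.
    static long runAll(int threads, int tasks) {
        RunnerPoolSketch pool = new RunnerPoolSketch();
        CountDownLatch done = new CountDownLatch(tasks);
        pool.start(threads);
        for (int i = 0; i < tasks; i++) {
            pool.submit(done::countDown);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.close();
        return tasks - done.getCount();
    }

    public static void main(String[] args) {
        System.out.println(runAll(4, 10));
    }
}
```

The number of threads is fixed up front, so the amount of concurrent split processing on a worker is bounded by configuration rather than by the number of queued splits.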

The TaskRunner class

private class TaskRunner
        implements Runnable
{
    private final long runnerId = NEXT_RUNNER_ID.getAndIncrement();

    @Override
    public void run()
    {
        try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
            while (!closed && !Thread.currentThread().isInterrupted()) {
                // select next worker
                // Take the next PrioritizedSplitRunner to process. A PrioritizedSplitRunner wraps everything done to one split,
                // i.e. the chain of Operators applied to that split, together with its scheduling priority.
                final PrioritizedSplitRunner split;
                try {
                    // take one split from the waiting queue (blocks until one is available)
                    split = waitingSplits.take();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }

                String threadId = split.getTaskHandle().getTaskId() + "-" + split.getSplitId();
                try (SetThreadName splitName = new SetThreadName(threadId)) {
                    RunningSplitInfo splitInfo = new RunningSplitInfo(ticker.read(), threadId, Thread.currentThread());
                    runningSplitInfos.add(splitInfo);

                    // add the split to runningSplits, the collection of all splits currently being processed
                    runningSplits.add(split);

                    ListenableFuture<?> blocked;
                    try {
                        // run one time slice of this split via process()
                        blocked = split.process();
                    }
                    finally {
                        runningSplitInfos.remove(splitInfo);
                        // once the slice returns, remove the split from runningSplits
                        runningSplits.remove(split);
                    }
                    // isFinished() reports whether the whole split has been fully processed
                    if (split.isFinished()) {
                        log.debug("%s is finished", split.getInfo());
                        splitFinished(split);
                    }
                    else {
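                        // blocked.isDone() means the split is not blocked: it merely used up its time slice
                        if (blocked.isDone()) {
                            // the split still has work left, so put it back into the waiting queue
                            waitingSplits.offer(split);
                        }
                        else {
                            // the split is blocked: park it in blockedSplits together with its future
                            blockedSplits.put(split, blocked);
                            blocked.addListener(() -> {
                                // once the blocking future completes, remove the split from blockedSplits
                                blockedSplits.remove(split);
                                // reset its level priority
                                split.resetLevelPriority();
                                // and put it back into the waiting queue
                                waitingSplits.offer(split);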
                            }, executor);
                        }
                    }
                }
                catch (Throwable t) {
                    // ignore random errors due to driver thread interruption
                    if (!split.isDestroyed()) {
                        if (t instanceof PrestoException) {
                            PrestoException e = (PrestoException) t;
                            log.error("Error processing %s: %s: %s", split.getInfo(), e.getErrorCode().getName(), e.getMessage());
                        }
                        else {
                            log.error(t, "Error processing %s", split.getInfo());
                        }
                    }
                    splitFinished(split);
                }
            }
        }
        finally {
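            // reached when the thread was interrupted or the TaskExecutor was closed
            if (!closed) {
                // the thread was interrupted but the TaskExecutor is still open, so start a replacement runner thread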
                addRunnerThread();
            }
        }
    }
}
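
The three-way branch at the heart of the runner loop (finished, time slice used up, blocked) can be tried out in isolation. The sketch below is a simplified model, not Presto code: `RunnerLoopSketch` and `MiniSplit` are invented names, `CompletableFuture` stands in for Guava's `ListenableFuture`, and a single-threaded `drain()` that stops on an empty queue replaces the runner threads so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

class RunnerLoopSketch {
    // Hypothetical stand-in for PrioritizedSplitRunner: every unblocked process() call uses up one quantum.
    static class MiniSplit {
        final String name;
        private int remainingQuanta;
        CompletableFuture<Void> blocker; // if set and not done, the split is blocked

        MiniSplit(String name, int quanta) {
            this.name = name;
            this.remainingQuanta = quanta;
        }

        CompletableFuture<Void> process() {
            if (blocker != null && !blocker.isDone()) {
                return blocker; // no progress while blocked
            }
            remainingQuanta--;
            return CompletableFuture.completedFuture(null);
        }

        boolean isFinished() {
            return remainingQuanta <= 0;
        }
    }

    final BlockingQueue<MiniSplit> waitingSplits = new LinkedBlockingQueue<>();
    final List<String> finished = new ArrayList<>();

    // Same branching as the runner loop, but returns once the queue is empty instead of blocking in take().
    void drain() {
        MiniSplit split;
        while ((split = waitingSplits.poll()) != null) {
            CompletableFuture<Void> blocked = split.process();
            if (split.isFinished()) {
                finished.add(split.name);
            }
            else if (blocked.isDone()) {
                waitingSplits.offer(split); // quantum used up, work remains
            }
            else {
                MiniSplit parked = split;
                blocked.thenRun(() -> waitingSplits.offer(parked)); // requeue once unblocked
            }
        }
    }

    public static void main(String[] args) {
        RunnerLoopSketch loop = new RunnerLoopSketch();
        MiniSplit blockedSplit = new MiniSplit("b", 1);
        blockedSplit.blocker = new CompletableFuture<>();
        loop.waitingSplits.offer(new MiniSplit("a", 2));
        loop.waitingSplits.offer(blockedSplit);
        loop.drain();
        System.out.println(loop.finished);
        blockedSplit.blocker.complete(null); // unblocking puts "b" back into the queue
        loop.drain();
        System.out.println(loop.finished);
    }
}
```

A blocked split never occupies a runner: it is parked until its future completes, which is what keeps the fixed pool of runner threads busy with splits that can actually make progress.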

All processing of a split goes through split.process(); here split is an instance of PrioritizedSplitRunner

public ListenableFuture<?> process()
        throws Exception
{
    try {
        long startNanos = ticker.read();
        start.compareAndSet(0, startNanos);
        lastReady.compareAndSet(0, startNanos);
        processCalls.incrementAndGet();

        waitNanos.getAndAdd(startNanos - lastReady.get());

        CpuTimer timer = new CpuTimer();

        // Delegate the actual work to processFor(Duration). Here split is a SplitRunner, in practice mostly a DriverSplitRunner.
        // SPLIT_RUN_QUANTA is the length of one time slice, one second.
        ListenableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA);

        CpuTimer.CpuDuration elapsed = timer.elapsedTime();

        long quantaScheduledNanos = ticker.read() - startNanos;
        scheduledNanos.addAndGet(quantaScheduledNanos);

        priority.set(taskHandle.addScheduledNanos(quantaScheduledNanos));
        lastRun.set(ticker.read());

        if (blocked == NOT_BLOCKED) {
            unblockedQuantaWallTime.add(elapsed.getWall());
        }
        else {
            blockedQuantaWallTime.add(elapsed.getWall());
        }

        long quantaCpuNanos = elapsed.getCpu().roundTo(NANOSECONDS);
        cpuTimeNanos.addAndGet(quantaCpuNanos);

        globalCpuTimeMicros.update(quantaCpuNanos / 1000);
        globalScheduledTimeMicros.update(quantaScheduledNanos / 1000);

        return blocked;
    }
    catch (Throwable e) {
        finishedFuture.setException(e);
        throw e;
    }
}
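
Most of process() is bookkeeping around a single quantum: how long the split waited before it was picked up, and how long it was scheduled. Here is a small sketch of just that accounting, assuming an injectable nanosecond clock so the numbers are reproducible. `QuantumAccountingSketch`, `markReady` and `runQuantum` are invented names; only the counter names follow the listing above.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical miniature of the bookkeeping in process(); not Presto code.
class QuantumAccountingSketch {
    private final LongSupplier ticker; // nanosecond clock, injectable for tests
    private final AtomicLong lastReady = new AtomicLong();
    final AtomicLong waitNanos = new AtomicLong();
    final AtomicLong scheduledNanos = new AtomicLong();
    final AtomicLong processCalls = new AtomicLong();

    QuantumAccountingSketch(LongSupplier ticker) {
        this.ticker = ticker;
    }

    // Called when the split is (re)inserted into the waiting queue.
    void markReady() {
        lastReady.set(ticker.getAsLong());
    }

    // Runs one quantum and records how long the split waited and how long it was scheduled.
    void runQuantum(Runnable quantum) {
        long startNanos = ticker.getAsLong();
        processCalls.incrementAndGet();
        waitNanos.addAndGet(startNanos - lastReady.get());
        quantum.run();
        scheduledNanos.addAndGet(ticker.getAsLong() - startNanos);
    }

    public static void main(String[] args) {
        long[] now = {100};
        QuantumAccountingSketch stats = new QuantumAccountingSketch(() -> now[0]);
        stats.markReady();
        now[0] = 150;                         // the split waits 50 ns in the queue
        stats.runQuantum(() -> now[0] += 30); // the quantum itself takes 30 ns
        System.out.println(stats.waitNanos + " " + stats.scheduledNanos);
    }
}
```

In the real class the accumulated scheduled time is also fed back into the task handle to compute the split's priority; that part is left out of the sketch.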

2.6.3 Creating the Driver

The processFor method of DriverSplitRunner; DriverSplitRunner is an inner class of SqlTaskExecution

@Override
public ListenableFuture<?> processFor(Duration duration)
{
    // A Driver wraps the chain of operators applied to a split; the splits it still has to process are kept in its newSources field
    Driver driver;
    synchronized (this) {
        // if the DriverSplitRunner was closed before this call there is nothing left to do: return an already completed future whose value is null
        if (closed) {
            return Futures.immediateFuture(null);
        }
        // create the Driver lazily on first use, from the split assigned to this runner. partitionedSplit is a field of DriverSplitRunner of type ScheduledSplit, which wraps a Split
        if (this.driver == null) {
            this.driver = driverSplitRunnerFactory.createDriver(driverContext, partitionedSplit);
        }
        driver = this.driver;
    }

    return driver.processFor(duration);
}
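
The shape of this method is "create lazily under a lock, then do the real work outside the lock". A minimal sketch of that shape follows, with invented names throughout (`LazyDriverSketch`, a `Supplier<Runnable>` standing in for the driver factory, and a `Runnable` standing in for the Driver); it is an illustration of the pattern, not the Presto implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical miniature of the lazy-creation pattern in DriverSplitRunner.processFor; not Presto code.
class LazyDriverSketch {
    private final Supplier<Runnable> driverFactory;
    private Runnable driver;  // guarded by this
    private boolean closed;   // guarded by this

    LazyDriverSketch(Supplier<Runnable> driverFactory) {
        this.driverFactory = driverFactory;
    }

    CompletableFuture<Void> processFor() {
        Runnable current;
        synchronized (this) {
            if (closed) {
                // nothing to do: hand back an already completed future
                return CompletableFuture.completedFuture(null);
            }
            if (driver == null) {
                driver = driverFactory.get(); // created once, on first use
            }
            current = driver;
        }
        current.run(); // the actual work happens outside the lock
        return CompletableFuture.completedFuture(null);
    }

    synchronized void close() {
        closed = true;
    }

    public static void main(String[] args) {
        int[] created = {0};
        LazyDriverSketch runner = new LazyDriverSketch(() -> {
            created[0]++;
            return () -> System.out.println("processing");
        });
        runner.processFor();
        runner.processFor();
        System.out.println("drivers created: " + created[0]);
    }
}
```

Holding the lock only for the check-and-create step means close() can never race with creation, while a long-running time slice does not block other callers that only need to inspect or close the runner.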

2.6.4 Driver execution

The processFor method of the Driver class

public ListenableFuture<?> processFor(Duration duration)
{
    checkLockNotHeld("Can not process for a duration while holding the driver lock");

    requireNonNull(duration, "duration is null");

    // if the driver is blocked we don't need to continue
    SettableFuture<?> blockedFuture = driverBlockedFuture.get();
    if (!blockedFuture.isDone()) {
        return blockedFuture;
    }

    // upper bound on how long this call may run
    long maxRuntime = duration.roundTo(TimeUnit.NANOSECONDS);

    // try to take the driver lock; if another thread holds it, wait at most 100 milliseconds
    Optional<ListenableFuture<?>> result = tryWithLock(100, TimeUnit.MILLISECONDS, () -> {
        driverContext.startProcessTimer();
        driverContext.getYieldSignal().setWithDelay(maxRuntime, driverContext.getYieldExecutor());
        try {
            long start = System.nanoTime();
            do {
                // the actual processing of the split happens in processInternal
                ListenableFuture<?> future = processInternal();
                if (!future.isDone()) {
                    return updateDriverBlockedFuture(future);
                }
            }
            while (System.nanoTime() - start < maxRuntime && !isFinishedInternal());
        }
        finally {
            driverContext.getYieldSignal().reset();
            driverContext.recordProcessed();
        }
        return NOT_BLOCKED;
    });
    return result.orElse(NOT_BLOCKED);
}
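
Two ideas carry this method: a lock that is only tried for a bounded time, and a do/while loop that always runs at least one step and then keeps going until the time budget is spent or the work is finished. The sketch below isolates both, assuming a plain `ReentrantLock` in place of the Driver's exclusive lock and a simple step counter in place of processInternal(); `TimeSliceSketch` and its members are invented names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical miniature of the time-sliced loop in Driver.processFor; not Presto code.
class TimeSliceSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private int remainingSteps;
    int stepsRun;

    TimeSliceSketch(int steps) {
        this.remainingSteps = steps;
    }

    // Returns false if the lock could not be obtained within 100 ms (or the thread was interrupted).
    boolean processFor(long maxRuntimeNanos) {
        try {
            if (!lock.tryLock(100, TimeUnit.MILLISECONDS)) {
                return false;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        try {
            long start = System.nanoTime();
            do {
                // one unit of work; the real Driver calls processInternal() here
                remainingSteps--;
                stepsRun++;
            }
            while (System.nanoTime() - start < maxRuntimeNanos && !isFinished());
            return true;
        }
        finally {
            lock.unlock();
        }
    }

    boolean isFinished() {
        return remainingSteps <= 0;
    }

    public static void main(String[] args) {
        TimeSliceSketch worker = new TimeSliceSketch(3);
        worker.processFor(0);              // a zero budget still runs exactly one step
        System.out.println(worker.stepsRun);
        worker.processFor(Long.MAX_VALUE); // an unbounded budget runs until finished
        System.out.println(worker.stepsRun + " " + worker.isFinished());
    }
}
```

Because the loop is a do/while, even an exhausted budget guarantees forward progress on every call, which is why a split that keeps being rescheduled will eventually finish.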

The processInternal method of the Driver class

@GuardedBy("exclusiveLock")
private ListenableFuture<?> processInternal()
{
    checkLockHeld("Lock must be held to call processInternal");

    handleMemoryRevoke();

    try {
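        // if new splits have arrived that have not been handed over yet, add them to the source operator
        processNewSources();

        // a driver with a single operator is handled as a special case
        if (operators.size() == 1) {
            // if the driver is already done, return NOT_BLOCKED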
            if (driverContext.isDone()) {
                return NOT_BLOCKED;
            }

            // get the only operator
            Operator current = operators.get(0);
            // check whether the operator is blocked
            Optional<ListenableFuture<?>> blocked = getBlockedFuture(current);
            if (blocked.isPresent()) {
                current.getOperatorContext().recordBlocked(blocked.get());
                return blocked.get();
            }

            // not blocked: there is only one operator so just finish it
            current.getOperatorContext().startIntervalTimer();
            current.finish();
            current.getOperatorContext().recordFinish();
            return NOT_BLOCKED;
        }

        boolean movedPage = false;
        // With more than one operator, walk the adjacent pairs: the output of the earlier operator becomes the input of the later one
        for (int i = 0; i < operators.size() - 1 && !driverContext.isDone(); i++) {
            // take the next adjacent pair of operators
            Operator current = operators.get(i);
            Operator next = operators.get(i + 1);

            // skip blocked operator
            if (getBlockedFuture(current).isPresent()) {
                continue;
            }

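            // if the current operator is not finished and the next operator is unblocked and needs input
            if (!current.isFinished() && !getBlockedFuture(next).isPresent() && next.needsInput()) {
                // take an output page from the current operator so it can be fed to the next one
                current.getOperatorContext().startIntervalTimer();
                // getOutput() is where an operator's page-processing logic lives; each operator implements it differently (LimitOperator is the example in section 2.6.5)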
                Page page = current.getOutput();
                current.getOperatorContext().recordGetOutput(page);

                // hand the output page to the next operator
                if (page != null && page.getPositionCount() != 0) {
                    next.getOperatorContext().startIntervalTimer();
                    // addInput() is the other half of an operator's page-processing logic; each operator implements it differently
                    next.addInput(page);
                    next.getOperatorContext().recordAddInput(page);
                    // flag that a page was moved during this pass
                    movedPage = true;
                }

                if (current instanceof SourceOperator) {
                    movedPage = true;
                }
            }

            // if the current operator is finished, tell the next operator that no more input will arrive so that it completes its processing and flushes its results
            if (current.isFinished()) {
                // let next operator know there will be no more data
                next.getOperatorContext().startIntervalTimer();
                next.finish();
                next.getOperatorContext().recordFinish();
            }
        }

        // if the pass over all operators moved no page, check whether any operator is blocked
        if (!movedPage) {
            List<Operator> blockedOperators = new ArrayList<>();
            List<ListenableFuture<?>> blockedFutures = new ArrayList<>();
            // collect the blocking future of every operator that getBlockedFuture reports as blocked (memory revocation in progress, the operator itself blocked, or waiting for memory)
            for (Operator operator : operators) {
                Optional<ListenableFuture<?>> blocked = getBlockedFuture(operator);
                if (blocked.isPresent()) {
                    blockedOperators.add(operator);
                    blockedFutures.add(blocked.get());
                }
            }

            // if at least one operator is blocked
            if (!blockedFutures.isEmpty()) {
                // unblock when the first future is complete
                ListenableFuture<?> blocked = firstFinishedFuture(blockedFutures);
                // driver records serial blocked time
                driverContext.recordBlocked(blocked);
                // each blocked operator is responsible for blocking the execution
                // until one of the operators can continue
                // (every blocked operator records its blocked time against the same combined future)
                for (Operator operator : blockedOperators) {
                    operator.getOperatorContext().recordBlocked(blocked);
                }
                return blocked;
            }
        }

        return NOT_BLOCKED;
    }
    catch (Throwable t) {
        List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
        if (interrupterStack == null) {
            driverContext.failed(t);
            throw t;
        }

        // Driver thread was interrupted which should only happen if the task is already finished.
        // If this becomes the actual cause of a failed query there is a bug in the task state machine.
        Exception exception = new Exception("Interrupted By");
        exception.setStackTrace(interrupterStack.stream().toArray(StackTraceElement[]::new));
        PrestoException newException = new PrestoException(GENERIC_INTERNAL_ERROR, "Driver was interrupted", exception);
        newException.addSuppressed(t);
        driverContext.failed(newException);
        throw newException;
    }
}
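
The pairwise loop in processInternal is easier to follow on a toy pipeline. The sketch below is a deliberately reduced model, not Presto code: a "page" is a `List<Integer>`, blocking and memory revocation are left out, and `MiniPipelineSketch`, `MiniOperator`, `ListSource`, `EvenFilter` and `CollectingSink` are all invented for illustration. Only the control flow (pull from `current`, push into `next`, propagate `finish()`) follows the listing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class MiniPipelineSketch {
    // Hypothetical, trimmed-down operator contract; a "page" is just a list of integers here.
    interface MiniOperator {
        boolean needsInput();
        void addInput(List<Integer> page);
        List<Integer> getOutput(); // may return null when nothing is ready
        void finish();
        boolean isFinished();
    }

    static class ListSource implements MiniOperator {
        private final Deque<List<Integer>> pages;
        ListSource(List<List<Integer>> pages) { this.pages = new ArrayDeque<>(pages); }
        public boolean needsInput() { return false; }
        public void addInput(List<Integer> page) { throw new UnsupportedOperationException(); }
        public List<Integer> getOutput() { return pages.poll(); }
        public void finish() { pages.clear(); }
        public boolean isFinished() { return pages.isEmpty(); }
    }

    // Buffers at most one page and keeps only the even values.
    static class EvenFilter implements MiniOperator {
        private List<Integer> pending;
        private boolean finishing;
        public boolean needsInput() { return !finishing && pending == null; }
        public void addInput(List<Integer> page) {
            List<Integer> kept = new ArrayList<>();
            for (int value : page) {
                if (value % 2 == 0) {
                    kept.add(value);
                }
            }
            pending = kept;
        }
        public List<Integer> getOutput() {
            List<Integer> page = pending;
            pending = null;
            return page;
        }
        public void finish() { finishing = true; }
        public boolean isFinished() { return finishing && pending == null; }
    }

    static class CollectingSink implements MiniOperator {
        final List<Integer> rows = new ArrayList<>();
        private boolean finished;
        public boolean needsInput() { return !finished; }
        public void addInput(List<Integer> page) { rows.addAll(page); }
        public List<Integer> getOutput() { return null; }
        public void finish() { finished = true; }
        public boolean isFinished() { return finished; }
    }

    // One pass over adjacent operator pairs; returns true if any page moved.
    static boolean processOnce(List<MiniOperator> operators) {
        boolean movedPage = false;
        for (int i = 0; i < operators.size() - 1; i++) {
            MiniOperator current = operators.get(i);
            MiniOperator next = operators.get(i + 1);
            if (!current.isFinished() && next.needsInput()) {
                List<Integer> page = current.getOutput();
                if (page != null && !page.isEmpty()) {
                    next.addInput(page);
                    movedPage = true;
                }
            }
            if (current.isFinished()) {
                next.finish(); // no more input will arrive
            }
        }
        return movedPage;
    }

    // Hypothetical helper: pushes the given pages through source -> filter -> sink.
    static List<Integer> run(List<List<Integer>> pages) {
        CollectingSink sink = new CollectingSink();
        List<MiniOperator> operators = Arrays.asList(new ListSource(pages), new EvenFilter(), sink);
        while (!sink.isFinished()) {
            processOnce(operators);
        }
        return sink.rows;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))));
    }
}
```

Note how completion travels down the chain one operator per pair: the filter only reports finished after its buffered page has been drained, so the sink is never told to finish while data is still in flight.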

The getBlockedFuture method of the Driver class checks whether a given Operator is blocked

private Optional<ListenableFuture<?>> getBlockedFuture(Operator operator)
{
    ListenableFuture<?> blocked = revokingOperators.get(operator);
    if (blocked != null) {
        // We mark operator as blocked regardless of blocked.isDone(), because finishMemoryRevoke has not been called yet.
        return Optional.of(blocked);
    }
    blocked = operator.isBlocked();
    if (!blocked.isDone()) {
        return Optional.of(blocked);
    }
    blocked = operator.getOperatorContext().isWaitingForMemory();
    if (!blocked.isDone()) {
        return Optional.of(blocked);
    }
    blocked = operator.getOperatorContext().isWaitingForRevocableMemory();
    if (!blocked.isDone()) {
        return Optional.of(blocked);
    }
    return Optional.empty();
}
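
getBlockedFuture is a priority-ordered chain of "is this future still pending?" checks, and processInternal then combines the results so the driver wakes up when any one of them completes. A rough sketch of both steps follows, using `CompletableFuture` from the JDK instead of Guava's `ListenableFuture`. `BlockedFutureSketch`, `firstBlocked` and `firstFinished` are invented names, and the revoking-operators special case is not modelled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical miniature of the blocked-future checks; not Presto code.
class BlockedFutureSketch {
    // Returns the first check that is still pending, in priority order; empty means "not blocked".
    static Optional<CompletableFuture<?>> firstBlocked(List<CompletableFuture<?>> checks) {
        for (CompletableFuture<?> check : checks) {
            if (!check.isDone()) {
                return Optional.of(check);
            }
        }
        return Optional.empty();
    }

    // Completes as soon as any one of the given futures completes.
    static CompletableFuture<Object> firstFinished(List<CompletableFuture<?>> futures) {
        return CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> operatorBlocked = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> waitingForMemory = new CompletableFuture<>();
        List<CompletableFuture<?>> checks = Arrays.asList(operatorBlocked, waitingForMemory);
        System.out.println(firstBlocked(checks).isPresent()); // the memory future is still pending

        CompletableFuture<Object> any = firstFinished(checks.subList(1, 2));
        waitingForMemory.complete(null);
        System.out.println(any.isDone());
    }
}
```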

2.6.5 Operator execution

The getOutput() and addInput() methods of the Operator interface are the core of how an Operator processes a Page; LimitOperator serves as the example here

@Override
public void addInput(Page page)
{
    checkState(needsInput());

    if (page.getPositionCount() <= remainingLimit) {
        remainingLimit -= page.getPositionCount();
        nextPage = page;
    }
    else {
        Block[] blocks = new Block[page.getChannelCount()];
        for (int channel = 0; channel < page.getChannelCount(); channel++) {
            Block block = page.getBlock(channel);
            blocks[channel] = block.getRegion(0, (int) remainingLimit);
        }
        nextPage = new Page((int) remainingLimit, blocks);
        remainingLimit = 0;
    }
}

@Override
public Page getOutput()
{
    Page page = nextPage;
    nextPage = null;
    return page;
}
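
To see the limit logic work without Presto's Page and Block types, here is a small sketch that uses `List<Integer>` as the page and `subList` in place of `Block.getRegion`. `LimitSketch` is an invented class and its `needsInput()` condition is an assumption of this sketch; the branch structure of `addInput` and the hand-off in `getOutput` follow the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical list-based miniature of LimitOperator; not Presto code.
class LimitSketch {
    private long remainingLimit;
    private List<Integer> nextPage;

    LimitSketch(long limit) {
        this.remainingLimit = limit;
    }

    // Assumed for this sketch: input is accepted while rows are still allowed and the previous page was taken.
    boolean needsInput() {
        return remainingLimit > 0 && nextPage == null;
    }

    void addInput(List<Integer> page) {
        if (!needsInput()) {
            throw new IllegalStateException("operator does not need input");
        }
        if (page.size() <= remainingLimit) {
            // the whole page fits under the limit: pass it through unchanged
            remainingLimit -= page.size();
            nextPage = page;
        }
        else {
            // only the first remainingLimit rows fit: truncate the page
            nextPage = new ArrayList<>(page.subList(0, (int) remainingLimit));
            remainingLimit = 0;
        }
    }

    List<Integer> getOutput() {
        List<Integer> page = nextPage;
        nextPage = null;
        return page;
    }

    public static void main(String[] args) {
        LimitSketch limit = new LimitSketch(5);
        limit.addInput(Arrays.asList(1, 2, 3));
        System.out.println(limit.getOutput());
        limit.addInput(Arrays.asList(4, 5, 6, 7));
        System.out.println(limit.getOutput());
        System.out.println(limit.needsInput());
    }
}
```

The operator holds at most one page at a time: addInput stores it, getOutput hands it over and clears the slot, which is exactly the rhythm the Driver loop relies on when it moves pages between adjacent operators.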

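3. Technical modifications

3.1 Hive View support

3.2 Custom Connector

3.3 Implicit conversion

3.4 UDF support

3.5 Performance tuning

Not finished yet, to be continued…

If you spot any mistakes, please point them out so we can all improve~

Updated every evening~

If you repost this article, please include a link to it. Original writing takes effort, thank you~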
