Flink-1.10 源码笔记 checkpint -- 1

Flink 源码笔记 --- checkpoint

设置触发checkpoint的节点

ExecutionGraph该对象运行在 SchedulerBase中, SchedulerBase运行在JobMaster中

ExecutionGraph的enableCheckpointing方法 初始化了checkpointCoordinator(检查点协调器)对象, 改对象运行在JobManager中

checkpointCoordinator 负责分布式系统下checkpoint过程 主要职责:

  1. 定时触发checkpoint,命令数据源发送 checkpoint barrier
  2. 接受各个operator的某个checkpoint完成确认消息
  3. 对于某个checkpoint,当接受到所有的operator的确认消息之时,发送消息通知各个operator,checkpoint已完成
  4. 保存已完成和正在进行中的checkpoint的相关信息

在构建checkpointCoordinator时,传入一个变量 tasksToTrigger ,是需要触发checkpoint的节点,该变量在StreamingJobGraphGenerator的configureCheckpointing方法中创建

在创建JobMaster的时候, 对schedulerNG进行了初始化

this.schedulerNGFactory = checkNotNull(schedulerNGFactory);

this.schedulerNG = createScheduler(jobManagerJobMetricGroup);

createScheduler该方法会调用 schedulerNGFactory的createInstance,获取调度器

DefaultSchedulerFactory和LegacySchedulerFactory分别创建DefaultScheduler和LegacyScheduler实例,这两者都继承SchedulerBase,实例化时都会调用SchedulerBase的构造方法,其中会构造ExecutionGraph,然后通过startScheduling进行调度

    private SchedulerNG createScheduler(final JobManagerJobMetricGroup jobManagerJobMetricGroup) throws Exception {
        return schedulerNGFactory.createInstance(
            log,
            jobGraph,
            backPressureStatsTracker,
            scheduledExecutorService,
            jobMasterConfiguration.getConfiguration(),
            scheduler,
            scheduledExecutorService,
            userCodeLoader,
            highAvailabilityServices.getCheckpointRecoveryFactory(),
            rpcTimeout,
            blobWriter,
            jobManagerJobMetricGroup,
            jobMasterConfiguration.getSlotRequestTimeout(),
            shuffleMaster,
            partitionTracker);
    }

​ 在DefaultScheduler和 LegacyScheduler 类中, 创建时候会初始化父类 SchedulerBase

DefaultScheduler(
        final Logger log,
        final JobGraph jobGraph,
        final BackPressureStatsTracker backPressureStatsTracker,
        final Executor ioExecutor,
        final Configuration jobMasterConfiguration,
        final ScheduledExecutorService futureExecutor,
        final ScheduledExecutor delayExecutor,
        final ClassLoader userCodeLoader,
        final CheckpointRecoveryFactory checkpointRecoveryFactory,
        final Time rpcTimeout,
        final BlobWriter blobWriter,
        final JobManagerJobMetricGroup jobManagerJobMetricGroup,
        final ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker,
        final SchedulingStrategyFactory schedulingStrategyFactory,
        final FailoverStrategy.Factory failoverStrategyFactory,
        final RestartBackoffTimeStrategy restartBackoffTimeStrategy,
        final ExecutionVertexOperations executionVertexOperations,
        final ExecutionVertexVersioner executionVertexVersioner,
        final ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) throws Exception {

        super(
            log,
            jobGraph,
            backPressureStatsTracker,
            ioExecutor,
            jobMasterConfiguration,
            new ThrowingSlotProvider(), // this is not used any more in the new scheduler
            futureExecutor,
            userCodeLoader,
            checkpointRecoveryFactory,
            rpcTimeout,
            new ThrowingRestartStrategy.ThrowingRestartStrategyFactory(),
            blobWriter,
            jobManagerJobMetricGroup,
            Time.seconds(0), // this is not used any more in the new scheduler
            shuffleMaster,
            partitionTracker,
            executionVertexVersioner,
            false);
            ...

​ 继续看checkpoints过程

JobMaster触发savepoint的时候会启动checkpoint过程。现在查看一下JobMastertriggerSavepoint方法

    //触发保存点
    @Override
    public CompletableFuture<String> triggerSavepoint(
            @Nullable final String targetDirectory,
            final boolean cancelJob,
            final Time timeout) {
        //调用的 SchedulerBase实现类的方法
        return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
    }

在该方法中,调用了schedulerNG的triggerSavepoint同名方法,这里调用的是SchedulerNG接口的实现类 SchedulerBase中的方法,该类同时是一个抽象类

public abstract class SchedulerBase implements SchedulerNG

现在看一下schedulerNG.triggerSavepoint方法 ,该方法中主要获取checkpointCoordinator(检查点协调器),然后调用checkpointCoordinator的triggerSavepoint方法

    @Override
    public CompletableFuture<String> triggerSavepoint(final String targetDirectory, final boolean cancelJob) {
        //确保运行在主线程
        mainThreadExecutor.assertRunningInMainThread();

        //从executionGraph中获取CheckpointCoordinator(检查点协调器)
        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator == null) {
            throw new IllegalStateException(
                String.format("Job %s is not a streaming job.", jobGraph.getJobID()));
        } else if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
            // 确保配置了savepoint默认存储目录,或者方法中传入了存储目录
            log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", jobGraph.getJobID());

            throw new IllegalStateException(
                "No savepoint directory configured. You can either specify a directory " +
                    "while cancelling via -s :targetDirectory or configure a cluster-wide " +
                    "default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
        }

        // 如果是取消作业,停止checkpoint协调器
        if (cancelJob) {
            checkpointCoordinator.stopCheckpointScheduler();
        }
        // 先触发一次savepoint操作(实际上触发的是checkpoint)
        // 接下来返回checkpoint操作保存的文件路径
        //  最后执行:
        // 1.如果需要取消作业,并且之前步骤抛出了异常,则再次启动checkpoint协调器,抛出异常
        // 2.如果需要取消作业,之前步骤没有抛出异常,取消任务执行
        return checkpointCoordinator
            .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
            .thenApply(CompletedCheckpoint::getExternalPointer)
            .handleAsync((path, throwable) -> {
                if (throwable != null) {
                    if (cancelJob) {
                        startCheckpointScheduler(checkpointCoordinator);
                    }
                    throw new CompletionException(throwable);
                } else if (cancelJob) {
                    log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
                    cancel();
                }
                return path;
            }, mainThreadExecutor);
    }

进入checkpointCoordinator的triggerSavepoint方法中,在该方法中,首先说去checkpointProperties(检查点配置),而后调用triggerSavepointInternal方法并返回

public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
        final long timestamp,
        @Nullable final String targetLocation) {

        final CheckpointProperties properties = CheckpointProperties.forSavepoint();
        return triggerSavepointInternal(timestamp, properties, false, targetLocation);
    }

跟踪 triggerSavepointInternal方法进去,该方法中主要逻辑都在triggerCheckpoint中

private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
        final long timestamp,
        final CheckpointProperties checkpointProperties,
        final boolean advanceToEndOfEventTime,
        @Nullable final String targetLocation) {

        checkNotNull(checkpointProperties);

        // TODO, call triggerCheckpoint directly after removing timer thread
        // for now, execute the trigger in timer thread to avoid competition
        final CompletableFuture<CompletedCheckpoint> resultFuture = new CompletableFuture<>();
        timer.execute(() -> triggerCheckpoint(
            timestamp,
            checkpointProperties,
            targetLocation,
            false,
            advanceToEndOfEventTime)
            .whenComplete((completedCheckpoint, throwable) -> {
                if (throwable == null) {
                    resultFuture.complete(completedCheckpoint);
                } else {
                    resultFuture.completeExceptionally(throwable);
                }
            }));
        return resultFuture;
    }

继续追踪triggerCheckpoint方法,

该方法中的主要逻辑 :

1.首先进行触发Checkpoint之前的预检查,判断是否满足条件;    
2.然后获取一个CheckpointID,创建PendingCheckpoint实例;    
3.之后重新检查触发条件是否满足要求,防止产生竞态条件;    
4.最后将PendingCheckpoint实例checkpoint加入到pendingCheckpoints中,并向tasks发送消息触发它们的检查点。

@VisibleForTesting
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
        long timestamp,                                //触发检查点时间戳
        CheckpointProperties props,                    //检查点配置
        @Nullable String externalSavepointLocation,  //外部保存点位置
        boolean isPeriodic,              //是否时周期性的
        boolean advanceToEndOfTime) { //提前到结束时间

        // Sanity check 如果检查点是存储在外部系统中且targetDirectory为空,报错
        //                          不能为同步               设置了保存点
        if (advanceToEndOfTime && !(props.isSynchronous() && props.isSavepoint())) {
            return FutureUtils.completedExceptionally(new IllegalArgumentException(
                "Only synchronous savepoints are allowed to advance the watermark to MAX."));
        }

        //CompletableFuture 该对象 是1.8新特性,表示一个任务使用
        //CompletedCheckpoint  完成的检查点  包含 检查点id 完成时间等元数据
        final CompletableFuture<CompletedCheckpoint> onCompletionPromise =
            new CompletableFuture<>();

        //make some eager pre-checks 一些checkpoint之前的预检查
        synchronized (lock) {
            //如果触发请求,  或者在队列中存在   不能触发检查点
            if (isTriggering || !triggerRequestQueue.isEmpty()) {
                // we can't trigger checkpoint directly if there is a trigger request being processed
                // or queued
                //创建一个新的检查点请求加入队列
                triggerRequestQueue.add(new CheckpointTriggerRequest(
                    timestamp,
                    props,
                    externalSavepointLocation,
                    isPeriodic,
                    advanceToEndOfTime,
                    onCompletionPromise));
                return onCompletionPromise;
            }
        }
        //触发检查点
        startTriggeringCheckpoint(
            timestamp,
            props,
            externalSavepointLocation,
            isPeriodic,
            advanceToEndOfTime,
            onCompletionPromise);
        return onCompletionPromise;
    }

进入startTriggeringCheckpoint方法

private void startTriggeringCheckpoint(
        long timestamp,
        CheckpointProperties props,
        @Nullable String externalSavepointLocation,
        boolean isPeriodic,
        boolean advanceToEndOfTime,
        CompletableFuture<CompletedCheckpoint> onCompletionPromise) {

        try {
            // make some eager pre-checks
            // 触发之前 对checkpoint 进行预检查
            synchronized (lock) {               // 是否为周期的       是否是强制执行检查点
                preCheckBeforeTriggeringCheckpoint(isPeriodic, props.forceCheckpoint());
            }
            //检查我们需要触发的所有任务是否都在运行。如果不是,则放弃检查点
            final Execution[] executions = getTriggerExecutions();
            final Map<ExecutionAttemptID, ExecutionVertex> ackTasks = getAckTasks();

            // we will actually trigger this checkpoint!
            Preconditions.checkState(!isTriggering);
            isTriggering = true;

            final CompletableFuture<PendingCheckpoint> pendingCheckpointCompletableFuture =
                initializeCheckpoint(props, externalSavepointLocation)   //初始化 checkpoints
                    //该方法 当前阶段正常完成以后执行,而且当前阶段的执行的结果会作为下一阶段的输入参数。
                    // thenApplyAsync默认是异步执行的。这里所谓的异步指的是不在当前线程内执行   todo 属于回调函数
                    .thenApplyAsync(
                        (checkpointIdAndStorageLocation) -> createPendingCheckpoint( //创建挂起的检查点
                            timestamp,
                            props,
                            ackTasks,
                            isPeriodic,
                            checkpointIdAndStorageLocation.checkpointId,
                            checkpointIdAndStorageLocation.checkpointStorageLocation,
                            onCompletionPromise),
                        timer);
            //主状态完成
            final CompletableFuture<?> masterStatesComplete = pendingCheckpointCompletableFuture
                //是结合两个任务的返回值进行转化后再返回
                .thenCompose(this::snapshotMasterState);
            //协调检查点完成
            final CompletableFuture<?> coordinatorCheckpointsComplete = pendingCheckpointCompletableFuture
                // 异步的
                .thenComposeAsync((pendingCheckpoint) ->
                    //所有与接受 操作符协调器 的检查点相关的逻辑
                        OperatorCoordinatorCheckpoints.triggerAndAcknowledgeAllCoordinatorCheckpointsWithCompletion(
                            coordinatorsToCheckpoint, pendingCheckpoint, timer),
                    timer);

            CompletableFuture.allOf(masterStatesComplete, coordinatorCheckpointsComplete)
                //上面完成 或 抛出异常 都会执行该方法
                .whenCompleteAsync(
                    (ignored, throwable) -> {
                        //该方法返回一个未来的执行的结果, 如果异常执行完或未完成 未null
                        //创建一个进行中的checkpoint
                        final PendingCheckpoint checkpoint =
                            FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);

                        if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) {
                            //不是丢弃的 , 没有意外,
                            // no exception, no discarding, everything is OK

                            //快照任务状态
                            snapshotTaskState(
                                timestamp,
                                checkpoint.getCheckpointId(),
                                checkpoint.getCheckpointStorageLocation(),
                                props,
                                executions,
                                advanceToEndOfTime);
                            //触发请求成功必须调用它
                            onTriggerSuccess();
                        } else {
                            // the initialization might not be finished yet
                            if (checkpoint == null) {
                                onTriggerFailure(onCompletionPromise, throwable);
                            } else {
                                onTriggerFailure(checkpoint, throwable);
                            }
                        }
                    },
                    timer);
        } catch (Throwable throwable) {
            onTriggerFailure(onCompletionPromise, throwable);
        }
    }

由于方法比较长,查看该方法调用的重点方法中具体实现

preCheckBeforeTriggeringCheckpoint方法,主要对触发检查点前进行预检查

    private void preCheckBeforeTriggeringCheckpoint(boolean isPeriodic, boolean forceCheckpoint) throws CheckpointException {
        //检查 检查点全局状态   -- 调度器和协调器的检查
        preCheckGlobalState(isPeriodic);

        //如果不是强制执行的检查点
        if (!forceCheckpoint) {
            //检查 是否并行的检查点数量是否查过并发
            checkConcurrentCheckpoints();
            //检查 检查点之间的最小间隔已经通过
            checkMinPauseBetweenCheckpoints();
        }
    }

getTriggerExecutions方法,查我们需要触发的所有任务是否都在运行。如果不是,则放弃检查点并抛出异常.

    private Execution[] getTriggerExecutions() throws CheckpointException {
        Execution[] executions = new Execution[tasksToTrigger.length];
        for (int i = 0; i < tasksToTrigger.length; i++) {
            Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
            if (ee == null) {
                LOG.info(
                    "Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job);
                throw new CheckpointException(
                    CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            } else if (ee.getState() == ExecutionState.RUNNING) {
                //todo tasksToTrigger 该单位在StreamingJobGraphGenerator中初始化,
                // 该变量的含义为需要触发checkpoint的节点  
                //todo 如果任务运行状态处于 running 则将该任务添加到 执行序列中
                executions[i] = ee;
            } else {
                LOG.info(
                    "Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
                    tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
                    job,
                    ExecutionState.RUNNING,
                    ee.getState());
                throw new CheckpointException(
                    CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
        }
        return executions;
    }

getAckTasks方法同上方法,检查任务是否运行,如果不是放弃检查点

    private Map<ExecutionAttemptID, ExecutionVertex> getAckTasks() throws CheckpointException {
        Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);

        for (ExecutionVertex ev : tasksToWaitFor) {
            Execution ee = ev.getCurrentExecutionAttempt();
            if (ee != null) {
                ackTasks.put(ee.getAttemptId(), ev);
            } else {
                LOG.info(
                    "Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
                    ev.getTaskNameWithSubtaskIndex(),
                    job);
                throw new CheckpointException(
                    CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);
            }
        }
        return ackTasks;
    }

createPendingCheckpoint ,在该方法中,在创建PendingCheckpoint对象之前,进行了预检查, PendingCheckpoint对象的作用, 挂起的检查点是已经启动的检查点,但是还没有被所有需要确认它的任务确认。一旦所有任务都确认了它,它就变成了一个{@link CompletedCheckpoint}。 创建PendingCheckpoint对象后,设置跟踪此PendingCheckpoint的回调 在线程锁中,将心创建的PendingCheckpoint添加到pendingCheckpoints集合中,该集合存储着待处理的PendingCheckpoint,然后设置一个定时任务,在给定的延迟之后执行给定的命令,最后将 PendingCheckpoint对象返回

private PendingCheckpoint createPendingCheckpoint(
        long timestamp,
        CheckpointProperties props,
        Map<ExecutionAttemptID, ExecutionVertex> ackTasks,
        boolean isPeriodic,
        long checkpointID,
        CheckpointStorageLocation checkpointStorageLocation,
        CompletableFuture<CompletedCheckpoint> onCompletionPromise) {

        synchronized (lock) {
            // todo 创建 pendingCheckpoint 前 先进行检查
            try {
                // since we haven't created the PendingCheckpoint yet, we need to check the
                // global state here.
                preCheckGlobalState(isPeriodic);
            } catch (Throwable t) {
                throw new CompletionException(t);
            }
        }

        //创建 PendingCheckpoint
        final PendingCheckpoint checkpoint = new PendingCheckpoint(
            job,
            checkpointID,
            timestamp,
            ackTasks,
            OperatorCoordinatorCheckpointContext.getIds(coordinatorsToCheckpoint),
            masterHooks.keySet(),
            props,
            checkpointStorageLocation,
            executor,
            onCompletionPromise);

        if (statsTracker != null) {
            //创建一个 pending checkpoint 的跟踪器
            PendingCheckpointStats callback = statsTracker.reportPendingCheckpoint(
                checkpointID,
                timestamp,
                props);
            // 设置跟踪此pending checkpoint的回调
            checkpoint.setStatsCallback(callback);
        }

        synchronized (lock) {
            //将挂起的checkpoints 加入集合
            pendingCheckpoints.put(checkpointID, checkpoint);
            //在给定的延迟之后执行给定的命令
            ScheduledFuture<?> cancellerHandle = timer.schedule(
                new CheckpointCanceller(checkpoint), // todo 要执行的任务
                checkpointTimeout,                   // todo 从现在开始推迟执行的时间
                TimeUnit.MILLISECONDS);              // todo 延迟参数的时间单位

            if (!checkpoint.setCancellerHandle(cancellerHandle)) { //设置句柄
                // checkpoint is already disposed!
                cancellerHandle.cancel(false);
            }
        }

        //在Flink wen监控页面中 可以看到
        //2020-05-14 14:44:08,203 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator
        // - Triggering checkpoint 1 @ 1589438648198 for job b94ef58349d8befba6412e3d85478bf5.
        LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
        return checkpoint;
    }

最主要的方法 snapshotTaskState () 在该方法中,具体逻辑都在这里 现在看一下这个方法的实现, 首先创建CheckpointOptions对象,该对象执行检查点的选项 获取checkpoint类型和存储位置配置

然后开始触发所有tasksToTrigger的checkpoint创建过程,在触发的时候会根据是否时异步的调用不同的方法,但是两个方法最终都会调用 Execution的triggerCheckpointHelper方法

    private void snapshotTaskState(
        long timestamp,
        long checkpointID,
        CheckpointStorageLocation checkpointStorageLocation,
        CheckpointProperties props,
        Execution[] executions,
        boolean advanceToEndOfTime) {
        // 该对象  执行检查点的选项   获取checkpoint类型和存储位置配置
        final CheckpointOptions checkpointOptions = new CheckpointOptions(
            props.getCheckpointType(), //获取 类型  是检查点 还是 保存点
            checkpointStorageLocation.getLocationReference(), // 获取存储位置的引用
            isExactlyOnceMode,
            isUnalignedCheckpoint);

        // send the messages to the tasks that trigger their checkpoint
        // todo 触发所有tasksToTrigger的checkpoint创建过程
        for (Execution execution : executions) {
            // todo 触发checkpoint的入口
            if (props.isSynchronous()) {                //同步的
                execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions, advanceToEndOfTime);
            } else {  //非同步的
                execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
            }
        }
    }

由于我们平时开发都是使用异步的检查点,所以进入triggerSynchronousSavepoint方法, 该方法调用triggerCheckpointHelper方法

    public void triggerSynchronousSavepoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
        triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
    }

进入triggerCheckpointHelper方法,在该方法中,会先获取slot表示逻辑槽表示任务管理器上的资源,可以将单个任务部署到该资源中

获取TaskManagerGateway对象,这里的对象为RpcTaskManagerGateway类型,RpcTaskManagerGateway是TaskManagerGateway的实现类

然后调用taskManagerGateway的triggerCheckpoint方法,进行触发checkpoint

    private void triggerCheckpointHelper(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {

        final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
        if (advanceToEndOfEventTime && !(checkpointType.isSynchronous() && checkpointType.isSavepoint())) {
            //只有同步保存点才允许将水印提升到最大值
            throw new IllegalArgumentException("Only synchronous savepoints are allowed to advance the watermark to MAX.");
        }

        //获取 slot
        final LogicalSlot slot = assignedResource;

        if (slot != null) {
            //获取task manager gateway
            //todo 这里taskManagerGateway是RpcTaskManagerGateway类型
            //返回 与TaskManager对话的网关
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
            //todo 触发一个保存点
            taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime);
        } else {
            // 日志输出 执行没有分配槽。这表示执行不再运行
            LOG.debug("The execution has no slot assigned. This indicates that the execution is no longer running.");
        }
    }

进入taskManagerGateway的triggerCheckpoint方法,该方法调用了taskExecutorGateway的同名方法, taskExecutorGateway是TaskExecutor类型

TaskExecutor是TaskExecutorGateway接口的实现类

    @Override
    public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
        taskExecutorGateway.triggerCheckpoint(
            executionAttemptID,
            checkpointId,
            timestamp,
            checkpointOptions,
            advanceToEndOfEventTime);
    }

下一篇

Flink-1.10 源码笔记 checkpint - 2

如有错误,欢迎指正!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,794评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,050评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,587评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,861评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,901评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,898评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,832评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,617评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,077评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,349评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,483评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,199评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,824评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,442评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,632评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,474评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,393评论 2 352

推荐阅读更多精彩内容