Flink 源码:StreamTask 介绍及初始化过程详解

本文仅为笔者平日学习记录之用,侵删
原文:https://mp.weixin.qq.com/s/GuA9o09EEue66fEpGgoGaQ

本文是 Flink 源码解析系列,通过阅读本文你能 get 到以下点:

  • StreamTask 类的基本功能及其职责
  • StreamTask 初始化详细流程
  • StateBackend 与 keyedStateBackend 以及 operatorStateBackend 之间的关系

这里先留一个思考题:如下代码所示,开发 Flink Job 时 dataStream keyBy 后连续跟 map、filter、flatMap 三个算子,请问这三个自定义的 Function 内都可以使用 Flink 的 ValueState 吗?

dataStream.keyby(_._1)
  .map(new MyMapFunction())
  .filter(new MyFilterFunction())
  .flatMap(new MyFlatMapFunction())

一、 StreamTask 介绍

Flink 中数据的整个处理流程都是围绕 StreamTask 来做的,所以先介绍一下 StreamTask 这个类。StreamTask 类的 doc 如下所示:

StreamTask 类 doc

注释的大概意思是:StreamTask 是所有 Streaming Task 的基类,是由 TM 部署并执行的本地处理单元。每个 StreamTask 运行一个或多个 Chain 在一起的 StreamOperator,这些 Operator 将会在一个线程内同步执行。常见的 case:map、flatmap、filter 三个算子连续的算子。

通俗的讲,StreamTask 就是对应一个 subtask 实例。如下图 Job 的 ExecutionGraph 所示,Source 算子和 map 算子 Chain 在一起,组成一个 OperatorChain,所以这两个算子运行在一个 subtask 里,同时这两个算子的并行度为 2,所以在对应两个 subtask。图中后续的算子也是类似,图中任务如果运行起来,就会对应 5 个 subtask,也就是对应 5 个 StreamTask。

Job 的 ExecutionGraph

从资源角度讲,每个 TaskManager 内部有多个 slot,每个 slot 内部运行着一个 subtask,也就是说每个 slot 内部运行着一个 StreamTask。

看完这个案例,再回顾一遍源码中注释,应该比较容易理解了:

  • StreamTask 是由 TM 部署并执行的本地处理单元
  • 每个 StreamTask 运行一个或多个 Chain 在一块的 StreamOperator,即:Source 算子和 map 算子就是 Chain 在一起的 Operator
  • 这些 Operator 将会在一个线程内同步的执行。即:线程中 Source 算子和 map 算子不能同时执行。

二、StreamTask 职责简介

如源码注释所示,StreamTask 的生命周期如下所示:

StreamTask 的生命周期

简单概括分为三个阶段:初始化、run、close。

初始化阶段包括:Operator 的配置、task 特定的初始化、初始化算子的 State、open-operators。

做 Flink 开发的同学应该都知道:自定义一个 Function 时可以实现 RichFunction,实现 open 方法,然后 Job 启动时,就会调用 open 方法做一些初始化操作。
open-operators 指的是 StreamTask 在初始化阶段,会调用所有实现了 RichFunction 算子 的 open 方法。

run 阶段:主要就是数据处理了。

close 阶段:做一些关闭操作,例如调用算子的 close 方法等,并做一些清理工作。

三、StreamTask 的初始化

StreamTask 的整个流程都在 invoke 方法中,直接从 invoke 方法开始分析。invoke 方法就是上面介绍的三个阶段:初始化、run、close。初始化阶段做了很多事情,有些直接略过了(例如:创建线程池等),当然初始化阶段重要的操作会深入分析。

invoke 中初始化相关的代码做了部分删减,如下所示:

asyncOperationsThreadPool = Executors.newCachedThreadPool(new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));

// 创建 StateBackend, 优先从 app 的设置中去加载,再去 config 中去加载,
// 都没有配置,则创建默认的 MemoryStateBackend
stateBackend = createStateBackend();
checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

operatorChain = new OperatorChain<>(this, recordWriters);
headOperator = operatorChain.getHeadOperator();

// task 特定的初始化,例如 当前 StreamTask 有 input 的情况下,会初始化 inputProcessor
init();

synchronized (lock) {
 // 循环遍历,对该 task 所有 Operator 进行状态初始化,
 // 包括初始化 StateBackend ,并调用 udf 的 initializeState 方法
 initializeState();
 openAllOperators();
}

初始化部分代码较多,下面主要介绍几部分:

  • createStateBackend
  • init
  • initializeState();
  • openAllOperators();

1. createStateBackend

见名之意,该方法就是创建 StateBackend,Flink 目前支持三种 StateBackend:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。该方法决定了当前 Job 具体要创建哪种 StateBackend。

源码如下:

// (1) the application defined state backend has precedence
// 代码中创建了 StateBackend,则按照代码中配置来
if (fromApplication != null) {
 // see if this is supposed to pick up additional configuration parameters
 if (fromApplication instanceof ConfigurableStateBackend) {
  // needs to pick up configuration
  if (logger != null) {
   logger.info("Configuring application-defined state backend with job/cluster config");
  }

  backend = ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);
 }
 else {
  // keep as is!
  backend = fromApplication;
 }
}
else {
 // (2) check if the config defines a state backend
 // 代码中没有配置,按照 配置文件来:即按照 flink-conf.yaml 文件中的配置来
 final StateBackend fromConfig = loadStateBackendFromConfig(config, classLoader, logger);
 if (fromConfig != null) {
  backend = fromConfig;
 }
 else {
  // (3) use the default
  // 代码中没有配置,配置文件也没有配置,则创建默认的 MemoryStateBackend
  backend = new MemoryStateBackendFactory().createFromConfig(config, classLoader);
 }
}

整体流程比较简单:

  • 代码中创建了 StateBackend,则按照代码中配置来
  • 代码中没有配置,按照 配置文件来:即按照 flink-conf.yaml 文件中的配置来
  • 代码中没有配置,配置文件也没有配置,则创建默认的 MemoryStateBackend

2. init

init 运行 task 特定的初始化,例如当前 StreamTask 有 input 的情况下,会初始化 inputProcessor 读取并处理数据。关于 inputProcessor 会在 run 部分重点介绍,这里先略过。

3. initializeState

initializeState 方法源码:

// 循环遍历,对该 task 所有 Operator 进行状态初始化,
// 包括初始化 StateBackend ,并调用 udf 的 initializeState 方法
private void initializeState() throws Exception {

 StreamOperator<?>[] allOperators = operatorChain.getAllOperators();

 for (StreamOperator<?> operator : allOperators) {
  if (null != operator) {
   operator.initializeState();
  }
 }
}

源码逻辑比较简单:直接调用当前 StreamTask 的 operatorChain 中所有 StreamOperator 的 initializeState 方法。假如当前 operatorChain 包含了 MapFunction、FilterFunction,两个算子将会被封装在 StreamMap 和 StreamFilter 中,那么此时就会调用这两个算子所对应的 StreamOperator 的 initializeState 方法,根据继承,最后调用的是 AbstractStreamOperator 的无参 initializeState() 方法。

这里专门强调无参 initializeState() 方法,是因为 AbstractStreamOperator 中还有一个有参的 initializeState(StateInitializationContext context) 方法,不要混淆。

注:有参的 initializeState 方法参数类型较长,下文将缩写为 initializeState(context) ;无参的 initializeState 方法继续用 initializeState() 表示。

AbstractStreamOperator 类的 initializeState() 方法介绍

initializeState 方法的简洁版源码如下:

public final void initializeState() throws Exception {
 final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

  // 创建 StreamTaskStateInitializerImpl
 final StreamTaskStateInitializer streamTaskStateManager =
  Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());

 // 使用 StreamTaskStateInitializerImpl 初始化
  // 各种 operatorStateBackend 和 keyedStateBackend,
 // 并从 Checkpoint 处恢复 State
 final StreamOperatorStateContext context =
  streamTaskStateManager.streamOperatorStateContext(XXX);

 try {
    // new Context 
  StateInitializationContext initializationContext = new 
      StateInitializationContextImpl(XXX);

  /**
   * 重点关注 AbstractUdfStreamOperator,它重写了 initializeState(context) 方法,
   * 去真正调用 各个 udf 的 initializeState 方法,
   */
  initializeState(initializationContext);
 } finally {
    XXX
 }
}

initializeState 方法主要完成两个工作:

  • 1、初始化 KeyedStateBackend 和 OperatorStateBackend,并从 Checkpoint 处恢复 State
  • 2、如果封装了 udf,则调用 udf 的 initializeState 方法(前提是 userFunction 实现了 CheckpointedFunction 接口)

源码流程:创建 StreamTaskStateInitializer。StreamTaskStateInitializer 只有一个实现类:StreamTaskStateInitializerImpl,所以会创建 StreamTaskStateInitializerImpl。创建时,将之前初始化好的 StateBackend 传递给 StreamTaskStateInitializerImpl,然后调用 streamOperatorStateContext 方法初始化 KeyedStateBackend 和 OperatorStateBackend。

下面重点关注 StreamTaskStateInitializerImpl 类的 streamOperatorStateContext 方法,源码如下所示:

public StreamOperatorStateContext streamOperatorStateContext(XXX){
// -------------- 初始化 Keyed State Backend --------------
 keyedStatedBackend = keyedStatedBackend(XXX);

 // -------------- 初始化 Operator State Backend --------------
 operatorStateBackend = operatorStateBackend(XXX);

 // -------------- Raw State 相关操作 --------------
 rawKeyedStateInputs = rawKeyedStateInputs(XXX);
 streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

 rawOperatorStateInputs = rawOperatorStateInputs(XXX);
 streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

 // -------------- Internal Timer Service Manager --------------
 timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);

 // -------------- Preparing return value --------------

 return new StreamOperatorStateContextImpl(
  prioritizedOperatorSubtaskStates.isRestored(),
  operatorStateBackend,
  keyedStatedBackend,
  timeServiceManager,
  rawOperatorStateInputs,
  rawKeyedStateInputs);
}

首先会初始化 Keyed State Backend 和 Operator State Backend,Flink 还支持 Raw 类型的 State,基本用不到,除非 Flink 内的 Managed State 不能满足作业的需求。重点关注 Keyed State Backend 和 Operator State Backend 的初始化。keyedStatedBackend 方法用于初始化 keyedStatedBackend,operatorStateBackend 方法用于初始化 operatorStateBackend。

StateBackend 与 keyedStateBackend 以及 operatorStateBackend 之间的关系

介绍初始化源码之前,先介绍一下 StateBackend 与 keyedStateBackend 以及 operatorStateBackend 之间的关系。我们都知道 Flink 中支持两种类型的 State,即:KeyedState 和 OperatorState;Flink 目前支持三种状态后端存储,即:Memory、Fs 和 RocksDB。所以 StateBackend 有三种实现即:MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。每种 StateBackend 都要支持 KeyedState 和 OperatorState,所以每种 StateBackend 要负责创建出自己相应的 keyedStateBackend 以及 operatorStateBackend。具体 KeyedState 与存储系统如何交互是由 keyedStateBackend 完成的,具体 OperatorState 与存储系统如何交互是由 operatorStateBackend。例如 RocksDBStateBackend 会创建出 RocksDBKeyedStateBackend,每个 RocksDBKeyedStateBackend 会持有 RocksDB 数据库实例,然后 Flink 引擎就可以与 RocksDB 进行交互了。

简言之:根据用户配置创建出不同类型的 StateBackend,然后不同的 StateBackend 再创建出对应的 keyedStateBackend 以及 operatorStateBackend,keyedStateBackend 和 operatorStateBackend 会真正的存储状态数据。

每种 StateBackend 到底会创建出哪种 keyedStateBackend 和哪种 operatorStateBackend 呢?这里引用 Flink 社区分享的图:

StateBackend 与 keyedStateBackend 以及 operatorStateBackend 的映射关系

图中可以看出,Memory 和 Fs 会创建出 HeapKeyedStateBackend,RocksDB 会创建出 RocksDBKeyedStateBackend。无论哪种 StateBackend,都会使用 DefaultOperatorStateBackend。这里也验证了一点:RocksDB 数据库中只会存储 KeyedState,不会存储 OperatorState。因为 RocksDBStateBackend 对应的 OperatorState 的存储也是基于内存的。读到这里,读者应该理解 StateBackend 与 keyedStateBackend 以及 operatorStateBackend 之间的关系了。

初始化 keyedStateBackend 流程

下面重点关注初始化 keyedStateBackend 的流程,keyedStatedBackend 方法源码如下所示:

protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(XXX){
 // 如果不是 KeyedStream 直接就返回,即:不创建 keyedStatedBackend
 if (keySerializer == null) {
  return null;
 }
  // 计算当前 subtask 负责的 KeyGroupRange
 final KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
  taskInfo.getMaxNumberOfParallelSubtasks(),
  taskInfo.getNumberOfParallelSubtasks(),
  taskInfo.getIndexOfThisSubtask());

 BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> 
    backendRestorer =
  new BackendRestorerProcedure<>(
   // 这里是函数式接口,并不是去 create KeyedStateBackend
   (stateHandles) -> stateBackend.createKeyedStateBackend(XXX),
   backendCloseableRegistry,
   logDescription);

 try {
  // 这里去 create StateBackend 并恢复状态文件
  return backendRestorer.createAndRestore(
   // 获取 StateHandle 的集合
   prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
 } finally {
  if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {
   IOUtils.closeQuietly(cancelStreamRegistryForRestore);
  }
 }
}

可以看到方法第一行:if (keySerializer == null) {return null;} 表示如果不是 KeyedStream 直接就返回,即:不创建 keyedStatedBackend。计算当前 subtask 负责的 KeyGroupRange,然后创建 BackendRestorerProcedure 类型的 backendRestorer,且将 (stateHandles) -> stateBackend.createKeyedStateBackend(XXX) 传递给 BackendRestorerProcedure 构造器的 instanceSupplier,instanceSupplier 是一个函数式接口,用于创建 KeyedStateBackend。

backendRestorer.createAndRestore 方法会循环调用 attemptCreateAndRestore 恢复一个个 State, attemptCreateAndRestore 方法中调用 instanceSupplier(函数式接口)真正的创建 keyedStatedBackend。

instanceSupplier 函数式接口的工作:调用相应 StateBackend 的 createKeyedStateBackend 方法创建 AbstractKeyedStateBackend。如果 stateBackend 是 RocksDBStateBackend,就会创建出 RocksDBKeyedStateBackend。如果是 Memory 或 Fs 则会创建出 HeapKeyedStateBackend。在创建完 KeyedStateBackend 的过程中,会从 Checkpoint 中恢复状态到 Flink 引擎。

注:具体 KeyedStateBackend 恢复状态的流程比较复杂,每种 StateBackend 的恢复流程都不同,同时还牵扯到从 dfs 中拉取状态数据用于恢复,所以后续会有单独的博客介绍恢复流程。

初始化 operatorStateBackend 流程

operatorStateBackend 方法用于初始化 operatorStateBackend。operatorStateBackend 初始化流程与 keyedStateBackend 比较类似,区别在于最后调用的是 stateBackend.createOperatorStateBackend()。

三种 StateBackend 的 createOperatorStateBackend 方法非常相似,源码如下:

public OperatorStateBackend createOperatorStateBackend(
 Environment env,
 String operatorIdentifier,
 @Nonnull Collection<OperatorStateHandle> stateHandles,
 CloseableRegistry cancelStreamRegistry) throws Exception {

 return new DefaultOperatorStateBackendBuilder(
  env.getUserClassLoader(),
  env.getExecutionConfig(),
  isUsingAsynchronousSnapshots(),
  stateHandles,
  cancelStreamRegistry).build();
}

这里无论是何种 StateBackend,都会创建出 DefaultOperatorStateBackend。也就验证了一点:RocksDB 只支持 KeyedState,OperatorState 都是按照 Heap 的方案。

具体 new DefaultOperatorStateBackend 的过程由建造器 DefaultOperatorStateBackendBuilder 完成,build 的功能是创建出 OperatorStateBackend,并从 Checkpoint 中将 State 恢复到 Flink 引擎端。整体流程比较复杂,这里不阐述会在后面博客中单独介绍。

如果当前是 udf,则调用 udf 的 initializeState 方法

接下来重点又回到了 AbstractStreamOperator 类的 initializeState() 方法中,根据创建好的 operatorStateBackend 和 keyedStateStore 构造 Context。然后调用 initializeState(Context) 方法,之前说过要区分有参和无参的 initializeState 方法,现在执行到了有参的 initializeState(Context) 方法。

前面介绍过所有自定义的 UDF 都被包装起来,例如 MapFunction 都被 StreamMap 类包装起来,且这些 UDF 的包装类都继承自 AbstractUdfStreamOperator,AbstractStreamOperator 类的 initializeState(Context) 方法没有任何实现,这里重点关注 AbstractUdfStreamOperator 重写的 initializeState(Context) 方法。

AbstractUdfStreamOperator 类的 initializeState 方法源码:

@Override
public void initializeState(StateInitializationContext context) throws Exception {
 // super 表示 AbstractStreamOperator 类
 super.initializeState(context);
 // 调用 udf 的 initializeState 方法
 StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

super 表示 AbstractStreamOperator 类,即调用 AbstractStreamOperator 类的 initializeState(context) 空方法,重点在于工具类 StreamingFunctionUtils 的 restoreFunctionState(context, userFunction) 方法,restoreFunctionState 方法内会对包装的 udf 进行解包装,然后执行 tryRestoreFunction 方法。

tryRestoreFunction 方法部分源码如下所示:

private static boolean tryRestoreFunction(
  StateInitializationContext context,
  Function userFunction) throws Exception {
 // 调用 udf 的  initializeState 方法,
  // 前提是 userFunction instanceof CheckpointedFunction
 if (userFunction instanceof CheckpointedFunction) {
  ((CheckpointedFunction) userFunction).initializeState(context);
  return true;
 }
}

tryRestoreFunction 会对 userFunction 进行判断,如果实现了 CheckpointedFunction 接口,就调用 userFunction 的 initializeState(context) 对状态进行初始化。使用过 CheckpointedFunction 接口的同学应该清楚:自定义的 Function 可以实现 CheckpointedFunction 接口,重写 initializeState 方法,做一些状态的初始化操作。例如在 initializeState 方法申请创建 OperatorState。

udf 使用 initializeState 的经典案例就是 FlinkKafkaConsumerBase 类,FlinkKafkaConsumerBase 类实现了 CheckpointedFunction 接口,在 initializeState 方法中定义了 OperatorState 类型的 ListState,将 Flink 消费 Kafka 的 offset 信息维护在 ListState 中。每次启动任务时,都会从 ListState 中恢复之前的 offset,并从 offset 处继续消费。

initializeState 小结

initializeState 过程比较复杂,总的来说就两个事情:

  • 1、 创建相应的 keyedStateBackend 和 OperatorStateBackend,并从 Checkpoint 处恢复 State(具体恢复流程后续讲述)
  • 2、 如果 udf 实现了 CheckpointedFunction 接口,则调用 udf 的 initializeState 方法

4. openAllOperators

此时回到了 StreamTask 初始化流程的下一步:openAllOperators。openAllOperators 方法比较简单,源码如下所示:

private void openAllOperators() throws Exception {
 for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
  if (operator != null) {
   operator.open();
  }
 }
}

openAllOperators 方法会调用 OperatorChain 中所有 StreamOperator 的 open 方法,通过继承关系,最后调用的仍然是 AbstractUdfStreamOperator 类的 open 方法。AbstractUdfStreamOperator 类的 open 方法源码如下所示:

// AbstractUdfStreamOperator 类的 open 方法
@Override
public void open() throws Exception {
 super.open();
 FunctionUtils.openFunction(userFunction, new Configuration());
}

// FunctionUtils 类的 openFunction 方法
public static void openFunction(Function function
                                , Configuration parameters) {
 if (function instanceof RichFunction) {
  RichFunction richFunction = (RichFunction) function;
  richFunction.open(parameters);
 }
}

AbstractUdfStreamOperator 类的 open 方法调用 FunctionUtils 类的 openFunction 方法,openFunction 方法中会判断当前 userFunction 是否实现了 RichFunction 接口,如果实现了 RichFunction 接口,则调用 userFunction 的 open 方法。

openAllOperators 小结

openAllOperators 的流程比较简单,就是判断 userFunction 是否实现了 RichFunction 接口,在 Flink 中实现了 RichFunction 表示富函数,可以定义 open 和 close 相关的逻辑,在算子初始化或者关闭时会被调用。

四、 思考题

如下代码所示,开发 Flink Job 时 dataStream keyBy 后连续跟 map、filter、flatMap 三个算子,请问这三个自定义的 Function 内都可以使用 Flink 的 ValueState 吗?

dataStream.keyby(_._1)
  .map(new MyMapFunction())
  .filter(new MyFilterFunction())
  .flatMap(new MyFlatMapFunction())

先说答案吧:在 MyMapFunction 中可以使用 ValueState,在 MyFilterFunction 和 MyFlatMapFunction 中不能使用 ValueState。如果在 MyFilterFunction 和 MyFlatMapFunction 中定义 ValueState 或 MapState,都会报错,会显示 keyedStateBackend 为 null。为什么呢?

首先 Flink 中只有 KeyedState 才支持 ValueState 和 MapState,OperatorState 不支持 ValueState 和 MapState。只要对 KeyedStream 的操作才能使用 KeyedState,KeyedState 表示相同的 key 共享同一个 State,普通的 DataStream 中没有 key 的概念不能使用 KeyedState。

DataStream 的 keyBy 方法源码如下所示,由源码 DataStream 可以看到,DataStream 的 keyBy 方法会返回 KeyedStream,KeyedStream 是 DataStream 的子类,KeyedStream 经过 map 转换后又会变成 DataStream。所以上图中只有 MyMapFunction 是基于 KeyedStream 操作的,MyFilterFunction 和 MyFlatMapFunction 都是基于 DataStream 操作的,没有 key 的概念,因此不能使用 KeyedState,即不能使用 ValueState。

// DataStream 的 keyBy 方法源码
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
 Preconditions.checkNotNull(key);
 return new KeyedStream<>(this, clean(key));
}

MyFilterFunction 和 MyFlatMapFunction 中定义 ValueState 或 MapState 时,为什么会报出 keyedStateBackend 为 null 呢?

回顾一下创建 keyedStateBackend 的流程,第一步就是 if (keySerializer == null) {return null;},如果不是 KeyedStream 直接就返回,即:不创建 keyedStatedBackend。所以出现了上述现象。

五、 总结

本文介绍了 StreamTask 类的基本功能,StreamTask 映射到 ExecutionGraph 中对应的是一个 subtask,每个 StreamTask 运行一个或多个 Chain 在一起的 StreamOperator,这些 Operator 将会在一个线程内同步执行。随后介绍了 StreamTask 的生命周期,主要包括了初始化、run、close 三个流程。后半部分重点描述了 StreamTask 初始化的过程,主要是:createStateBackend、init、initializeState()、openAllOperators() 四个过程。

后续会给大家详细介绍 initializeState 部分如何从 Checkpoint 中恢复 State,也会详细介绍 run 流程到底是如何处理一条条数据的。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352