Flink的waterMark实现解决乱序以及延迟数据

1、watermark的作用

watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。

我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。

但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

2、watermark解决迟到的数据

out-of-order/late element

实时系统中,由于各种原因造成的延时,造成某些消息发到flink的时间延时于事件产生的时间。如果基于event time构建window,但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

Watermarks(水位线)就是来处理这种问题的机制

1.参考google的DataFlow。

2.是event time处理进度的标志。

3.表示比watermark更早(更老)的事件都已经到达(没有比水位线更低的数据 )。

4.基于watermark来进行窗口触发计算的判断。

有序的数据流watermark:

在某些情况下,基于Event Time的数据流是有续的(相对event time)。在有序流中,watermark就是一个简单的周期性标记。

<v:shapetype id="_x0000_t75" stroked="f" filled="f" path="m@4@5l@4@11@9@11@9@5xe" o:preferrelative="t" o:spt="75" coordsize="21600,21600"><v:stroke joinstyle="miter"><v:formulas></v:formulas><v:path o:connecttype="rect" gradientshapeok="t" o:extrusionok="f"></v:path></v:stroke></v:shapetype><v:shape id="图片_x0020_24" style="width:415.8pt;height:88.8pt;visibility:visible;mso-wrap-style:square" type="#_x0000_t75" alt="说明: C:\Users\admin\Desktop\1642492-20190412171849001-925432538.png" o:spid="_x0000_i1027"><v:imagedata o:title="1642492-20190412171849001-925432538" src="file:///C:\Users\chenning\AppData\Local\Temp\msohtmlclip1\01\clip_image001.png"></v:imagedata></v:shape>

无序的数据流watermark:

在更多场景下,基于Event Time的数据流是无续的(相对event time)。

在无序流中,watermark至关重要,她告诉operator比watermark更早(更老/时间戳更小)的事件已经到达, operator可以将内部事件时间提前到watermark的时间戳(可以触发window计算啦)

<v:shape id="图片_x0020_25" style="width:415.8pt;height:132pt;visibility:visible;mso-wrap-style:square" type="#_x0000_t75" alt="说明: C:\Users\admin\Desktop\1642492-20190412171832181-1505800693.png" o:spid="_x0000_i1026"><v:imagedata o:title="1642492-20190412171832181-1505800693" src="file:///C:\Users\chenning\AppData\Local\Temp\msohtmlclip1\01\clip_image002.png"></v:imagedata></v:shape>

并行流当中的watermark:

通常情况下, watermark在source函数中生成,但是也可以在source后任何阶段,如果指定多次 watermark,后面指定的 watermarker会覆盖前面的值。 source的每个sub task独立生成水印。

watermark通过operator时会推进operators处的当前event time,同时operators会为下游生成一个新的watermark。

多输入operator(union、 keyBy、 partition)的当前event time是其输入流event time的最小值。

注意:多并行度的情况下,watermark对齐会取所有channel最小的watermark

<v:shape id="图片_x0020_2" style="width:415.2pt;height:158.4pt; visibility:visible;mso-wrap-style:square" type="#_x0000_t75" o:spid="_x0000_i1025"><v:imagedata o:title="" src="file:///C:\Users\chenning\AppData\Local\Temp\msohtmlclip1\01\clip_image003.png"></v:imagedata></v:shape>

watermark介绍参考链接:

https://blog.csdn.net/xorxos/article/details/80715113

3、watermark如何生成

通常,在接收到source的数据后,应该立刻生成watermark;但是,也可以在source后,应用简单的map或者filter操作,然后再生成watermark。

生成watermark的方式主要有2大类:

<pre style="margin-left:0cm;text-indent:-18.0pt;line-height:16.5pt;mso-list: l0 level1 lfo1;tab-stops:45.8pt 91.6pt list 126.0pt left 137.4pt 183.2pt 229.0pt 274.8pt 320.6pt 366.4pt 412.2pt 458.0pt 503.8pt 549.6pt 595.4pt 641.2pt 687.0pt 732.8pt; background:#F6F8FA">1. (1):With Periodic Watermarks</pre>

<pre style="margin-left:0cm;text-indent:-18.0pt;line-height:16.5pt;mso-list:l0 level1 lfo1; tab-stops:45.8pt 91.6pt list 126.0pt left 137.4pt 183.2pt 229.0pt 274.8pt 320.6pt 366.4pt 412.2pt 458.0pt 503.8pt 549.6pt 595.4pt 641.2pt 687.0pt 732.8pt; background:#F6F8FA">2. (2):With Punctuated Watermarks</pre>

第一种可以定义一个最大允许乱序的时间,这种情况应用较多。 我们主要来围绕Periodic Watermarks来说明,下面是生成periodic watermark的方法:

4、watermark处理顺序数据

需求:定义一个窗口为10s,通过数据的event time时间结合watermark实现延迟10s的数据也能够正确统计

我们通过数据的eventTime来向前推10s,得到数据的watermark,

代码实现:

import java.text.SimpleDateFormat import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks} import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collector import scala.collection.mutable.ArrayBuffer import scala.util.Sorting object FlinkWaterMark2 { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._ //设置flink的数据处理时间为eventTime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val tupleStream: DataStream[(String, Long)] = env.socketTextStream("node01", 9000).map(x => { val strings: Array[String] = x.split(" ")

(strings(0), strings(1).toLong)
}) //注册我们的水印val waterMarkStream: DataStream[(String, Long)] = tupleStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] { var currentTimemillis: Long = 0L var timeDiff: Long = 10000L val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); / //获取当前数据的waterMark
override def getNext: Watermark = {
}/ override def getCurrentWatermark: Watermark = { val watermark = new Watermark(currentTimemillis - timeDiff)
watermark
} //抽取数据的eventTime override def extractTimestamp(element: (String, Long), l: Long): Long = { val enventTime = element._2 currentTimemillis = Math.max(enventTime, currentTimemillis) val id = Thread.currentThread().getId println("currentThreadId:" + id + ",key:" + element._1 + ",eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimestamp:[" + currentTimemillis + "|" + sdf.format(currentTimemillis) + "],watermark:[" + this.getCurrentWatermark.getTimestamp + "|" + sdf.format(this.getCurrentWatermark.getTimestamp) + "]") enventTime
}
})
waterMarkStream.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new MyWindowFunction2).print()
env.execute()
}
} class MyWindowFunction2 extends WindowFunction[(String,Long),String,Tuple,TimeWindow]{ override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = { val keyStr = key.toString val arrBuf = ArrayBufferLong val ite = input.iterator while (ite.hasNext){ val tup2 = ite.next()
arrBuf.append(tup2._2)
} val arr = arrBuf.toArray
Sorting.quickSort(arr) //对数据进行排序,按照eventTime进行排序val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); val result = "****聚合数据的****key****为:****"+keyStr + "," + "****窗口当中数据的条数为:****"+arr.length + "," + "****窗口当中第一条数据为:****"+sdf.format(arr.head) + "," +"****窗口当中最后一条数据为:****"+ sdf.format(arr.last)+ "," + "****窗口起始时间为:****"+sdf.format(window.getStart) + "," + "****窗口结束时间为:****"+sdf.format(window.getEnd) + "****!!!!!看到这个结果,就证明窗口已经运行了****" out.collect(result)
}
}

输入测验数据

注意:如果需要触发flink的窗口调用,必须满足两个条件

1:waterMarkTime > eventTime

2:窗口内有数据

数据输入测验:

按照十秒钟统计一次,我们程序会将时间划分成为以下时间间隔段

2019-10-01 10:11:00 到 2019-10-01 10:11:10

2019-10-01 10:11:10 到 2019-10-01 10:11:20

2019-10-01 10:11:20 到 2019-10-01 10:11:30

2019-10-01 10:11:30 到 2019-10-01 10:11:40

2019-10-01 10:11:40 到 2019-10-01 10:11:50

2019-10-01 10:11:50 到 2019-10-01 10:12:00

顺序计算:

触发数据计算的条件依据为两个

第一个waterMark时间大于数据的eventTime时间,第二个窗口之内有数据

我们这里的waterMark直接使用eventTime的最大值减去10秒钟

0001 1569895882000 数据eventTime为:2019-10-01 10:11:22 数据waterMark为 2019-10-01 10:11:12

0001 1569895885000 数据eventTime为:2019-10-01 10:11:25 数据waterMark为 2019-10-01 10:11:15

0001 1569895888000 数据eventTime为:2019-10-01 10:11:28 数据waterMark为 2019-10-01 10:11:18

0001 1569895890000 数据eventTime为:2019-10-01 10:11:30 数据waterMark为 2019-10-01 10:11:20

0001 1569895891000 数据eventTime为:2019-10-01 10:11:31 数据waterMark为 2019-10-01 10:11:21

0001 1569895895000 数据eventTime为:2019-10-01 10:11:35 数据waterMark为 2019-10-01 10:11:25

0001 1569895898000 数据eventTime为:2019-10-01 10:11:38 数据waterMark为 2019-10-01 10:11:28

0001 1569895900000 数据eventTime为:2019-10-01 10:11:40 数据waterMark为 2019-10-01 10:11:30 触发第一条到第三条数据计算,数据包前不包后,不会计算2019-10-01 10:11:30 这条数据

0001 1569895911000 数据eventTime为:2019-10-01 10:11:51 数据waterMark为 2019-10-01 10:11:41 触发2019-10-01 10:11:20到2019-10-01 10:11:28时间段的额数据计算,数据包前不包后,不会触发2019-10-01 10:11:30这条数据的计算

5、watermark处理乱序数据

输入测验数据

接着继续输入以下乱序数据,验证flink乱序数据的问题是否能够解决

乱序数据

0001 1569895948000 数据eventTime为:2019-10-01 10:12:28 数据waterMark为 2019-10-01 10:12:18

0001 1569895945000 数据eventTime为:2019-10-01 10:12:25 数据waterMark为 2019-10-01 10:12:18

0001 1569895947001、watermark的作用
watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
2、watermark解决迟到的数据
out-of-order/late element
实时系统中,由于各种原因造成的延时,造成某些消息发到flink的时间延时于事件产生的时间。如果基于event time构建window,但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
Watermarks(水位线)就是来处理这种问题的机制
1.参考google的DataFlow。
2.是event time处理进度的标志。
3.表示比watermark更早(更老)的事件都已经到达(没有比水位线更低的数据 )。
4.基于watermark来进行窗口触发计算的判断。
有序的数据流watermark:
在某些情况下,基于Event Time的数据流是有续的(相对event time)。在有序流中,watermark就是一个简单的周期性标记。

无序的数据流watermark:
在更多场景下,基于Event Time的数据流是无续的(相对event time)。
在无序流中,watermark至关重要,她告诉operator比watermark更早(更老/时间戳更小)的事件已经到达, operator可以将内部事件时间提前到watermark的时间戳(可以触发window计算啦)

并行流当中的watermark:
通常情况下, watermark在source函数中生成,但是也可以在source后任何阶段,如果指定多次 watermark,后面指定的 watermarker会覆盖前面的值。 source的每个sub task独立生成水印。
watermark通过operator时会推进operators处的当前event time,同时operators会为下游生成一个新的watermark。
多输入operator(union、 keyBy、 partition)的当前event time是其输入流event time的最小值。
注意:多并行度的情况下,watermark对齐会取所有channel最小的watermark

watermark介绍参考链接:
https://blog.csdn.net/xorxos/article/details/80715113
3、watermark如何生成
通常,在接收到source的数据后,应该立刻生成watermark;但是,也可以在source后,应用简单的map或者filter操作,然后再生成watermark。
生成watermark的方式主要有2大类:

  1. (1):With Periodic Watermarks
  2. (2):With Punctuated Watermarks
    第一种可以定义一个最大允许乱序的时间,这种情况应用较多。
    我们主要来围绕Periodic Watermarks来说明,下面是生成periodic watermark的方法:
    4、watermark处理顺序数据
    需求:定义一个窗口为10s,通过数据的event time时间结合watermark实现延迟10s的数据也能够正确统计
    我们通过数据的eventTime来向前推10s,得到数据的watermark,
    代码实现:
    import java.text.SimpleDateFormat
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks}
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting

object FlinkWaterMark2 {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._

//设置flink的数据处理时间为eventTime
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val tupleStream: DataStream[(String, Long)] = env.socketTextStream("node01", 9000).map(x => {
  val strings: Array[String] = x.split(" ")

  (strings(0), strings(1).toLong)
})

//注册我们的水印
val waterMarkStream: DataStream[(String, Long)] = tupleStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
  var currentTimemillis: Long = 0L
  var timeDiff: Long = 10000L
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

  /* //获取当前数据的waterMark
   override def getNext: Watermark = {
   }*/
  override def getCurrentWatermark: Watermark = {
    val watermark = new Watermark(currentTimemillis - timeDiff)
    watermark
  }

  //抽取数据的eventTime
  override def extractTimestamp(element: (String, Long), l: Long): Long = {
    val enventTime = element._2
    currentTimemillis = Math.max(enventTime, currentTimemillis)
    val id = Thread.currentThread().getId
    println("currentThreadId:" + id + ",key:" + element._1 + ",eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimestamp:[" + currentTimemillis + "|" + sdf.format(currentTimemillis) + "],watermark:[" + this.getCurrentWatermark.getTimestamp + "|" + sdf.format(this.getCurrentWatermark.getTimestamp) + "]")
    enventTime
  }
})
waterMarkStream.keyBy(0)
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .apply(new MyWindowFunction2).print()
env.execute()

}
}

class MyWindowFunction2 extends WindowFunction[(String,Long),String,Tuple,TimeWindow]{
override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
val keyStr = key.toString
val arrBuf = ArrayBufferLong
val ite = input.iterator
while (ite.hasNext){
val tup2 = ite.next()
arrBuf.append(tup2._2)
}
val arr = arrBuf.toArray
Sorting.quickSort(arr) //对数据进行排序,按照eventTime进行排序
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
val result = "聚合数据的key为:"+keyStr + "," + "窗口当中数据的条数为:"+arr.length + "," + "窗口当中第一条数据为:"+sdf.format(arr.head) + "," +"窗口当中最后一条数据为:"+ sdf.format(arr.last)+ "," + "窗口起始时间为:"+sdf.format(window.getStart) + "," + "窗口结束时间为:"+sdf.format(window.getEnd) + "!!!!!看到这个结果,就证明窗口已经运行了"
out.collect(result)
}
}

输入测验数据
注意:如果需要触发flink的窗口调用,必须满足两个条件
1:waterMarkTime > eventTime
2:窗口内有数据

数据输入测验:
按照十秒钟统计一次,我们程序会将时间划分成为以下时间间隔段
2019-10-01 10:11:00 到 2019-10-01 10:11:10
2019-10-01 10:11:10 到 2019-10-01 10:11:20
2019-10-01 10:11:20 到 2019-10-01 10:11:30
2019-10-01 10:11:30 到 2019-10-01 10:11:40
2019-10-01 10:11:40 到 2019-10-01 10:11:50
2019-10-01 10:11:50 到 2019-10-01 10:12:00

顺序计算:
触发数据计算的条件依据为两个
第一个waterMark时间大于数据的eventTime时间,第二个窗口之内有数据
我们这里的waterMark直接使用eventTime的最大值减去10秒钟

0001 1569895882000 数据eventTime为:2019-10-01 10:11:22 数据waterMark为 2019-10-01 10:11:12
0001 1569895885000 数据eventTime为:2019-10-01 10:11:25 数据waterMark为 2019-10-01 10:11:15
0001 1569895888000 数据eventTime为:2019-10-01 10:11:28 数据waterMark为 2019-10-01 10:11:18

0001 1569895890000 数据eventTime为:2019-10-01 10:11:30 数据waterMark为 2019-10-01 10:11:20
0001 1569895891000 数据eventTime为:2019-10-01 10:11:31 数据waterMark为 2019-10-01 10:11:21
0001 1569895895000 数据eventTime为:2019-10-01 10:11:35 数据waterMark为 2019-10-01 10:11:25
0001 1569895898000 数据eventTime为:2019-10-01 10:11:38 数据waterMark为 2019-10-01 10:11:28

0001 1569895900000 数据eventTime为:2019-10-01 10:11:40 数据waterMark为 2019-10-01 10:11:30 触发第一条到第三条数据计算,数据包前不包后,不会计算2019-10-01 10:11:30 这条数据
0001 1569895911000 数据eventTime为:2019-10-01 10:11:51 数据waterMark为 2019-10-01 10:11:41 触发2019-10-01 10:11:20到2019-10-01 10:11:28时间段的额数据计算,数据包前不包后,不会触发2019-10-01 10:11:30这条数据的计算

5、watermark处理乱序数据
输入测验数据
接着继续输入以下乱序数据,验证flink乱序数据的问题是否能够解决

乱序数据
0001 1569895948000 数据eventTime为:2019-10-01 10:12:28 数据waterMark为 2019-10-01 10:12:18
0001 1569895945000 数据eventTime为:2019-10-01 10:12:25 数据waterMark为 2019-10-01 10:12:18
0001 1569895947000 数据eventTime为:2019-10-01 10:12:27 数据waterMark为 2019-10-01 10:12:18

0001 1569895950000 数据eventTime为:2019-10-01 10:12:30 数据waterMark为 2019-10-01 10:12:20

0001 1569895960000 数据eventTime为:2019-10-01 10:12:40 数据waterMark为 2019-10-01 10:12:30 触发计算 waterMark > eventTime 并且窗口内有数据,触发 2019-10-01 10:12:28到2019-10-01 10:12:27 这三条数据的计算,数据包前不包后,不会触发2019-10-01 10:12:30 这条数据的计算
0001 1569895949000 数据eventTime为:2019-10-01 10:12:29 数据waterMark为 2019-10-01 10:12:30 迟到太多的数据,flink直接丢弃,可以设置flink将这些迟到太多的数据保存起来,便于排查问题
0 数据eventTime为:2019-10-01 10:12:27 数据waterMark为 2019-10-01 10:12:18

0001 1569895950000 数据eventTime为:2019-10-01 10:12:30 数据waterMark为 2019-10-01 10:12:20

0001 1569895960000 数据eventTime为:2019-10-01 10:12:40 数据waterMark为 2019-10-01 10:12:30 触发计算 waterMark > eventTime 并且窗口内有数据,触发 2019-10-01 10:12:28到2019-10-01 10:12:27 这三条数据的计算,数据包前不包后,不会触发2019-10-01 10:12:30 这条数据的计算

0001 1569895949000 数据eventTime为:2019-10-01 10:12:29 数据waterMark为 2019-10-01 10:12:30 迟到太多的数据,flink直接丢弃,可以设置flink将这些迟到太多的数据保存起来,便于排查问题

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,076评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,658评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,732评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,493评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,591评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,598评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,601评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,348评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,797评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,114评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,278评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,953评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,585评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,202评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,180评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,139评论 2 352

推荐阅读更多精彩内容