Flink Time详解

Flink Time 详解

概述:
    对于流式数据处理,最大的特点是数据上具有时间的属性特征,Flink根据时间产生的位置不同,将时间区分为三种时间语义
    分别为事件生成时间(Event Time)、事件接入时间(Ingestion Time)和事件处理时间(Processing Time)
1. Event Time: 事件产生的时间,它通常有事件中的时间戳描述
2. Ingestion Time: 事件进入Flink的时间
3. Processing Time : 事件被处理时当前系统的时间.

1. 时间语义Time

概述:
    数据从终端产生,或者从系统中产生的过程中生成的时间为事件生成时间,当时间经过消息中间件传入到Flink系统中,在DataSource中接入
    的时候会生成事件接入时间,当数据在Flink系统中通过各个算子实例执行转换操作的过程中,算子实例所在系统的时间为数据处理时间,
1. 设置时间语义:
概述:
     在Flink中默认情况下使用的是Process Time时间语义,如果用户选择使用Event Time或者Ingestion Time 语义, 则需要在创建的
     StreamExecutionEnvironment 中调用setStreamTimeCharacteristic()方法设定系统的时间概念,
     如下代码使用TimeCharacteristic.EventTime作为系统的时间语义:
code:
    //设置使用EventTime
    env.setStreamTImeCharacteristic(TimeCharacteristic.EventTime)
    // 使用IngestionTime
    env.setStreamTImeCharacteristic(TimeCharacteristic.IngestionTime)

2. WaterMark水位线

概述:
    在使用EventTIme处理Stream数据的时候会遇到数据乱序的问题,流处理从Event(事件)产生,流经Source,再到Operator,这中间需要一定
    的时间,虽然大部分情况下,传输到Operator的数据都是按照事件产生的时间顺序来的,但是也不能排除由于网络延迟等原因而导致乱序的
    产生,特别是使用Kafka的时候,多个分区之间的数据无法保证有序,因此,在进行Window计算的时候,不能无限期地等下去,必须要有个机制
    来保证在特定的时间后,必须要触发Window进行计算,WaterMark是用于处理乱序事件的.
1. Watermark原理
概述:
    在Filnk的窗口处理过程中,如果确定全部数据到达,就可以对Window的所有数据做窗口计算操作(汇总,分组),如果数据没有全部到达,则
    继续等待该窗口中的数据全部到达才开始处理,这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处理进度(表达数据达到
    的完整性),保证事件数据(全部)达到Flink系统,或者在乱序以及延迟到达时,也能够想预期一样计算出正确并且连续的结果,
    当任何Event进入到FLink系统时,会根据当前最大事件时间产生Watermarks时间戳(t)
Flink是怎么计算Watermark的值呢?
概述:
    Watermark = 进入Flink的最大的事件时间(MaxEventTime)-指定的延迟时间(t)
那么有Watermark的Window是怎么处罚窗口函数的呢?
概述:
    如果有窗口的停止时间等于或者小于maxEventTIme -t(当时的Watermark),那么这个窗口被触发执行.
注意:
    Watermark本质可以理解成一个延迟触发机制.

Watermark的使用存在三种情况:

1. 本来有序的Stream中的Watermark
概述:
    如果数据元素的事件时间是有序的,Watermark时间戳会随着数据元素的事件时间按顺序生成,此时水位线的变化和
事件时间保持一致(因为既然是有序的时间,就需要设置延迟了,那么t就是0,所以watermark= maxTime-0 = maxTime),也
就是理想状态下的水位线,
当Watermark大于Windows结束时间就会触发对Windows的数据计算,以此类推
下一个Window也是一样
2. 乱序事件中的Watermark
概述:
    现实情况下数据元素往往并不是按照其产生顺序接入到Flink系统中进行处理,而频繁出现乱序或迟到的情况,这种
情况就需要使用Watermark来应对.
3. 并行数据流中的Watermark
概述:
    在多并行度的情况下,Watermark会有一个对齐机制,这个对齐机制会取所有Channel中最小的Watermark.

引入Wateramark和EventTime

1. 有序数据流中引入Watermark和EventTime
    对于有序的数据,代码比较简洁,主要需要从源Event中抽取EventTime.
    code:
    // 读取Socket数据
    //获取flink实时流处理的环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val data = env.socketTextStream("localhost", 9999)
          .map { line =>
            var arr = line.split(",")
            StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          }
        // 根据EventTime有序的数据流
        data.assignAscendingTimestamps(_.callTime)
        // StationLog对象中抽取EventTime就是callTime属性
2. 乱序数据流中引入Watermark和EventTime
    对于乱序数据流,有两种常见的引入方法:周期性和间断性.
    2.1 With Periodic(周期性的)Watermark
        周期性地生成Watermark的生成,默认是100ms,每隔N毫秒自动向流里注入一个Watermark,时间间隔由streamEnv.getConfig.
        setAutoWatermarkInterval()决定.最简单写入
        // 如果EventTime是乱序的,需要考虑一个延迟时间t
        // 当前代码设置的延迟时间为3s
        data.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(3)) {
          override def extractTimestamp(t: StationLog): Long = {
            t.callTime// 设置EventTIme
          }
        })
    2.2 With Punctuated(间断性的)Watermark
        间断性的生成Watermark一般都是基于某些事件触发Watermark的生成和发送,比如:在我们的基站数据中,有一个基站的CallTime
        总是没有按照顺序传入,其他基站的时间都是正常的,那我们需要对这个基站来专门生成Watermark
        // 1. 只有satation_1的Event是无序的,所以只需要针对Station_1做处理
        // 当前代码设置station_1基站的延迟处理时间为3s
        data.assignTimestampsAndWatermarks(new MyCustomerPunctuatedWatermarks(3000L)) //自定义延迟
        class MyCustomerPunctuatedWatermarks(delary: Long) extends
        AssignerWithPunctuatedWatermarks[StationLog] {
        var maxTime: long = 0
    
        override def checkAndGetNextWatermark(t: StationLog, l: Long): Watermark = {
          if (t.sid.equals("station_1")) {
            ///当基站ID为station_1才生成水位线
            maxTime = math.max(maxTime, l)
            new Watermark(maxTime)
          } else {
            return null //
          }
        }
    
        override def extractTimestamp(t: StationLog, l: Long): Long = {
          // 抽取EventTime的值
          t.callTime
        }
        }

Watermark 案例

概述:
    每隔5s中统计一下最近10s内每个基站中通话时间最长的一次通话发生的
    呼叫时间、主叫号码、被叫号码,通话时长. 并且还得告诉我是那个时间范围(10s)
code:
    package FlinkDemo.time
    import java.text.SimpleDateFormat
    import FlinkDemo.functions.FunctionClassTransformation.StationLog
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
       /**
        * 每隔5s中统计一下最近10s内每个基站中通话时间最长的一次通话发生的
        * 呼叫时间、主叫号码、被叫号码,通话时长
        * 并且还得告诉我是那个时间范围(10s)
        */
    object Watermark_demo {
      def main(args: Array[String]): Unit = {
        //获取flink实时流处理的环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 设置数据流时间特征,默认为TimeCharacteristic.ProcessingTime,默认水位线更新200ms
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // 设置并行度1
        env.setParallelism(1)
        // 导入隐式转换
        import org.apache.flink.streaming.api.scala._
        val data = env.socketTextStream("localhost", 9999)
        // 分配事件时间提取器, 
          .assignTimestampsAndWatermarks(new TimestampExtractor1())
          .map { line =>
            var arr = line.split(",")
            StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          }
          // 引入Watermark
          .keyBy(_.sid)
          // 设置滑动窗口大小为10s,滚动时间为5s
          .timeWindow(Time.seconds(10), Time.seconds(5))
          // reduce 函数做增量聚合,MaxTimeAggregate能做到来一条数据处理一条,
          // ReturnMaxTime 在窗口触发的时候调用.
          // reduce( preAggregator: (T, T) => T,
          //      function: ProcessWindowFunction[T, R, K, W])
          .reduce(new MaxTimeReduce, new ReturnMaxTime)
          .print()
        env.execute()
      }
    
      class MaxTimeReduce extends ReduceFunction[StationLog] {
        // 每个基站中传入的数据
        override def reduce(value1: StationLog, value2: StationLog): StationLog = {
          if (value1.duration > value2.duration) value1 else value2
        }
      }
    
      class ReturnMaxTime extends WindowFunction[StationLog, String, String, TimeWindow] {
        // 获取时间范围
        override def apply(key: String, window: TimeWindow, input: Iterable[StationLog], out: Collector[String]): Unit = {
          val sb = new StringBuilder
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          sb.append(s"时间范围: Start: ${format.format(window.getStart)},${format.format(window.getEnd)}")
          sb.append("\n")
          sb.append(s"接入时间: ${format.format(input.iterator.next.callTime)}")
          sb.append("\n")
          sb.append("通话日志: ").append(input.iterator.next())
          out.collect(sb.toString())
        }
      }
    }
提取器: 
    package FlinkDemo.time
    
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.watermark.Watermark
    
    import java.text.SimpleDateFormat
    
    class TimestampExtractor1 extends AssignerWithPeriodicWatermarks[String] with
      Serializable {
      var currentTimestamp: Long = 0L
      //设置最大允许的乱序事件是5s
      val maxDelayTime = 5000L
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
      var watermark: Watermark = null
    
      override def getCurrentWatermark: Watermark = {
        watermark =
          new Watermark(currentTimestamp - maxDelayTime)
        //println("new watermark: " + format.format(watermark.getTimestamp) + " ")
        watermark
      }
    
      override def extractTimestamp(t: String, l: Long): Long = {
        val timeStamp =  t.split(",")(4).toLong
        currentTimestamp = Math.max(timeStamp, currentTimestamp)
        println("timestamp: " + format.format(timeStamp) + "|" + format.format(currentTimestamp) + "|" + format.format(getCurrentWatermark.getTimestamp))
        timeStamp
      }
    }

Watermark demo2

code:
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    /*
     *想使用WaterMark,需要3个步骤
     *  1. 对数据进行timestamp提取,即调用assignTimestampsAndWaterMarks函数
     *     实例化BoundedOutOfOrdernessTimestampExtractor,重写extractTimestamp方法
     * 2. 是指使用事件时间,因为WaterMark是基于事件时间
     * 3. 定义时间窗口: 翻滚窗口(TumblingEventWindows)、滑动窗口(timeWindow)
     *  任意要给没有实现都会报异常
     */
    object EtDemo {
      /**/
    
    
      def main(args: Array[String]): Unit = {
        //获取flink实时流处理的环境
        val senv = StreamExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.streaming.api.scala._
        // 设置数据流时间特征,默认为TimeCharacteristic.ProcessingTime,默认水位线更新200ms
        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // 导入隐式转换
    
        //便于测试,并行度设置为1
        senv.setParallelism(1)
    
        // 2. 创建数据源
    
        // 3. 绑定数据源
        val text = senv.socketTextStream("localhost", 9999)
          //senv.fromCollection(data)
          // 添加事件时间提取器
          .assignTimestampsAndWatermarks(new TimestampExtractor())
        // 对自定的数据集进行窗口计数
        val counts = text
          .map { (m: String) => new Character(m.split(",")(1), 1) }
          .keyBy(_.character)
          //划分规则时按1分钟,内秒数划分的1-10,5-15,..30-40,35-45,40-50,45-55,50-60,55-60
          .timeWindow(Time.seconds(10), Time.seconds(5))
          .reduce(new ReduceDemo, new MyWFunction)
          .print()
        senv.execute("EventTime processing examkple")
      }
    
      case class Character(character: String, num: Int)
    
      class ReduceDemo extends ReduceFunction[Character] {
        // 计算传入的数据累加
        override def reduce(value1: Character, value2: Character): Character = {
          new Character(value1.character, value2.num + value1.num)
        }
      }
    
      class MyWFunction extends WindowFunction[Character, String, String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[Character], out: Collector[String]): Unit = {
          val sb = new StringBuilder
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          sb.append(s"时间范围: Start: ${format.format(window.getStart)},${format.format(window.getEnd)}  ")
          sb.append(input.iterator.next().toString())
          out.collect(sb.toString())
        }
      }
    }
input:
    1522827251000,a
    1522827252000,a
    1522827251000,b
    1522827252000,b
    1522827256000,a
    1522827254000,a
    1522827261000,a
    1522827270000,a
result:
    timestamp: 2018-04-04 15:34:11.000|2018-04-04 15:34:11.000|2018-04-04 15:34:06.000
    timestamp: 2018-04-04 15:34:12.000|2018-04-04 15:34:12.000|2018-04-04 15:34:07.000
    timestamp: 2018-04-04 15:34:11.000|2018-04-04 15:34:12.000|2018-04-04 15:34:07.000
    timestamp: 2018-04-04 15:34:12.000|2018-04-04 15:34:12.000|2018-04-04 15:34:07.000
    timestamp: 2018-04-04 15:34:16.000|2018-04-04 15:34:16.000|2018-04-04 15:34:11.000
    timestamp: 2018-04-04 15:34:14.000|2018-04-04 15:34:16.000|2018-04-04 15:34:11.000
    timestamp: 2018-04-04 15:34:21.000|2018-04-04 15:34:21.000|2018-04-04 15:34:16.000
    时间范围: Start: 2018-04-04 15:34:05,2018-04-04 15:34:15  Character(a,3)
    时间范围: Start: 2018-04-04 15:34:05,2018-04-04 15:34:15  Character(b,2)
    timestamp: 2018-04-04 15:34:30.000|2018-04-04 15:34:30.000|2018-04-04 15:34:25.000
    时间范围: Start: 2018-04-04 15:34:10,2018-04-04 15:34:20  Character(a,4)
    时间范围: Start: 2018-04-04 15:34:10,2018-04-04 15:34:20  Character(b,2)
    时间范围: Start: 2018-04-04 15:34:15,2018-04-04 15:34:25  Character(a,2)        

Window的 allowedLateness

概述:
    基于Event-Time的窗口处理流式数据,虽然提供了Watermark机制,却只能在一定程度上解决数据乱序的问题,
    但在某些情况下数据可能延时会非常严重,即使通过Watermark机制也无法等到数据全部进入窗口再进行处理.
    Flink中默认会将这些迟到的数据做丢弃处理,但是有些时候用户希望即使数据延迟达到的情况下,也能够正常
    按照流程处理并输出结果,此时就需要使用Allowed Lateness机制来对迟到的数据进行额外的处理
情况:
    通常情况下用户虽然希望对迟到的数据进行窗口计算,但并不想将结果混入正常的计算流程中,
    例如: 用户大屏数据展示系统,即使正常的窗口中没有将迟到的数据进行统计,但为了保证页面数据显示的连续型,
    后来接入到系统中迟到数据所统计出来的结果不希望显示在屏幕上,而是将延时数据和结果存储到数据库中,
    便于后期对延时数据进行分析.
解决:
    对于这种情况需要借助SideOutput来处理,通过使用sideOutputLateDate(OutputTag)来标记迟到数据计算的结果,
    然后使用getSideOutput(lateOutputTag)从窗口结果中获取lateOutputTag标签对应的数据,之后转成独立的DataStream
    数据集进行处理,创建late-date的OutputTag,再通过该标签从窗口结果中将迟到的数据筛选出来.
注意:
    如果有Watermark同时也有Allowed Lateness,name窗口函数再次触发的条件是:
    watermark < end-of-window +allowdLateness
案例:
    import java.text.SimpleDateFormat
    
    import FlinkDemo.functions.FunctionClassTransformation.StationLog
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    object LateDataOnWindow {
      /**
        * 案例: 每隔5s统计最近10s,每个基站的呼叫数量
        * 1. 每隔基站的数据会存在乱序
        * 2. 大多数数据延迟2s到,但是有些数据迟到时间比较长
        * 3. 迟到时间超过2s的数据不能丢弃,放入测流
        */
    
      def main(args: Array[String]): Unit = {
        //获取flink实时流处理的环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 设置数据流时间特征,默认为TimeCharacteristic.ProcessingTime,默认水位线更新200ms
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)
        // 导入隐式转换
        import org.apache.flink.streaming.api.scala._
        val data = env.socketTextStream("localhost", 9999)
          // .assignTimestampsAndWatermarks(new TimestampExtractor1())
          .map { line =>
          var arr = line.split(",")
          StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
        }
          // 引入Watermark
          .assignTimestampsAndWatermarks(
          new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(2)) {
            //延迟2s
            override def extractTimestamp(t: StationLog): Long = {
              t.callTime
            }
          }
        )
        //分组,开窗处理
        // 定义一个侧输出流的 标签
        val lateTage = new OutputTag[StationLog]("late")
        val mainStream = data.keyBy(_.sid)
          .timeWindow(Time.seconds(10), Time.seconds(5))
          // 注意: 只要符合 watermark < end-of-window + allowedLateness 之内达到
          // 的数据,都会被再次出发窗口的计算
          // 迟到之外的迟到数据会被放入侧输出流
          .allowedLateness(Time.seconds(5)) // 允许数据迟到5s
          // 迟到的数据, 输出另一个位置保存
          .sideOutputLateData(lateTage)
          .aggregate(new AggregateCount, new OutputResult)
        // 迟到很久的数据可以另外再处理
        mainStream.getSideOutput(lateTage).print("late")// 迟到很久的数据可以另外再处理
        mainStream.print("main")
        env.execute()
    
      }
    
      /**
        * 累加聚合操作
        */
      class AggregateCount extends AggregateFunction[StationLog, Long, Long] {
        // 创建累加器
        override def createAccumulator(): Long = 0
        // 累加器加值
        override def add(value: StationLog, accumulator: Long): Long = accumulator + 1
        // 获取累加器结果
        override def getResult(accumulator: Long): Long = accumulator
        //合并Value
        override def merge(a: Long, b: Long): Long = a + b
      }
    
      class OutputResult extends WindowFunction[Long, String, String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[String]): Unit = {
          var sb = new StringBuilder
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          sb.append(s"时间范围: Start: ${format.format(window.getStart)},----${format.format(window.getEnd)}")
          sb.append("\n")
          sb.append("\n")
          sb.append("当前基站是:").append(key)
            .append(" 呼叫数量是: ").append(input.iterator.next())
          out.collect(sb.toString())
        }
      }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,496评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,407评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,632评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,180评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,198评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,165评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,052评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,910评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,324评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,542评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,711评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,424评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,017评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,668评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,823评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,722评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,611评论 2 353

推荐阅读更多精彩内容