Transformation转换算子之Key-Value类型

分类:

  1. partitionBy() 按照K重新分区
  2. 自定义分区
  3. reduceByKey()按照K聚合V
  4. groupByKey()按照K重新分组
  5. reduceByKey和groupByKey区别
  6. aggregateByKey()按照K处理分区内和分区间逻辑
  7. foldByKey()分区内和分区间相同的aggregateByKey()
  8. combineByKey()转换结构后分区内和分区间操作

SparkContext

SparkContext 定义成在全局范围,配置如下;

  val conf=new SparkConf().setMaster("local[2]").setAppName("test")
  val sc=new SparkContext(conf)

partitionBy()

将RDD[K,V]中的K按照指定Partitioner重新进行分区;
如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程。

定义一个集合,存放map元素(a-j);默认分区4个

    val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
    val rdd=sc.parallelize(list,4)

查看默认分区情况

    rdd.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

结果: 几乎比较均匀的分配到各个分区中

0=List((a,100), (b,100)) 
1=List((c,100), (d,100), (e,100))
2=List((f,100), (g,100))
3=List((h,100), (i,100), (j,100))

使用 partitionBy 按照key进行分区

partitionBy 源码,需要让指定一个分区器(Partitioner)

 def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }

查看spark有那些分区器

  • HashPartitioner:默认的分区器,通过对key进行hash运算,取余分区数的方式计算分区
  • RangePartitioner:
  • PythonPartitioner:spark内部使用的,外部无法使用
  • 自定义分区:开发者能使用的只有HashPartitionerRangePartitioner两种,若都无法满足我们的需求,就只能自定义分区器了。

使用HashPartitioner 作为 partitionBy的分区器

//  HashPartitioner 需要指定一个分区数
val rdd2: RDD[(String, Int)] = rdd.partitionBy(new HashPartitioner(2))
//  查看分区情况
rdd2.mapPartitionsWithIndex((index,it)=>{
  println(s"$index=${it.toList}")
  it
}).collect

结果

1=List((a,100), (c,100), (e,100), (g,100), (i,100))
0=List((b,100), (d,100), (f,100), (h,100), (j,100))

注意:

  1. 包位置需要指定为org.apache.spark.HashPartitioner
  2. 需要指定一个分区数new HashPartitioner(分区数)
    完整代码
  @Test
  def partitionByTest(): Unit ={

    val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
    val rdd=sc.parallelize(list,4)


    rdd.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

    val rdd2: RDD[(String, Int)] = rdd.partitionBy(new HashPartitioner(2))

    rdd2.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

  }

源码分析HashPartitioner是如何进行分区的

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
  // 将分区数传给numPartitions
  def numPartitions: Int = partitions
  // 这里才是重点;
  def getPartition(key: Any): Int = key match {
    // 通过模式匹配,判断key是否为null,若为null指定到0分区
    case null => 0
    // 获取 key的hashCode ; numPartitions 传入进来的分区数,也就是2
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }
  // 这个不太重要
  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

nonNegativeMod: 真正计算出分区的地方

// x:上面传入的 hashcode 
// mod :上面传入的分区数
 def nonNegativeMod(x: Int, mod: Int): Int = {
    // hashcode取余分区数  ,取余出来的可能会是一个负数
    // 例如:scala> "hadoop".hashCode %2
    // res10: Int = -1
    val rawMod = x % mod
    // 分区数肯定不能为负数,于是他做了这样的判断,if (rawMod < 0)
    // 比如 rawMod =-1 ;就使用 mod + rawMod = 1 ;否则 mod +0;
    // 这样就很好的解决了取余后为分区可能负数的情况了。
    rawMod + (if (rawMod < 0) mod else 0)
  }

自定义分区

上面说过,我们能使用spark 分区器的就有两种,HashPartitioner和RangePartitioner;很多时候根据业务的需求,需要自定义分区。如下数据: 需求要求 a,b,c华为一个分区,d,e,f换分为一个分区,剩下的分为一个分区。

 val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)

依样画葫芦
我们也许不知道怎么自定义一个分区,那么可以看看spark 自带的是怎么写的;如HashPartitioner

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

查看 HashPartitioner的父类(Partitioner)

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

总结:

  • 继承 Partitioner类,它是一个抽象类。
  • 实现父类的numPartitions函数
  • 实现父类的getPartition 函数

自定义分区器

/**
 * 自定义分区器
 * partitions 默认为3
 * @param partitions
 */
class CustomPartitioner(partitions: Int) extends Partitioner{
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case "a"|"b"|"c" =>1
    case "d"|"e"|"f" =>2
    case _=>0
  }
}

使用自定义分区器

 @Test
  def partitionByTest(): Unit ={

    val list=List("a"->100,"b"->100,"c"->100,"d"->100,"e"->100,"f"->100,"g"->100,"h"->100,"i"->100,"j"->100)
    val rdd=sc.parallelize(list,4)

    val rdd3: RDD[(String, Int)] = rdd.partitionBy(new CustomPartitioner(3))

    rdd3.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

  }

结果

0=List((g,100), (h,100), (i,100), (j,100))
1=List((a,100), (b,100), (c,100))
2=List((d,100), (e,100), (f,100))

注意:若出现这种序列化问题

org.apache.spark.SparkException: Task not serializable

解决方式:

  1. CustomPartitioner 重新定义class文件创建
  2. 不要再 classobject 中创建(如下)
class Test{
 class CustomPartitioner(partitions: Int) extends Partitioner{
...
 }
}

抽离出class,在外面定义

class Test{
...
}
class CustomPartitioner(partitions: Int) extends Partitioner{
...
}
  1. 若在一个class文件中创建,请使外部实现Serializable接口
class Test extends Serializable {
 class CustomPartitioner(partitions: Int) extends Partitioner{
...
 }
}
  1. 实现Serializable接口之后,出现部分属性无法序列化,可以使用 @transient 注解忽略。
class Test extends Serializable {
 @transient
 val name="a"
 class CustomPartitioner(partitions: Int) extends Partitioner{
 ...
 }
}

该问题的原因:

Driver最终会将Task交给Executor进行执行,其中就需要进行将对象进行序列化,由于CustomPartitioner类在另一个class内部中,序列化CustomPartitioner就需要将外部类先进性序列化。而外部类并没有进行序列化,所以就报了这样的错。


reduceByKey()

功能说明:该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。

  • 需求:考试成绩下来了,统计语文,数学,英语各成绩的总成绩
    val rdd1 = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
val result= rdd1.reduceByKey((x, y) => {
  x+y
}).collect

println(result.toList)

结果

List((数学,69), (英语,162), (语文,100))
  • 原理分析
    查看分区情况
   rdd1.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

结果

0=List((语文,10), (语文,20), (数学,15), (语文,30), (数学,33))
1=List((英语,12), (语文,40), (数学,21), (英语,50), (英语,100))

原理图:

reduceByKey执行流程图

上传上去不太清楚的酱紫,我也

说明:

  1. 根据分区大小(这里设置分区数为2),设置将数据写入分布到各个分区中,
0=List((语文,10), (语文,20), (数学,15), (语文,30), (数学,33))
1=List((英语,12), (语文,40), (数学,21), (英语,50), (英语,100))
  1. 程序运行时会将数据写入缓冲区中(MapReduce流程差不多),缓冲区处于内存中,无法无限存入数据,所以会溢写入磁盘中。
  2. 在不影响程序最终结果的情况下使用combiner可以更好的提高效率,在reduceByKey中无论如何都会进行一次combiner(用于提高效率)。
combiner
  1. 对数据按照key进行分组,并再次调用 reduce程序代码(如下),对单个组的数据进行聚合运算
 val result2=rdd1.reduceByKeyLocally((x, y) => {x + y})
  1. 计算结果完成后再将数据溢写入磁盘。
  2. rdd2 类似于reduce,他会对分区类的数据再进进行聚合统计
reduce-rdd2
  1. 最终得到想要的数据结果
List((数学,69), (英语,162), (语文,100))
  • 完整代码
  @Test
  def reduceByKeyTest(): Unit ={
    val rdd1 = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)

    val result= rdd1.reduceByKey((x, y) => {
      x+y
    }).collect

    println(result.toList)

   rdd1.mapPartitionsWithIndex((index,it)=>{
      println(s"$index=${it.toList}")
      it
    }).collect

    val result2=rdd1.reduceByKeyLocally((x, y) => {
      x + y
    })

    println(result2.toList)
  }
  • 总结:

reduceByKey(func: (RDD Value值类型,RDD value值) => RDD value值): 根据key分组之后,所有该key的value值进行聚合
reduceByKey里面的函数是针对每个组的所有value值操作
reduceByKey 会经过一次shuffle

groupByKey()

groupByKey对每个key进行操作,但只生成一个seq,并不进行聚合。
该操作可以指定分区器或者分区数(默认使用HashPartitioner)

groupByKey 有三个重载方法

  • groupByKey(partitioner: Partitioner)
    指定一个分区器
  • groupByKey():
    底层实现就是调用了groupByKey(partitioner: Partitioner)
    默认的分区器为HashPartitioner
    分区器的分区数默认为最开始配置的大小(2)
  • groupByKey(numPartitions: Int)
    底层实现也是调用groupByKey(partitioner: Partitioner)
    并直接定分区器为HashPartitioner
    创建HashPartitioner分区器同时,也为其指定了分区数大小numPartitions

案例:使用默认的无参的groupByKey()

 @Test
  def groupByKeyTest(): Unit ={
    val rdd1 = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
    val value: RDD[(String, Iterable[Int])] = rdd1.groupByKey()
    println(value.collect.toList)
  }

结果:

List((数学,CompactBuffer(15, 33, 21)), (英语,CompactBuffer(12, 50, 100)), (语文,CompactBuffer(10, 20, 30, 40)))

reduceByKey和groupByKey区别

  1. reduceByKey存在combiner行为,性能更高
  2. groupByKey不存在conbiner行为,性能比较低

工作中推荐使用reduceByKey这种高性能shuffle算子

aggregateByKey()

foldByKey()

在scala中也有fold()函数,与reduce()唯一的区别就是,reduce会把第一个列表中第一个元作为参数的默认值,而fold(),可以指定一个默认值,其他操作和fold与reduce没有什么不同。在spark中foldByKey()和reduceBykey()亦是如此。

foldByKey 参数说明

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    foldByKey(zeroValue, defaultPartitioner(self))(func)
}

这种((zeroValue: V)(func: (V, V) => V))语法称为:函数柯里化
(zeroValue: V):需要指定一个默认值;
(func: (V, V) => V):具体的操作逻辑

案例:
统计各科总成绩,校长心情比较好,决定在总成绩的分数上再加一百分

  @Test
  def foldByKeyTest(): Unit ={
    val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)

   // 使用 foldByKey 数据汇总
    val value: RDD[(String, Int)] = rdd.foldByKey(100)((v1, v2) => {
      v1 + v2
    })

    println(value.collect.toList)

  }

结果:

List((数学,269), (英语,262), (语文,300))

aggregateByKey()

combineByKey()

combineByKey 参数说明:

def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
  }
  • createCombiner(转换数据的结构)
    combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。
    简单说明:在combiner阶段对每个组的第一个vlaue值进行转换
  • mergeValue(分区内)
    如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。
    简单说明:combiner的聚合逻辑
  • mergeCombiners(分区间)
    由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。
    简单说明:reduce的聚合逻辑

案例:
求每门学科的平均成绩
输入如下:

val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)

完整代码如下:

@Test
  def combineByKeyTest(): Unit ={
    val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)
    // 求每门学科的平均成绩
    val value: RDD[(String, (Int, Int))] = rdd.combineByKey(v => (v, 1), (c: (Int, Int), v: Int) => (c._1+v, c._2+1), (c1: (Int, Int), c2: (Int, Int)) => (c1._1+c2._1, c1._2+c2._2))

    //获得各科成绩的总分数和个数
    val result=value.collect.toList
    println(result)

    result.foreach(m=>{
      m match {
        case (curr,(totalScore,size)) => println(s"课程:$curr,总分:$totalScore,个数:$size,平均数:${totalScore.toDouble/size}")
      }
    })

结果:

List((数学,(69,3)), (英语,(162,3)), (语文,(100,4)))
课程:数学,总分:69,个数:3,平均数:23.0
课程:英语,总分:162,个数:3,平均数:54.0
课程:语文,总分:100,个数:4,平均数:25.0

代码分析(我觉得挺复杂的,有点太抽象了);
首先我们应该理解的时候,combineByKey各个阶段的操作都是针对于 value 而言,毕竟时BykeycombineByKey 中的C,V其实表示的含义就是传入的value,返回的也是一个value

val value: RDD[(String, (Int, Int))] = rdd.combineByKey(
 v => (v, 1),
 (c: (Int, Int), v: Int) => (c._1+v, c._2+1), (c1: (Int, Int), 
 c2: (Int, Int)) => (c1._1+c2._1, c1._2+c2._2)
)

第一个参数:createCombiner
createCombiner: V => C :他希望我们传入进来C的是一个value,就是一个个分数(10,20,15等),返回的V将作为下一个函数参数的C(mergeValue)
v => (v, 1)
此时结合我们的业务,统计平均数,我们首先得知道语文有多少个,数学有多少个,英语多少个。如何通过combineByKey来实现呢?结合createCombiner的特性在combiner阶段对每个组的第一个vlaue值进行转换,我们就可以将计算器(用1标识)存放到value
结果应该是这样的。

"语文"->(10,1),"语文"->(20,1),"数学"->(15,1),"语文"->(30,1),"数学"->(33,1),
"英语"->(12,1),"语文"->(40,1),"数学"->(21,1),"英语"->(50,1),"英语"->(100,1)

由于是在combiner阶段,在combiner阶段对每个组的第一个vlaue值进行转换
语文:

"语文"->(10,1)

数学:

"数学"->(15,1)

英语:

"英语"->(12,1)

然后就是第二个参数mergeValue,他的解释是combiner的聚合逻辑;待会解释是什么意思
先看看mergeValue需要指定哪些参数 : (C, V) => C,
C:就是上一个函数参数(createCombiner)返回的结果(如:(10,1),(20,1),(30,1))
V:表示带聚合的元素
返回的C将会作为下一个函数参数的C(mergeCombiners的参数C)。应该能明白了吧。

了解了mergeValue各个参数的意思及返回参数的意思之后,再次回到业务中。
(c: (Int, Int), v: Int) => (c._1+v, c._2+1), (c1: (Int, Int)
拿语文进行举例

"语文"->(10,1)

第一次:c._1 表示成绩也是 元素(10,1),v: 表示 下一个带聚合的元素(20,1+1), 结果就是(30,2)
第二次:c._1 表示上一个结果(30,2),v: 表示 下一个带聚合的元素(30,2+1),结果就是(60,3)
第二次:c._1 表示上一个结果(60,3),v: 表示 下一个带聚合的元素(40,3+1),结果就是(100,4)

最终结果

语文 -> (100,4)
数学 -> (69,3)
英语 -> (162,3)

mergeValue之后,最终溢写到磁盘

mergeCombiners 就比较简单了,就是一个reduce操作。
注意:我上面的方式是建立在一个分区情况下,多个分区也是一样的流程。
mergeCombiners 中就是将多个 分区进行最后的聚合处理。

原理图:

combineByKey原理图

我也是在学习阶段,理解可能没那么透彻,文章若有什么不对,希望可以指出来。

除了使用combineByKey可以使用reduceByKey的方式实现类似的功能,对比combineByKey还更简单一点。

  @Test
  def combineByKeyTest(): Unit ={
    val rdd = sc.parallelize(List("语文"->10,"语文"->20,"数学"->15,"语文"->30,"数学"->33,"英语"->12,"语文"->40,"数学"->21,"英语"->50,"英语"->100),2)

    // 通过 map方式实现一样的功能
    val rdd2=rdd.map(x=>{
      (x._1,(x._2,1))
    }).reduceByKey((v1,v2)=>{
      (v1._1+v2._1,v1._2+v2._2)
    })\
    println(rdd2.collect.toList)
 }

结果:

List((数学,(69,3)), (英语,(162,3)), (语文,(100,4)))

其实reduceByKey底层就是使用的是combineByKey
combineByKey 底层实现

def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    // 注意这个函数
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }

reduceByKey 底层实现

 def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }

在往reduceByKey(defaultPartitioner(self), func)中点击

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    // 注意这个函数
   // (v: V) => v ;自己转自己,啥都没干。
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

看到了吗? 都是使用的是combineByKeyWithClassTag来实现。


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,492评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,048评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,927评论 0 358
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,293评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,309评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,024评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,638评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,546评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,073评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,188评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,321评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,998评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,678评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,186评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,303评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,663评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,330评论 2 358

推荐阅读更多精彩内容