Hudi on flink v0.7.0 使用遇到的问题及解决办法

测试用例参考原文:https://blog.csdn.net/Mathieu66/article/details/110389575

Hudi社区已经正式发布了hudi-0.7.0版本。hudi on flink 最早在0.6.1本版出现,当时还是在master分支,最终切换到了0.7.0分支并经过0.7.0-rc1和0.7.0-rc2两个版本的迭代后毕业。无疑hudi on flink 的支持,使得flink cdc+hudi实时数仓方案得到落实,为此对hudi on flink的开始使用调研及遇到的问题做以下分享。

一、环境介绍

这里以下环境版本示例:
1、hadoop-2.7.2
2、kafka_2.11-2.1.0
3、flink-1.11.2
4、hudi-0.7.0

二、官方源码编译

官方地址:https://hudi.apache.org/
gihub地址:https://github.com/apache/hudi

2.1、 源码下载
git clone https://github.com/apache/hudi.git && cd hudi
2.2、 源码编译

修改hudi/pom.xml
1> 切换release-0.7.0分支,修改pom.xml中hadoop对应集群版本
2> window环境下注释掉<module>hudi-integ-test</module>和<module>packaging/hudi-integ-test-bundle</module>
3> flink-1.11.2对应parquet版本为1.11.1,可修改pom.xml到对应版本(若flink/lib下存在其他本版以该版本保持一致),本次测试不修改保留1.10.1版本

mvn clean package -DskipTests

三、创建测试用例

3.1、创建hudi-conf.properties配置文件
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=ts
bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
hoodie.embed.timeline.server=false
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs:///tmpdir/hudi/test/config/flink/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs:///tmpdir/hudi/test/config/flink/schema.avsc
3.2、创建schema.avsc文件
{
  "type":"record",
  "name":"stock_ticks",
  "fields":[{
     "name": "uuid",
     "type": "string"
  }, {
     "name": "ts",
     "type": "long"
  }, {
     "name": "symbol",
     "type": "string"
  },{
     "name": "year",
     "type": "int"
  },{
     "name": "month",
     "type": "int"
  },{
     "name": "high",
     "type": "double"
  },{
     "name": "low",
     "type": "double"
  },{
     "name": "key",
     "type": "string"
  },{
     "name": "close",
     "type": "double"
  }, {
     "name": "open",
     "type": "double"
  }, {
     "name": "day",
     "type":"string"
  }
]}
3.3、上传到hdfs文件系统
sudo -u hdfs hadoop fs -mkdir -p /tmpdir/hudi/test/config/flink
hadoop fs -put schema.avsc /tmpdir/hudi/test/config/flink/
hadoop fs -put hudi-conf.properties /tmpdir/hudi/test/config/flink/
3.4、创建kafka测试topic
#创建主题
/opt/apps/kafka/bin/kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka --replication-factor 2 --partitions 3 --topic mytest
#查看主题
/opt/apps/kafka/bin/kafka-topics.sh --list --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka
#生产者(测试)
/opt/apps/kafka/bin/kafka-console-producer.sh --broker-list hadoop01:9092,hadoop02:9092,hadoop03:9092 --topic mytest

测试数据

{"close":0.27172637588467297,"day":"2","high":0.4493211149337879,"key":"840ef1","low":0.030714155934507215,"month":11,"open":0.7762668153935262,"symbol":"77c-40d6-8412-6859d4757727","ts":1608361070161,"uuid":"840ef1ff-b77c-40d6-8412-6859d4757727","year":120}

6、提交命令及参数介绍
/opt/flink-1.11.2/bin/flink run -c org.apache.hudi.HoodieFlinkStreamer -m yarn-cluster -d -yjm 2048 -ytm 3096 -ys 4 -ynm hudi_on_flink_test \
-p 1 -yD env.java.opts=" -XX:+TraceClassPaths  -XX:+TraceClassLoading" hudi-flink-bundle_2.11-0.7.0.jar --kafka-topic mytest --kafka-group-id hudi_on_flink \
--kafka-bootstrap-servers hadoop01:9092,hadoop02:9092,hadoop03:9092 --table-type COPY_ON_WRITE --target-base-path hdfs:///tmpdir/hudi/test/data/hudi_on_flink \
--target-table hudi_on_flink --props hdfs:///tmpdir/hudi/test/config/flink/hudi-conf.properties --checkpoint-interval 3000 \
--flink-checkpoint-path hdfs:///flink/hudi/hudi_on_flink_cp

HoodieFlinkStreamer参数介绍

参数 描述
--kafka-topic 必选,kafka source主题
--kafka-group-id 必选,kafka消费者组
--kafka-bootstrap-servers 必选,kafka bootstrap.servers 如 node1:9092,node2:9092,node3:9092
--flink-checkpoint-path 可选,flink checkpoint 路径
--flink-block-retry-times 可选,默认10,当最近的hoodie instant未完成时重试的次数
--flink-block-retry-interval 可选,默认1,当最近的hoodie instant未完成时,两次尝试之间的秒数
--target-base-path 必选,目标hoodie表的基本路径(如果路径不存在将被创建)
--target-table 必选,Hive中目标表的名称
--table-type 必选,表的类型。COPY_ON_WRITE 或 MERGE_ON_READ
--props 可选,属性配置文件的路径(local或hdfs)。有hoodie客户端、schema提供者、键生成器和数据源的配置。
--hoodie-conf 可选,可以在属性文件中设置的任何配置(使用参数"--props")也可以使用此参数传递命令行。
--source-ordering-field 可选,以决定如何打破输入数据中具有相同键的记录之间的联系字段。默认值:'ts'保存记录的unix时间戳
--payload-class 可选,HoodieRecordPayload的子类,它在GenericRecord上工作。实现你自己的,如果你想做一些事情而不是覆盖现有的值
--op 可选,接受以下值之一:UPSERT(默认),INSERT(当输入纯粹是新数据/插入时使用,以提高速度)
--filter-dupes 可选,默认false。是否应该在插入/大容量插入之前删除/过滤源中的重复记录
--commit-on-errors 可选,默认false。即使某些记录写入失败也要提交
--checkpoint-interval 可选,默认5000毫秒。Flink checkpoint间隔

四、发现问题

新搭建的测试环境未做过jar包的兼容性整合。job能正常启动后,当消费消息write_process时会发生如下异常:

java.io.IOException: Could not perform checkpoint 6 for operator write_process (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:892)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:137)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 6 for operator write_process (1/1). Failure reason: Checkpoint was declined.
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:215)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    ... 13 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to upsert for commit time 20210125212048
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:62)
    at org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor.execute(FlinkUpsertCommitActionExecutor.java:47)
    at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.upsert(HoodieFlinkCopyOnWriteTable.java:66)
    at org.apache.hudi.table.HoodieFlinkCopyOnWriteTable.upsert(HoodieFlinkCopyOnWriteTable.java:58)
    at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:110)
    at org.apache.hudi.operator.KeyedWriteProcessFunction.snapshotState(KeyedWriteProcessFunction.java:121)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.hudi.operator.KeyedWriteProcessOperator.snapshotState(KeyedWriteProcessOperator.java:58)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
    ... 23 more
Caused by: java.lang.RuntimeException: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: org/apache/hudi/avro/HoodieAvroWriteSupport
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:121)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.lambda$execute$0(BaseFlinkCommitActionExecutor.java:120)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:118)
    at org.apache.hudi.table.action.commit.BaseFlinkCommitActionExecutor.execute(BaseFlinkCommitActionExecutor.java:68)
    at org.apache.hudi.table.action.commit.AbstractWriteHelper.write(AbstractWriteHelper.java:55)
    ... 33 more
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: org/apache/hudi/avro/HoodieAvroWriteSupport
    at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:73)
    at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:38)
    at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
    ... 39 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: org/apache/hudi/avro/HoodieAvroWriteSupport
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:143)
    at org.apache.hudi.execution.FlinkLazyInsertIterable.computeNext(FlinkLazyInsertIterable.java:69)
    ... 41 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NoClassDefFoundError: org/apache/hudi/avro/HoodieAvroWriteSupport
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
    ... 42 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hudi/avro/HoodieAvroWriteSupport
    at org.apache.hudi.io.storage.HoodieFileWriterFactory.newParquetFileWriter(HoodieFileWriterFactory.java:59)
    at org.apache.hudi.io.storage.HoodieFileWriterFactory.getFileWriter(HoodieFileWriterFactory.java:47)
    at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:85)
    at org.apache.hudi.io.HoodieCreateHandle.<init>(HoodieCreateHandle.java:66)
    at org.apache.hudi.io.CreateHandleFactory.create(CreateHandleFactory.java:34)
    at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:83)
    at org.apache.hudi.execution.CopyOnWriteInsertHandler.consumeOneRecord(CopyOnWriteInsertHandler.java:40)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

五、定位问题

5.1、查看编译的jar包

发现存在该类,于是猜测与集群中有jar冲突

5.2、是否和集群中有hudi相关jar冲突(该类属于hudi-common包)

通过 find /opt -name "*hudi*.jar" 查找。并未发现有相关的jar(曾切换到不同版本的hadoop集群测试结果一致异常)
结论:不是jar冲突引起。

5.3、打开classloadtrace查看类加载情况

添加 -yD env.java.opts="-XX:+TraceClassLoading -XX:+TraceClassPaths"参数,查看HoodieAvroWriteSupport 没有被加载。于是开始定位异常代码位置,猜测为静态代码中初始化该类时就已经异常导致,将 new AvroSchemaConverter().convert(schema)写到外面来定位具体的缺失类


结果:异常信息变为:Caused by: java.lang.ClassNotFoundException: org.apache.parquet.schema.Typemac

5.4、通过查看hudi-flink-bundle编译包,除了parquet-avro,其他parquet相关的依赖并没有被编译进来。

到编译的本地maven依赖仓库找出以下parquet相关jar拷贝到flink/lib下尝试解决


结果: 异常信息变为
Caused by: java.lang.NoClassDefFoundError: org/apache/parquet/hadoop/ParquetInputFormat
at org.apache.parquet.HadoopReadOptions$Builder.<init>(HadoopReadOptions.java:87)
所以由parquet相关包的异常导致NoClassDefFoundError: org/apache/hudi/avro/HoodieAvroWriteSupport

5.5、根据HadoopReadOptions类加载信息判断

查看classloadtrace信息已加载HadoopReadOptions来确认parquet-hadoop包已经被加载,所以确定ParquetInputFormat类(来自parquet-hadoop)的异常由其他缺失造成,于是查看HadoopReadOptions类中引入的类及ParquetInputFormat继承的父类等。最终发现ParquetInputFormat继承的父类org.apache.hadoop.mapreduce.lib.input.FileInputFormat(所属hadoop-mapreduce-client-coreye-x.x.x.jar)并未被加载。

5.6、将hadoop-mapreduce-client-core-x.x.x.jar拷贝到flink/lib下解决

重新提交任务。若发现有如下异常,删除hdfs表目录下的文件重新提交(由于之前步骤的异常造成的损坏)。

java.io.IOException: Could not perform checkpoint 3 for operator instant_generator (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:892)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
    at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:137)
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:566)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: Last instant costs more than 10 second, stop task now
    at org.apache.hudi.operator.InstantGenerateOperator.doCheck(InstantGenerateOperator.java:199)
    at org.apache.hudi.operator.InstantGenerateOperator.prepareSnapshotPreBarrier(InstantGenerateOperator.java:119)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.prepareSnapshotPreBarrier(OperatorChain.java:266)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:249)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:921)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:911)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:879)
    ... 13 more
5.7 解决问题

问题总结

上述问题基本是由于集群没有相关jar包导致,根据上面的问题定位方法能解决大部分在遇到NoClassDefFoundError、ClassNotFoundException等异常问题。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容