Apache Spark 2.2.0 中文文档 - SparkR (R on Spark) | ApacheCN

SparkR (R on Spark)

概述

SparkDataFrame

启动: SparkSession

从 RStudio 来启动

创建 SparkDataFrames

从本地的 data frames 来创建 SparkDataFrames

从 Data Sources(数据源)创建 SparkDataFrame

从 Hive tables 来创建 SparkDataFrame

SparkDataFrame 操作

Selecting rows(行), columns(列)

Grouping, Aggregation(分组, 聚合)

Operating on Columns(列上的操作)

应用 User-Defined Function(UDF 用户自定义函数)

Run a given function on a large dataset usingdapplyordapplyCollect

dapply

dapplyCollect

Run a given function on a large dataset grouping by input column(s) and usinggapplyorgapplyCollect(在一个大的 dataset 上通过 input colums(输入列)来进行 grouping(分组)并且使用gapplyorgapplyCollect来运行一个指定的函数)

gapply

gapplyCollect

使用spark.lapply分发运行一个本地的 R 函数

spark.lapply

SparkR 中运行 SQL 查询

机器学习

算法

分类

回归

聚类

协同过滤

频繁模式挖掘

统计

模型持久化

R和Spark之间的数据类型映射

Structured Streaming

R 函数名冲突

迁移指南

SparkR 1.5.x 升级至 1.6.x

SparkR 1.6.x 升级至 2.0

升级至 SparkR 2.1.0

升级至 SparkR 2.2.0

概述

SparkR 是一个 R package, 它提供了一个轻量级的前端以从 R 中使用 Apache Spark. 在 Spark 2.2.0 中, SparkR 提供了一个分布式的 data frame, 它实现了像 selection, filtering, aggregation etc 一系列所支持的操作.(dplyr与 R data frames 相似) ), 除了可用于海量数据上之外. SparkR 还支持使用 MLlib 来进行分布式的 machine learning(机器学习).

SparkDataFrame

SparkDataFrame 是一个分布式的, 将数据映射到有名称的 colums(列)的集合. 在概念上 相当于关系数据库中的table表或 R 中的 data frame,但在该引擎下有更多的优化. SparkDataFrames 可以从各种来源构造,例如: 结构化的数据文件,Hive 中的表,外部数据库或现有的本地 R data frames.

All of the examples on this page use sample data included in R or the Spark distribution and can be run using the./bin/sparkRshell.

启动: SparkSession

SparkR 的入口点是SparkSession, 它会连接您的 R 程序到 Spark 集群中. 您可以使用sparkR.session来创建SparkSession, 并传递诸如应用程序名称, 依赖的任何 spark 软件包等选项, 等等. 此外,还可以通过SparkSession来与SparkDataFrames一起工作。 如果您正在使用sparkRshell,那么SparkSession应该已经被创建了,你不需要再调用sparkR.session.

sparkR.session()

从 RStudio 来启动

您可以从 RStudio 中来启动 SparkR. 您可以从 RStudio, R shell, Rscript 或者 R IDEs 中连接你的 R 程序到 Spark 集群中去. 要开始, 确保已经在环境变量中设置好 SPARK_HOME (您可以检测下Sys.getenv), 加载 SparkR package, 并且像下面一样调用sparkR.session. 它将检测 Spark 的安装, 并且, 如果没有发现, 它将自动的下载并且缓存起来. 当然,您也可以手动的运行install.spark.

为了调用sparkR.session, 您也可以指定某些 Spark driver 的属性. 通常哪些应用程序属性运行时环境不能以编程的方式来设置, 这是因为 driver 的 JVM 进程早就已经启动了, 在这种情况下 SparkR 会帮你做好准备. 要设置它们, 可以像在sparkConfig参数中的其它属性一样传递它们到sparkR.session()中去.

if(nchar(Sys.getenv("SPARK_HOME"))<1){Sys.setenv(SPARK_HOME="/home/spark")}library(SparkR,lib.loc=c(file.path(Sys.getenv("SPARK_HOME"),"R","lib")))sparkR.session(master="local[*]",sparkConfig=list(spark.driver.memory="2g"))

下面的 Spark driver 属性可以 从 RStudio 的 sparkR.session 的 sparkConfig 中进行设置:

Property Name<(属性名称)Property group(属性分组)spark-submitequivalent

spark.masterApplication Properties--master

spark.yarn.keytabApplication Properties--keytab

spark.yarn.principalApplication Properties--principal

spark.driver.memoryApplication Properties--driver-memory

spark.driver.extraClassPathRuntime Environment--driver-class-path

spark.driver.extraJavaOptionsRuntime Environment--driver-java-options

spark.driver.extraLibraryPathRuntime Environment--driver-library-path

创建 SparkDataFrames

有了一个SparkSession之后, 可以从一个本地的 R data frame,Hive 表, 或者其它的data sources中来创建SparkDataFrame应用程序.

从本地的 data frames 来创建 SparkDataFrames

要创建一个 data frame 最简单的方式是去转换一个本地的 R data frame 成为一个 SparkDataFrame. 我们明确的使用as.DataFrame或createDataFrame并且经过本地的 R data frame 中以创建一个 SparkDataFrame. 例如, 下面的例子基于 R 中已有的faithful来创建一个SparkDataFrame.

df<-as.DataFrame(faithful)# 展示第一个 SparkDataFrame 的内容head(df)##  eruptions waiting##1    3.600      79##2    1.800      54##3    3.333      74

从 Data Sources(数据源)创建 SparkDataFrame

SparkR 支持通过SparkDataFrame接口对各种 data sources(数据源)进行操作. 本节介绍使用数据源加载和保存数据的常见方法. 您可以查看 Spark Sql 编程指南的specific options部分以了解更多可用于内置的 data sources(数据源)内容.

从数据源创建 SparkDataFrames 常见的方法是read.df. 此方法将加载文件的路径和数据源的类型,并且将自动使用当前活动的 SparkSession. SparkR 天生就支持读取 JSON, CSV 和 Parquet 文件, 并且通过可靠来源的软件包第三方项目, 您可以找到 Avro 等流行文件格式的 data source connectors(数据源连接器). 可以用spark-submit或sparkR命令指定--packages来添加这些包, 或者在交互式 R shell 或从 RStudio 中使用sparkPackages参数初始化SparkSession.

sparkR.session(sparkPackages="com.databricks:spark-avro_2.11:3.0.0")

We can see how to use data sources using an example JSON input file. Note that the file that is used here isnota typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. For more information, please seeJSON Lines text format, also called newline-delimited JSON. As a consequence, a regular multi-line JSON file will most often fail.

我们可以看看如何使用 JSON input file 的例子来使用数据源. 注意, 这里使用的文件是not一个经典的 JSON 文件. 文件中的每行都必须包含一个单独的,独立的有效的JSON对象

people<-read.df("./examples/src/main/resources/people.json","json")head(people)##  age    name##1  NA Michael##2  30    Andy##3  19  Justin# SparkR 自动从 JSON 文件推断出 schema(模式)printSchema(people)# root#  |-- age: long (nullable = true)#  |-- name: string (nullable = true)# 同样, 使用  read.json 读取多个文件people<-read.json(c("./examples/src/main/resources/people.json","./examples/src/main/resources/people2.json"))

该 data sources API 原生支持 CSV 格式的 input files(输入文件). 要了解更多信息请参阅 SparkRread.dfAPI 文档.

df<-read.df(csvPath,"csv",header="true",inferSchema="true",na.strings="NA")

该 data sources API 也可用于将 SparkDataFrames 存储为多个 file formats(文件格式). 例如, 我们可以使用write.df把先前的示例的 SparkDataFrame 存储为一个 Parquet 文件.

write.df(people,path="people.parquet",source="parquet",mode="overwrite")

从 Hive tables 来创建 SparkDataFrame

您也可以从 Hive tables(表)来创建 SparkDataFrames. 为此,我们需要创建一个具有 Hive 支持的 SparkSession,它可以访问 Hive MetaStore 中的 tables(表). 请注意, Spark 应该使用Hive support来构建,更多细节可以在SQL 编程指南中查阅.

sparkR.session()sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")# Queries can be expressed in HiveQL.results<-sql("FROM src SELECT key, value")# results is now a SparkDataFramehead(results)##  key  value## 1 238 val_238## 2  86  val_86## 3 311 val_311

SparkDataFrame 操作

SparkDataFrames 支持一些用于结构化数据处理的 functions(函数). 这里我们包括一些基本的例子,一个完整的列表可以在API文档中找到:

Selecting rows(行), columns(列)

# Create the SparkDataFramedf<-as.DataFrame(faithful)# 获取关于 SparkDataFrame 基础信息df## SparkDataFrame[eruptions:double, waiting:double]# Select only the "eruptions" columnhead(select(df,df$eruptions))##  eruptions##1    3.600##2    1.800##3    3.333# You can also pass in column name as stringshead(select(df,"eruptions"))# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 minshead(filter(df,df$waiting<50))##  eruptions waiting##1    1.750      47##2    1.750      47##3    1.867      48

Grouping, Aggregation(分组, 聚合)

SparkR data frames 支持一些常见的, 用于在 grouping(分组)数据后进行 aggregate(聚合)的函数. 例如, 我们可以在faithfuldataset 中计算waiting时间的直方图, 如下所示.

# We use the `n` operator to count the number of times each waiting time appearshead(summarize(groupBy(df,df$waiting),count=n(df$waiting)))##  waiting count##1      70    4##2      67    1##3      69    2# We can also sort the output from the aggregation to get the most common waiting timeswaiting_counts<-summarize(groupBy(df,df$waiting),count=n(df$waiting))head(arrange(waiting_counts,desc(waiting_counts$count)))##  waiting count##1      78    15##2      83    14##3      81    13

Operating on Columns(列上的操作)

SparkR 还提供了一些可以直接应用于列进行数据处理和 aggregatation(聚合)的函数. 下面的例子展示了使用基本的算术函数.

# Convert waiting time from hours to seconds.# Note that we can assign this to a new column in the same SparkDataFramedf$waiting_secs<-df$waiting*60head(df)##  eruptions waiting waiting_secs##1    3.600      79        4740##2    1.800      54        3240##3    3.333      74        4440

应用 User-Defined Function(UDF 用户自定义函数)

在 SparkR 中, 我们支持几种 User-Defined Functions:

Run a given function on a large dataset usingdapplyordapplyCollect

dapply

应用一个 function(函数)到SparkDataFrame的每个 partition(分区). 应用于SparkDataFrame每个 partition(分区)的 function(函数)应该只有一个参数, 它中的data.frame对应传递的每个分区. 函数的输出应该是一个data.frame. Schema 指定生成的SparkDataFramerow format. 它必须匹配返回值的data types.

# Convert waiting time from hours to seconds.# Note that we can apply UDF to DataFrame.schema<-structType(structField("eruptions","double"),structField("waiting","double"),structField("waiting_secs","double"))df1<-dapply(df,function(x){x<-cbind(x,x$waiting*60)},schema)head(collect(df1))##  eruptions waiting waiting_secs##1    3.600      79        4740##2    1.800      54        3240##3    3.333      74        4440##4    2.283      62        3720##5    4.533      85        5100##6    2.883      55        3300

dapplyCollect

像dapply那样, 应用一个函数到SparkDataFrame的每个分区并且手机返回结果. 函数的输出应该是一个data.frame. 但是, 不需要传递 Schema. 注意, 如果运行在所有分区上的函数的输出不能 pulled(拉)到 driver 的内存中过去, 则dapplyCollect会失败.

# Convert waiting time from hours to seconds.# Note that we can apply UDF to DataFrame and return a R's data.frameldf<-dapplyCollect(df,function(x){x<-cbind(x,"waiting_secs"=x$waiting*60)})head(ldf,3)##  eruptions waiting waiting_secs##1    3.600      79        4740##2    1.800      54        3240##3    3.333      74        4440

Run a given function on a large dataset grouping by input column(s) and usinggapplyorgapplyCollect(在一个大的 dataset 上通过 input colums(输入列)来进行 grouping(分组)并且使用gapplyorgapplyCollect来运行一个指定的函数)

gapply

应用给一个函数到SparkDataFrame的每个 group. 该函数被应用到SparkDataFrame的每个 group, 并且应该只有两个参数: grouping key 和 Rdata.frame对应的 key. 该 groups 从SparkDataFrame的 columns(列)中选择. 函数的输出应该是data.frame. Schema 指定生成的SparkDataFramerow format. 它必须在 Sparkdata types 数据类型的基础上表示 R 函数的输出 schema(模式). 用户可以设置返回的data.frame列名.

# Determine six waiting times with the largest eruption time in minutes.schema<-structType(structField("waiting","double"),structField("max_eruption","double"))result<-gapply(df,"waiting",function(key,x){y<-data.frame(key,max(x$eruptions))},schema)head(collect(arrange(result,"max_eruption",decreasing=TRUE)))##    waiting  max_eruption##1      64      5.100##2      69      5.067##3      71      5.033##4      87      5.000##5      63      4.933##6      89      4.900

gapplyCollect

像gapply那样, 将函数应用于SparkDataFrame的每个分区,并将结果收集回 R data.frame. 函数的输出应该是一个data.frame. 但是,不需要传递 schema(模式). 请注意,如果在所有分区上运行的 UDF 的输出无法 pull(拉)到 driver 的内存, 那么gapplyCollect可能会失败.

# Determine six waiting times with the largest eruption time in minutes.result<-gapplyCollect(df,"waiting",function(key,x){y<-data.frame(key,max(x$eruptions))colnames(y)<-c("waiting","max_eruption")y})head(result[order(result$max_eruption,decreasing=TRUE),])##    waiting  max_eruption##1      64      5.100##2      69      5.067##3      71      5.033##4      87      5.000##5      63      4.933##6      89      4.900

使用spark.lapply分发运行一个本地的 R 函数

spark.lapply

类似于本地 R 中的lapply,spark.lapply在元素列表中运行一个函数,并使用 Spark 分发计算. 以类似于doParallel或lapply的方式应用于列表的元素. 所有计算的结果应该放在一台机器上. 如果不是这样, 他们可以像df < - createDataFrame(list)这样做, 然后使用dapply.

# Perform distributed training of multiple models with spark.lapply. Here, we pass# a read-only list of arguments which specifies family the generalized linear model should be.families<-c("gaussian","poisson")train<-function(family){model<-glm(Sepal.Length~Sepal.Width+Species,iris,family=family)summary(model)}# Return a list of model's summariesmodel.summaries<-spark.lapply(families,train)# Print the summary of each modelprint(model.summaries)

SparkR 中运行 SQL 查询

A SparkDataFrame can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data. Thesqlfunction enables applications to run SQL queries programmatically and returns the result as aSparkDataFrame.

# Load a JSON filepeople<-read.df("./examples/src/main/resources/people.json","json")# Register this SparkDataFrame as a temporary view.createOrReplaceTempView(people,"people")# SQL statements can be run by using the sql methodteenagers<-sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")head(teenagers)##    name##1 Justin

机器学习

算法

SparkR 现支持下列机器学习算法:

分类

spark.logit:逻辑回归 Logistic Regression

spark.mlp:多层感知 (MLP)

spark.naiveBayes:朴素贝叶斯

spark.svmLinear:线性支持向量机

回归

spark.survreg:加速失败时间生存模型 Accelerated Failure Time (AFT) Survival Model

spark.glmorglm:广义线性模型 Generalized Linear Model (GLM)

spark.isoreg:保序回归

spark.gbt:梯度提升树 for回归and分类

spark.randomForest:随机森林 for回归and分类

聚类

spark.bisectingKmeans:二分k均值

spark.gaussianMixture:高斯混合模型 (GMM)

spark.kmeans:K-Means

spark.lda:隐含狄利克雷分布 (LDA)

协同过滤

spark.als:交替最小二乘 (ALS)

频繁模式挖掘

spark.fpGrowth:FP-growth

统计

spark.kstest:柯尔莫哥洛夫-斯米尔诺夫检验

SparkR 底层实现使用 MLlib 来训练模型. 有关示例代码,请参阅MLlib用户指南的相应章节. 用户可以调用summary输出拟合模型的摘要, 利用模型对数据进行预测, 并且使用write.ml/read.ml来 保存/加载拟合的模型 . SparkR 支持对模型拟合使用部分R的公式运算符, 包括 ‘~’, ‘.’, ‘:’, ‘+’, 和 ‘-‘.

模型持久化

下面的例子展示了SparkR如何 保存/加载 机器学习模型.

training<-read.df("data/mllib/sample_multiclass_classification_data.txt",source="libsvm")# Fit a generalized linear model of family "gaussian" with spark.glmdf_list<-randomSplit(training,c(7,3),2)gaussianDF<-df_list[[1]]gaussianTestDF<-df_list[[2]]gaussianGLM<-spark.glm(gaussianDF,label~features,family="gaussian")# Save and then load a fitted MLlib modelmodelPath<-tempfile(pattern="ml",fileext=".tmp")write.ml(gaussianGLM,modelPath)gaussianGLM2<-read.ml(modelPath)# Check model summarysummary(gaussianGLM2)# Check model predictiongaussianPredictions<-predict(gaussianGLM2,gaussianTestDF)head(gaussianPredictions)unlink(modelPath)

Find full example code at "examples/src/main/r/ml/ml.R" in the Spark repo.

R和Spark之间的数据类型映射

RSpark

bytebyte

integerinteger

floatfloat

doubledouble

numericdouble

characterstring

stringstring

binarybinary

rawbinary

logicalboolean

POSIXcttimestamp

POSIXlttimestamp

Datedate

arrayarray

listarray

envmap

Structured Streaming

SparkR 支持 Structured Streaming API (测试阶段). Structured Streaming 是一个 构建于SparkSQL引擎之上的易拓展、可容错的流式处理引擎. 更多信息请参考 R APIStructured Streaming Programming Guide

R 函数名冲突

当在R中加载或引入(attach)一个新package时, 可能会发生函数名冲突,一个函数掩盖了另一个函数

下列函数是被SparkR所掩盖的:

被掩盖函数如何获取

covinpackage:statsstats::cov(x, y = NULL, use = "everything",

method = c("pearson", "kendall", "spearman"))

filterinpackage:statsstats::filter(x, filter, method = c("convolution", "recursive"),

sides = 2, circular = FALSE, init)

sampleinpackage:basebase::sample(x, size, replace = FALSE, prob = NULL)

由于SparkR的一部分是在dplyr软件包上建模的,因此SparkR中的某些函数与dplyr中同名. 根据两个包的加载顺序, 后加载的包会掩盖先加载的包的部分函数. 在这种情况下, 可以在函数名前指定包名前缀, 例如:SparkR::cume_dist(x)ordplyr::cume_dist(x).

你可以在 R 中使用search()检查搜索路径

迁移指南

SparkR 1.5.x 升级至 1.6.x

在Spark 1.6.0 之前, 写入模式默认值为append. 在 Spark 1.6.0 改为error匹配 Scala API.

SparkSQL 将R 中的NA转换为null,反之亦然.

SparkR 1.6.x 升级至 2.0

table方法已经移除并替换为tableToDF.

类DataFrame已改名为SparkDataFrame避免名称冲突.

Spark的SQLContext和HiveContext已经过时并替换为SparkSession. 相应的摒弃sparkR.init()而通过调用sparkR.session()来实例化SparkSession. 一旦实例化完成, 当前的SparkSession即可用于SparkDataFrame 操作(注释:spark2.0开始所有的driver实例通过sparkSession来进行构建).

sparkR.session不支持sparkExecutorEnv参数.要为executors设置环境,请使用前缀”spark.executorEnv.VAR_NAME”设置Spark配置属性,例如”spark.executorEnv.PATH”, -sqlContext不再需要下列函数:createDataFrame,as.DataFrame,read.json,jsonFile,read.parquet,parquetFile,read.text,sql,tables,tableNames,cacheTable,uncacheTable,clearCache,dropTempTable,read.df,loadDF,createExternalTable.

registerTempTable方法已经过期并且替换为createOrReplaceTempView.

dropTempTable方法已经过期并且替换为dropTempView.

scSparkContext 参数不再需要下列函数:setJobGroup,clearJobGroup,cancelJobGroup

升级至 SparkR 2.1.0

join不再执行笛卡尔积计算, 使用crossJoin来进行笛卡尔积计算.

升级至 SparkR 2.2.0

createDataFrame和as.DataFrame添加numPartitions参数. 数据分割时, 分区位置计算已经与scala计算相一致.

方法createExternalTable已经过期并且替换为createTable. 可以调用这两种方法来创建外部或托管表. 已经添加额外的 catalog 方法.

默认情况下,derby.log现在已保存到tempdir()目录中. 当实例化SparkSession且选项enableHiveSupport 为TRUE,会创建derby.log .

更正spark.lda错误设置优化器的bug.

更新模型概况输出coefficientsasmatrix. 更新的模型概况包括spark.logit,spark.kmeans,spark.glm.spark.gaussianMixture的模型概况已经添加对数概度(log-likelihood)loglik.

我们一直在努力

apachecn/spark-doc-zh

原文地址: http://spark.apachecn.org/docs/cn/2.2.0/sparkr.html

网页地址: http://spark.apachecn.org/

github: https://github.com/apachecn/spark-doc-zh(觉得不错麻烦给个 Star,谢谢!~)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容