干货满满的 pyspark 笔记

反向代理的配置

在服务器中做如下配置:

server {                                                       
  listen 80;                                                   
  server_name test.aldwx.com;                                  
  location /app.launch.php {                                   
    proxy_pass http://127.0.0.1:3000;                          
  }                                                            
}

然后在服务器中的终端中输入

plackup -E deployment -s Starman --workers=1 -p 3000 -a app.pl

或者:

nohup plackup -E deployment -s Starman --workers=10 -p 3000 -a app.pl &

app.pl 是用 dancer 写的一个 demo 程序, 其中的内容如下:

#!/usr/bin/perl
use Digest::MD5 qw(md5_hex);
use Dancer;
use JSON qw(encode_json);

# return the json format data
get  "/app.launch.info.php" => sub {
  
  my $data = `date`; 
  my $token = md5_hex($data);   
  my  $r = {
    'error' => '0',
    'data' => {
       'access_token' => $token
    },
    };
    return encode_json($r);
};


get  '/hello'  => sub  {
    return "Hello 小程序";
};

get  '/l.html' => sub {
    return "Hello, World";
};

dance;

然后在浏览其中输入:

test.aldwx.com/app.launch.info.php

你会看到浏览器返给你返回一段 json 数据。

查看端口号被哪个进程占用

netstat -tulnp | grep ':80 '
tcp        0      0 0.0.0.0:80                  0.0.0.0:*                   LISTEN      4021/nginx.conf 

nginx: [error] open() "/alidata/server/nginx/logs/nginx.pid" failed (2: No such file or directory) 错误

在使用的阿里云服务器上,进程性的 nginx -s stop后再次启动nginx -s reload ,总是会报错误nginx: [error] open() "/alidata/server/nginx/logs/nginx.pid" failed (2: No such file or directory),这应该是因为把nginx进程杀死后pid丢失了,下一次再开启nginx -s reload时无法启动

nginx -c /path/to/config/nginx.conf

查看被占用的端口号

lsof -i | grep ":7077"
java       8927     root  230u  IPv6   100990      0t0  TCP 106.2.1.77:7077 (LISTEN)

php 环境搭建

sudo apt-get update
sudo apt-get install lib apache2-mod-php5 ...

.conf 文件的配置

.conf 文件一版放在 sites-available 目录中, 用的时候再软链接到 sites-enabled 中。

免密码登录服务器

在服务器上的 ./ssh 目录中添加 ssh key, 比如我更换了新电脑:

vim ~/.ssh/authorized_keys

删除掉旧的 ssh keys, 加入新生成的:

ssh-rsa AAACCQEEEEddfdfdffrrt3334QWAUUY1223xxxXXXXXXX wodefan@Mac-mini.local

从本地上传/下载文件/文件夹到服务器

上传和下载都是在本地打开命令行终端。

  • 上传
scp ./app.py root@108.2.5.98:/daly/apk/myapp/ # 从本地上传单个文件到服务器
scp -r myjobs root@108.2.5.98:/daly/apk/      # 从本地上传整个文件夹到服务器
  • 下载
scp root@108.2.5.98:/daly/apk/myapp/today.txt . # 从服务器下载指定文件到本地目录
scp -r root@108.2.5.98:/daly/apk/apk/ .         # 从服务器下载整个文件夹到本地 

在 pyspark 中连接数据库

  • 启动 pyspark 时附加上相应的 mysql jar 包:
/data/app/spark/bin/pyspark --driver-class-path mysql-connector-java-5.1.40-bin.jar --jars mysql-connector-java-5.1.40-bin.jar
  • 使用用户名和密码连接 mysql, 并创建一个数据框:
df = spark.read.format("jdbc").option("url", "jdbc:mysql://8.8.8.8/ald_xinen").option("dbtable", "(select * from ald_session_logs) as df").option('user', "root").option('password', "fish@sky").load()
df.take(2) # 取出前两行

如果已经启动了一个连接 mysql 数据库的 pyspark, 再重新启动一个时会报错, 这个时候就要把之前的启动的杀掉:

ps aux | grep 'spark'

然后找到 pyspark-shell 的 进程 id, kill -9 xxxx

  • toDF(*cols) 返回一新的数据框, 指定新的列名。
df.select("app_key", "uuid").toDF("f1", 'f2').take(2) 

输出:

[Row(f1=u'2323423dsfds', f2=u'14797409227806341000'), Row(f1=u'2323423dsfds', f2=u'14797409227806341000')]
  • toJSON() 将数据框转换为字符串 RDD。每一行都被转换为 JSON 文档, 作为所返回的 RDD 中的一个元素。
df.select("app_key", "uuid").toJSON().first()        

输出:

u'{"app_key":"2323423dsfds","uuid":"14797409227806341000"}'
  • toLocalIterator()

返回一个包含该数据框所有行的迭代器。这个迭代器会在该数据框的最大分区中消耗尽可能多的内存。

list(df.toLocalIterator())
  • withColumn(colName, col)

通过为原数据框添加一个新列替换已存在的同名列而返回一个新数据框。colName 是一个字符串, 为新列的名字。
col 为这个新列的 Column 表达式。withColumn 的第一个参数必须是已存在的列的名字, withColumn 的第二个参数必须是含有列的表达式。如果不是它会报错 AssertionError: col should be Column

df.withColumn('page_count', df.page_count+100).select("app_key","page_count").take(2)

输出:

[Row(app_key=u'2323423dsfds', page_count=110), Row(app_key=u'2323423dsfds', page_count=104)]

col 表达式可以使用多个其他列:

df.withColumn('avg', df.page_count/df.duration).select("app_key","avg").take(2)

输出:

[Row(app_key=u'2323423dsfds', avg=0.00012387736141220192), Row(app_key=u'2323423dsfds', avg=0.16666666666666666)]

withColumn 可以添加一个常数列, 但是要使用 pyspark.sql.functions 中的函数, 例如: unix_timestamp.

可以参考

  • withColumnRenamed(existing, new)

重命名已存在的列并返回一个新数据框。existing 为已存在的要重命名的列, col 为新列的名字。

duration 列重命名为 time_take 列:

df.withColumnRenamed('duration', 'time_take').select('app_key', 'time_take').take(2)

输出:

[Row(app_key=u'2323423dsfds', time_take=80725), Row(app_key=u'2323423dsfds', time_take=24)]

groupBy 之后可以使用的方法

  • agg(*exprs)

聚合计算并将结果返回为 DataFrame

可用的聚合函数有 avg, max, min, sum, 'count'.

如果 expr 是从字符串到字符串的单个 dict 映射, 那么其键就是要执行聚合的列, 其值就是该聚合函数。

可选地, expr 还可以是一组聚合 表达式。

参数: exprs - 一个从列名(字符串)到聚合函数(字符串)的字典映射, 或者是一个 Column 列表。

gdf = df.groupBy(df.app_key)
gdf.agg({"*" : 'count' } ).collect()

输出:

[Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', count(1)=1), Row(app_key=u'AAAAAAAAAAAAAA', count(1)=1), Row(app_key=u'6c4396a7eec8f081e890ef0adbf5090d', count(1)=4), Row(app_key=u'7c8ccee05d025bb85f15f9d45c83aba8', count(1)=12), Row(app_key=u'6428c8aa20672b64386fc1d2ce60ef3f', count(1)=3), Row(app_key=u'2323423dsfds', count(1)=66), Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', count(1)=490), Row(app_key=u'f05883cf4dd8c6016ac55d9380f568d2', count(1)=17), Row(app_key=u'dc9c30b87aa07834ffb7736f715b9caa', count(1)=5)]

对 app_key 排序下:

sorted(gdf.agg({"*": "count"}).collect())

输出:

[Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', count(1)=490), Row(app_key=u'2323423dsfds', count(1)=66), Row(app_key=u'6428c8aa20672b64386fc1d2ce60ef3f', count(1)=3), Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', count(1)=1), Row(app_key=u'6c4396a7eec8f081e890ef0adbf5090d', count(1)=4), Row(app_key=u'7c8ccee05d025bb85f15f9d45c83aba8', count(1)=12), Row(app_key=u'AAAAAAAAAAAAAA', count(1)=1), Row(app_key=u'dc9c30b87aa07834ffb7736f715b9caa', count(1)=5), Row(app_key=u'f05883cf4dd8c6016ac55d9380f568d2', count(1)=17)]
from pyspark.sql import functions as F
sorted(gdf.agg(F.max(df.duration)).collect())

输出:

[Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', max(duration)=28498), Row(app_key=u'2323423dsfds', max(duration)=80725), Row(app_key=u'6428c8aa20672b64386fc1d2ce60ef3f', max(duration)=352), Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', max(duration)=150), Row(app_key=u'6c4396a7eec8f081e890ef0adbf5090d', max(duration)=33), Row(app_key=u'7c8ccee05d025bb85f15f9d45c83aba8', max(duration)=168), Row(app_key=u'AAAAAAAAAAAAAA', max(duration)=23), Row(app_key=u'dc9c30b87aa07834ffb7736f715b9caa', max(duration)=87), Row(app_key=u'f05883cf4dd8c6016ac55d9380f568d2', max(duration)=7789)]
  • avg(*cols)

为每一组的每个数值列计算平均值。

meanavg 的别名。

参数: cols - 一组列的名字(字符串)。非数值列被忽略。

df.groupBy('app_key').avg('duration').collect()

输出:

[Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', avg(duration)=150.0),Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', avg(duration)=153.81428571428572) ...]
df.groupBy('app_key').avg('duration', 'page_count').collect()

输出:

[Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', avg(duration)=150.0, avg(page_count)=12.0), avg(page_count)=5.681818181818182), Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', avg(duration)=153.81428571428572, avg(page_count)=8.059183673469388) ...]
  • count()

计算每组的记录数。

sorted(df.groupBy(df.app_key).count().collect())

输出:

[Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', count=490), Row(app_key=u'2323423dsfds', count=66) ...]
  • max(*cols)

计算每组每个数值列的最大值。

df.groupBy('app_key').max('duration').collect()

输出:

[Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', max(duration)=150) ...]
df.groupBy('app_key').max('duration', 'page_count').collect()

输出:

[Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', max(duration)=150, max(page_count)=12)...]
  • mean() 求平均数, 同 avg

  • min() 参考 max

  • pivot(pivot_col, values=None)

透视当前[[DataFrame]]的列并执行指定的聚合。有两个版本的pivot函数:一个需要调用者指定不同值的列表以便进行透视,而另一个不指定。后者更简洁但效率较低,因为Spark需要首先在内部计算不同值的列表。

参数: pivot_col - 要透视的列名; values - 将转换为输出 DataFrame 中的列的值列表。

计算每个 app 下,每个渠道的总停留时长, 每个渠道的结果占一列:

df.groupBy('app_key').pivot("source", ["1", "2", "3", "4" ]).sum("duration").collect()

输出:

[Row(app_key=u'64a7f2b033fb593a8598bbc48c3b8486', 1=None, 2=None, 3=None, 4=150), Row(app_key=u'21e689e6bb76a213b21fc61147db1bab', 1=7208, 2=54929, 3=7078, 4=6139), Row(app_key=u'dc9c30b87aa07834ffb7736f715b9caa', 1=None, 2=94, 3=29, 4=87) ...]

官方的例子:

# Compute the sum of earnings for each year by course with each course as a separate column
df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect()
[Row(year=2012, dotNET=15000, Java=20000), Row(year=2013, dotNET=48000, Java=30000)]

或者不指定列的值(不够有效率)

df4.groupBy("year").pivot("course").sum("earnings").collect()
[Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]
  • sum(*cols)

为每组计算每个数值列的总和。

参数: cols - 一组列的名字(字符串)。非数值列被忽略。

# 官方例子:
df.groupBy().sum('age').collect()
# [Row(sum(age)=7)]
df3.groupBy().sum('age', 'height').collect()
# [Row(sum(age)=7, sum(height)=165)]
  • class pyspark.sql.Column(jc)

数据框中的一列。 Column 实例可以通过如下的代码创建:

#1. 选择数据框中的一列
df.colName
df["colName"]

#2. 从表达式创建
df.colName + 1
1 / df.colName 
  • alias(*alias)

返回这个列的新的别名或别名们(在表达式返回多列的情况下, 例如爆炸)。

df.select(df.duration.alias("time_take")).collect()
  • asc()

返回一个排序过的表达式, 基于给定列的名字的升序进行排列。

  • astype() 是 cast() 的别名。

  • between(lowerBound, upperBound)

如果这个表达式的值在给定的列的值之间,则计算结果为布尔真。

df.select(df.app_key, df.duration.between(60,80)).show(50)

show()函数输出前 20 行:

+--------------------+---------------------------------------+
|             app_key|((duration >= 60) AND (duration <= 80))|
+--------------------+---------------------------------------+
|        2323423dsfds|                                  false|
|        2323423dsfds|                                  false|
|        2323423dsfds|                                  false|
|        2323423dsfds|                                  false|
|        2323423dsfds|                                   true|
...
  • cast(dataType)

将列转换为 dataType 类型。

# 官方例子:
df.select(df.age.cast("string").alias('ages')).collect()
# [Row(ages=u'2'), Row(ages=u'5')]
df.select(df.age.cast(StringType()).alias('ages')).collect()
# [Row(ages=u'2'), Row(ages=u'5')]
# 将字符串转为 int 型
df.select(df.source.cast("int").alias('sources')).take(20)

输出:

[Row(sources=1), Row(sources=2),...]
  • desc()

根据给定列的名字的倒序返回排序后的表达式。

  • getField(name)

在StructField中按名称获取字段的表达式。

from pyspark.sql import Row
df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
df.select(df.r.getField("b")).show()
+---+
|r.b|
+---+
|  b|
+---+
df.select(df.r.a).show()
+---+
|r.a|
+---+
|  1|
+---+
  • isNotNull()

如果当前的表达式不为 null 则为真。

  • isNull()

如果当前的表达式为 null 则为真。

  • isin(*cols)

如果这个表达式的值被包含在参数的计算值之中,则该表达式为真。

df[df.network_type.isin("Wifi", "4g")].select('app_key', 'network_type').take(2)

输出:

[Row(app_key=u'2323423dsfds', network_type=u'wifi'), Row(app_key=u'2323423dsfds', network_type=u'4g')]
  • name() 是 alias() 的别名

  • otherwise(value)

计算一组条件,并返回多个可能的结果表达式之一。如果未调用 Column.otherwise(),则为不匹配的条件返回 None。

有关使用示例,请参阅 pyspark.sql.functions.when()。

参数:value - 字面值或 Column 表达式。

from pyspark.sql import functions as F
df.select(df.duration, F.when(df.page_count > 10, 1).otherwise(0)).show()

输出:

+--------+---------------------------------------------+
|duration|CASE WHEN (page_count > 10) THEN 1 ELSE 0 END|
+--------+---------------------------------------------+
|   80725|                                            0|
|      24|                                            0|
|      21|                                            0|
|      21|                                            1|
|      66|                                            1|
|      17|                                            0|
|      31|                                            0|
|       1|                                            0|
|      80|                                            1|
|       3|                                            0|
|      11|                                            1|
|       6|                                            0|
|      23|                                            1|
|       4|                                            0|
|       2|                                            0|
|      23|                                            0|
|      60|                                            1|
|      16|                                            0|
|       4|                                            0|
|       4|                                            0|
+--------+---------------------------------------------+
  • from_unixtime

    将 unix 格式的时间戳转换为指定格式的日期

    from pyspark.sql.functions import from_unixtime
    
    df = spark.createDataFrame([('1486461323',)], ['start_time'])
    df.withColumn('day', from_unixtime( df.start_time , 'yyyy-MM-dd')).show()
    

    上面的代码中的createDataFrame 方法创建了一个只有一列的数据框, withColumn 方法为该数据框添加了一个新的名为 day 的列, from_unixtime 方法将该数据框中原来的 start_time 列转换为指定格式的日期。show() 方法打印输出。

  • pyspark.sql.functions.col(col)

    col 方法接收一个字符串列名作为参数, 根据指定的列名返回一个 Column. 作用和 df.columnName 相同。

  • pyspark.sql.functions.when(condition, value)

    计算一组条件并返回多个可能结果的表达式的其中之一。如果 Column.otherwise 没有被调用, 那么对未匹配的条件会返回 None

    参数:

    • condition - 一个布尔 Column 表达式。
    • value - 一个字面值, 或者一个 Column 表达式。
    df.select( when(df['age']==2, 3).otherwise(4).alias("age") ).collect()
    # [Row(age=3), Row(age=4)]
    
    df.select( when(df.age==2, df.age + 1).alias("age") ).collect()
    # [Row(age=3), Row(age=None)]-pyspark.sql.Column.when
    
  • pyspark.sql.Column.when(condition, value)

    计算一组条件并返回多个可能结果的表达式的其中之一。如果 Column.otherwise 没有被调用, 那么对未匹配的条件会返回 None。当 when 中有多个条件时,要用 &| 连接起来。Pyspark: multiple conditions in when clause

from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()

从 df 数据框中选择出 nameage 这两列。如果 age>4 则把 age 置为 1; 如果 age<3 就把 age 置为 -1, 否则置为 0。

+-----+------------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
+-----+------------------------------------------------------------+
|Alice|                                                          -1|
|  Bob|                                                           1|
+-----+------------------------------------------------------------+
  • left_outer join

    左连接之后如果有重复的列, 则使用 drop 删除不了重复的列。需要用 select

  • udf 方法

    定义一个 udf 方法, 用来返回今天的日期(yyyy-MM-dd):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import datetime

# 定义一个 udf 函数 
def today(day):
    if day==None:
        return datetime.datetime.fromtimestamp(int(time.time())).strftime('%Y-%m-%d')
    else:
        return day

# 返回类型为字符串类型
udfday = udf(today, StringType())
# 使用
df.withColumn('day', udfday(df.day))
          

  • drop

    有时候使用 drop 来删除某一列时, 会出现错误, 而用 select 就可以。

将 map 转换为 select

之前一直使用 map 方法来做转换, 添加新的列:

 # 今天的新访问用户数, 访问人数, 访问次数
 _time_slot_df = spark.createDataFrame(slot_session_df.rdd.map(map_sessions)) \
                      .groupBy('app_key', 'day') \
                      .agg(
                           sum('is_first_open').alias('today_new_user_count'), # 新访问用户数
                           countDistinct('uuid').alias('today_user_count'),    # 访问人数
                           sum('page_count').alias('today_visits_count'),      # 访问次数
                       )
                         
# 对 session_logs 日志做 map 操作, 生成需要的字段    
def map_sessions(row):
    if row['is_first_open']:
        is_first_open = 1
    else:
        is_first_open = 0
        
    return Row(
        app_key       = row['app_key'],
        uuid          = row['uuid'],       # 访问人数
        is_first_open = is_first_open,     # 新访问用户数
        page_count    = row['page_count'], # 访问次数
        day        = datetime.datetime.fromtimestamp(int(row['start_time'])).strftime('%Y-%m-%d')
    )  

现在我们有 select, withColumn, when 来做:

        _result_active_df = aladdin_id_df.join(_result_df, _result_df.app_key==aladdin_id_df.app_key, 'left_outer') \
                                         .drop("_result_df.app_key","_result_df.uid")

先用 select 方法从数据框中选择出已经存在的列(s),这就是临时的新的数据框, 再用 withColumn 向该临时数据框中添加新的列。对于需要进行转换的列, 可以使用 when 方法来做运算。


Splitting a row in a PySpark Dataframe into multiple rows

  • pyspark.functions.sql.explode
# Create dummy data
df = sc.parallelize([(1, 2, 3, 'a b c'),
                     (4, 5, 6, 'd e f'),
                     (7, 8, 9, 'g h i')]).toDF(['col1', 'col2', 'col3','col4'])


# Explode column
from pyspark.sql.functions import split, explode
df.withColumn('col4',explode(split('col4',' '))).show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   3|   a|
|   1|   2|   3|   b|
|   1|   2|   3|   c|
|   4|   5|   6|   d|
|   4|   5|   6|   e|
|   4|   5|   6|   f|
|   7|   8|   9|   g|
|   7|   8|   9|   h|
|   7|   8|   9|   i|
+----+----+----+----+

这样就不用使用 flatMap 函数了, 它代替了 map 函数:

# 每个电话号码一行     
def map_phones(row):
    r = []
    for phone in re.split(',', row['phones']):
        r.append(Row(
            app_key     = row['app_key'],
            phones      = phone,
            sms_content = row['sms_content']
        ))
    return r

一个 empty 引发的异常

在脚本中加入这么一句:

if yesterday_session_df.rdd.isEmpty: return

上面的 isEmpty 方法没有带圆括号,但是执行没有报错,但是这个语句后面的语句好像都没有执行。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容