SparkSQL编程实战

Spark SQL

DataFrame 的创建以及基本操作

DataFrame可以理解成关系型数据库中的表,它与 RDD 的差别在于 DataFrame 有 schema 信息

public class DataFrameCreate {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("DataFrame");
        JavaSparkContext sc = new JavaSparkContext(conf);

        SQLContext sqlContext = new SQLContext(sc);

        DataFrame dataFrame = sqlContext.read()
                .json(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath());

        // 打印 DataFrame
        dataFrame.show();
        // 打印 DataFrame 的 Schema 信息
        dataFrame.printSchema();
        // 查询某一列的数据
        dataFrame.select(dataFrame.col("name")).show();
        // 查询某几列的数据,并对列进行计算
        dataFrame.select(dataFrame.col("name"), dataFrame.col("age").plus(1)).show();
        // 根据某一列的值进行过滤
        dataFrame.filter(dataFrame.col("age").gt(20)).show();
        // 根据某一列进行分组,然后再进行聚合
        dataFrame.groupBy(dataFrame.col("name")).count().show();
    }
}
object DataFrameCreate extends App {
    val conf = new SparkConf().setMaster("local").setAppName("DataFrame")
    val sc = SparkContext.getOrCreate(conf)

    // 创建 SQLContext
    val sqlContext = SQLContext.getOrCreate(sc)
    // 创建 DataFrame 对象
    val dataFrame = sqlContext.read.json(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
    dataFrame.show()
    dataFrame.printSchema()
    dataFrame.select(dataFrame.col("name")).show()
    dataFrame.select(dataFrame.col("name"), dataFrame.col("age").plus(1)).show()
    dataFrame.filter(dataFrame.col("age").gt(20)).show()
    dataFrame.groupBy(dataFrame.col("name")).count().show()
}

RDD转换为DataFrame

使用反射的方式将RDD转换为DataFrame

/**
 * Java语言实现
 * 通过反射的方式将RDD转换为DataFrame
 */
public class RDD2DataFrameReflection {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrameReflection");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // 读取文件生成RDD,然后将RDD中的line转换为Person类型
        JavaRDD<Person> initRDD = sc.textFile(Thread.currentThread().getContextClassLoader().getResource("person.txt").getPath())
                .map(line -> line.split(" "))
                .map(array -> new Person(array[0], Integer.parseInt(array[1]), array[2]));

        // 通过反射创建DataFrame,然后注册成一张临时表(person)
        sqlContext.createDataFrame(initRDD, Person.class).registerTempTable("person");

        // 执行sql查询,查询出年龄大于18岁的数据
        DataFrame personDataFrame = sqlContext.sql("select name,age,sex from person where age > 18");

        // 打印数据
        personDataFrame.show();

        // 将DataFrame再次转换成RDD,此时RDD中的数据类型为RDD,需要将其转换成Person类型。最后打印结果。
        personDataFrame.javaRDD()
                .map(row -> new Person(row.getString(0), row.getInt(1), row.getString(2)))
                .collect()
                .forEach(System.out::println);
    }


    /**
     * 使用反射将RDD转换为DataFrame时,JavaBean的class必须使用public修饰,并且需要实现Serializable接口
     */
    public static class Person implements Serializable {
        private String name;
        private Integer age;
        private String sex;

        public Person() {
        }

        public Person(String name, Integer age, String sex) {
            this.name = name;
            this.age = age;
            this.sex = sex;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        public String getSex() {
            return sex;
        }

        public void setSex(String sex) {
            this.sex = sex;
        }

        @Override
        public String toString() {
            return "Person{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    ", sex='" + sex + '\'' +
                    '}';
        }
    }
}
// 使用scala语言实现RDD和DataFrame之间的转换
object RDD2DataFrameReflection {
    // 申明一个case class 作为bean对象
    case class Person(name: String, age: Int, sex: String)

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrame")
        val sc = SparkContext.getOrCreate(conf)
        val sqlContext = SQLContext.getOrCreate(sc)

        // scala中RDD转成DataFrame需要导入隐式转换的包
        import sqlContext.implicits._
        // 读取txt文件生成RDD,调用toDF转换成DataFrame,最后将DataFrame注册成一张临时表
        sc.textFile(Thread.currentThread().getContextClassLoader.getResource("person.txt").getPath)
            .map(_.split(" "))
            .map(array => Person(array(0), array(1).trim.toInt, array(2)))
            .toDF()
            .registerTempTable("person")

        // 直接调用DataFrame的rdd方法即可得到RDD,此时RDD中的数据类型为Row,需要转换成Person。最后进行打印
        sqlContext.sql("select name,age,sex from person where age > 18")
            .rdd
            .map(row => Person(row.getString(0), row.getInt(1), row.getString(2)))
            .collect
            .foreach(println)
    }
}

使用编码的方式动态创建元数据

// 使用Java语言实现RDD和DataFrame的互相转换(自定义元数据的方式)
public class RDD2DataFrameProgrammatically {
    public static void main(String[] args) {
        // 创建SparkContext和SQLContext
        SparkConf conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrame");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // 从文件中读取数据生成RDD,将RDD中的类型转换成Row类型
        JavaRDD<Row> rdd = sc.textFile(Thread.currentThread().getContextClassLoader().getResource("person.txt").getPath())
                .map(line -> line.split(" "))
                .map(array -> RowFactory.create(array[0], Integer.parseInt(array[1]), array[2]));

        // 构建元数据信息
        List<StructField> structFields = Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("age", DataTypes.IntegerType, true),
                DataTypes.createStructField("sex", DataTypes.StringType, true)
        );
        StructType structType = DataTypes.createStructType(structFields);

        // 将RDD转换成DataFrame,并将其注册成一张临时表
        sqlContext.createDataFrame(rdd, structType).registerTempTable("person");

        sqlContext.sql("select name,age,sex from person where age > 18")
                .javaRDD()
                .map(row -> new RDD2DataFrameReflection.Person(row.getString(0), row.getInt(1), row.getString(2)))
                .collect()
                .forEach(System.out::println);
    }
}

// 使用scala语言实现RDD和DataFrame的互相转换(自定义元数据的方式)
object RDD2DataFrameProgrammatically {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("RDD2DataFrame")
        val sc = SparkContext.getOrCreate(conf)
        val sqlContext = SQLContext.getOrCreate(sc)

        // 从文本文件中读取数据生成RDD
        val rdd = sc.textFile(Thread.currentThread().getContextClassLoader.getResource("person.txt").getPath)
            .map(_.split(" "))
            .map(array => Row(array(0), array(1).toInt, array(2)))
        // 构建元数据信息
        val structType = StructType(Array(
            StructField("name", StringType, true),
            StructField("age", IntegerType, true),
            StructField("sex", StringType, true)
        ))
        // 将RDD转换成DataFrame并将其注册成一张临时表
        sqlContext.createDataFrame(rdd, structType).registerTempTable("person")
        // 查询出年龄大于18的用户信息,将其转成RDD再进行打印
        sqlContext.sql("select name,age,sex from person where age > 18")
            .rdd
            .map(row => Person(row.getString(0), row.getAs[Integer](1), row.getString(2)))
            .collect()
            .foreach(println)
    }
}

load && save 方法

// Java 语言演示 load 和 save 方法的具体使用
public static void loadAndSave() {
    SparkConf conf = new SparkConf().setAppName("LoadAndSave").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);


    // 不指定类型时 load 和 save 默认的格式为 parquet
    sqlContext.read()
            .load(Thread.currentThread().getContextClassLoader().getResource("student.parquet").getPath())
            .write()
            .save(Thread.currentThread().getContextClassLoader().getResource("student_copy.parquet").getPath());

    // 手动指定 load 和 save 方法操作的文件类型
    sqlContext.read()
            .format("json")
            .load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
            .show();

    sqlContext.read()
            .format("json")
            .load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
            .write()
            .format("json")
            .save("file:///Users/garen/Documents/student_copy");

    sqlContext.read()
            .format("json")
            .load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
            .write()
            .format("json")
            .mode(SaveMode.ErrorIfExists)
            .save("file:///Users/garen/Documents/student_copy");
}
def loadAndSave(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("LoadAndSave")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)

    sqlContext.read
        .format("json")
        .load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
        .show()

    val dataFrame = sqlContext.read
        .format("json")
        .load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
    dataFrame.filter(dataFrame.col("age").gt(18))
        .write
        .format("json")
        .mode(SaveMode.ErrorIfExists)
        .save("file:///D:/demo")
}

parquet数据源操作

使用编程方式加载parquet数据源

// java读取parquet文件
public static void loadParquet() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("parquet");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    sqlContext.read().parquet(Thread.currentThread().getContextClassLoader().getResource("/tarball/users.parquet").getPath()).registerTempTable("person");
    sqlContext.sql("select name from person")
            .javaRDD()
            .map(row -> row.getString(0))
            .collect()
            .forEach(System.out::println);
}
// scala读取parquet文件
def parquet(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("parquet")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)

    sqlContext.read
        .parquet(Thread.currentThread.getContextClassLoader.getResource("tarball/users.parquet").getPath)
        .registerTempTable("users")

    sqlContext.sql("select * from users")
        .rdd
        .map(_.getString(0))
        .collect
        .foreach(println)
}

自动分区推断

// Java语言
public static void parquetPartion() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("ParquetPartion");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    // 打印schema信息,会多出两个分区字段("gender","country")
    // 分区列数据类型:支持numeric和string类型的自动推断,通过“spark.sql.sources.partitionColumnTypeInference.enabled”配置开启或关闭(默认开启),关闭后分区列全为string类型。
    sqlContext.read()
            .parquet("hdfs://spark1:9000/spark-study/users/gender=male/country=US/users.parquet")
            .printSchema();
}

自动合并元数据

def mergeSchema(): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("MergeSchema")
    // 方法一:通过 conf 对象,设置 spark.sql.parquet.mergeSchema 为 true
    // conf.set("spark.sql.parquet.mergeSchema", "true")
    val sc: SparkContext = SparkContext.getOrCreate(conf)
    val sqlContext: SQLContext = SQLContext.getOrCreate(sc)

    val nameAndAgeArray: Array[(String, Int)] = Array(("Tom", 23), ("Garen", 21), ("Peter", 25))
    val nameAndScoreArray: Array[(String, Int)] = Array(("Tome", 100), ("Garen", 98), ("Peter", 95))

    import sqlContext.implicits._
    sc.parallelize(nameAndAgeArray)
        .toDF("name", "age")
        .write
        .format("parquet")
        .mode(SaveMode.Append)
        .save("hdfs://users/hadoop/persons")

    sc.parallelize(nameAndScoreArray)
        .toDF("name", "score")
        .write
        .format("parquet")
        .mode(SaveMode.Append)
        .save("hdfs://users/hadoop/persons")

    // 方法二:通过 option 设置 MergeSchema 为 true
    sqlContext.read.option("mergeSchema", "true")
        .parquet("hdfs://users/hadoop/persons")
        .printSchema()
        
    sqlContext.read.option("mergeSchema", "true")
            .parquet("data/persons")
            .show()
}

/**
 * root
 * |-- name: string (nullable = true)
 * |-- score: integer (nullable = true)
 * |-- age: integer (nullable = true)
 *
 *  +-----+-----+----+
 *  | name|score| age|
 *  +-----+-----+----+
 *  |  Tom| null|  23|
 *  |Garen| null|  21|
 *  |Peter| null|  25|
 *  | Tome|  100|null|
 *  |Garen|   98|null|
 *  |Peter|   95|null|
 *  +-----+-----+----+
 */

Json 数据源操作

需求:查询分数大于80的学生信息以及成绩信息

public static void jsonOption() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("JsonOption");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    // 方式一: 从 json 文件创建学生信息的 DataFrame,并注册成临时表
    sqlContext.read().format("json")
            .load(Thread.currentThread().getContextClassLoader().getResource("student.txt").getPath())
            .registerTempTable("student_info");

    JavaRDD<String> rdd = sc.parallelize(Arrays.asList(
            "{\"id\":\"1001\",\"score\":100}",
            "{\"id\":\"1002\",\"score\":79}",
            "{\"id\":\"1003\",\"score\":98}"
    ));
    // 方式二: 从 json 格式的 RDD 创建学生分数信息的 DataFrame,并注册成临时表
    sqlContext.read().json(rdd).registerTempTable("student_score");
    // 查询分数大于80的学生信息和成绩,并保存到 json 文件中
    sqlContext.sql("select a.id,a.name,a.age,b.score from student_info a left join student_score b on a.id = b.id where b.score > 80")
            .write()
            .format("json")
            .mode(SaveMode.Overwrite)
            .save("data/student_score");
}
def jsonOption(): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("JsonOption")
    val sc: SparkContext = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)
    // 从文件中读取json数据,并注册成临时表
    sqlContext.read
        .format("json")
        .load(Thread.currentThread().getContextClassLoader.getResource("student.txt").getPath)
        .registerTempTable("student_info")

    // 从rdd中读取数据,并注册成临时表
    val jsonArray: Array[String] = Array(
        "{\"id\":\"1001\",\"score\":100}",
        "{\"id\":\"1002\",\"score\":90}",
        "{\"id\":\"1003\",\"score\":80}"
    )
    val rdd = sc.parallelize(jsonArray)
    sqlContext.read.json(rdd).registerTempTable("student_score")

    sqlContext.sql("select a.id,a.name,a.age,b.score from student_info a left join student_score b on a.id = b.id where b.score > 80")
        .write
        .format("json")
        .mode(SaveMode.Overwrite)
        .save("data/students_score")
}

Hive数据源操作

操作hive数据源完成查询成绩大于80的用户信息的功能。对hive数据源进行操作时需要将hive-site.xml和mysql的驱动包拷贝到spark的目录下。

public static void hiveOption() {
    SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("HiveOption");
    JavaSparkContext sc = new JavaSparkContext(conf);
    HiveContext hiveContext = new HiveContext(sc);

    // 如果student_info表存在则先删除
    hiveContext.sql("DROP TABLE IF EXISTS hive.student_info").show();
    // 创建studnet_info表结构
    hiveContext.sql("CREATE TABLE IF NOT EXISTS hive.student_info (name STRING,age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '").show();
    // 加载数据到student_info表中
    hiveContext.sql("LOAD DATA INPATH '/user/hadoop/test_data/student_info.txt' INTO TABLE student_info").show();

    // 如果student_score表存在则先删除
    hiveContext.sql("DROP TABLE IF EXISTS hive.student_score").show();
    // 创建student_score表结构
    hiveContext.sql("CREATE TABLE IF NOT EXISTS hive.student_score (name STRING,score INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '").show();
    // 加载数据到student_score表中
    hiveContext.sql("LOAD DATA INPATH '/user/hadoop/test_data/student_score.txt' INTO TABLE student_score").show();

    // 查询出成绩大于80的学生信息并保存为hive的一张新表
    hiveContext.sql("select a.name,a.age,b.score from student_info a left join student_score b on a.name = b.name where b.score > 80")
            .saveAsTable("student_info_score");

    // 通过hive中的表直接创建DataFrame
    Row[] rows = hiveContext.table("student_info_score").collect();
    Arrays.asList(rows).forEach(System.out::println);
}

JDBC 数据源操作

public static void jdbc() {
    SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("JDBCOption");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = SQLContext.getOrCreate(sc.sc());

    // 从student_info表中读取数据,并转换成 RDD
    Map<String, String> options = new HashMap<>();
    options.put("url", "jdbc:mysql://hadoop-cdh:3306/testdb");
    options.put("user", "root");
    options.put("password", "Slb930802.");
    options.put("dbtable", "student_info");
    DataFrame student_info_df = sqlContext.read().format("jdbc").options(options).load();
    JavaPairRDD<String, Integer> student_info_rdd = student_info_df.javaRDD()
            .mapToPair(row -> new Tuple2<String, Integer>(row.getString(0), row.getInt(1)));

    // 从student_score表中读取数据,并转换成 RDD
    options.put("dbtable", "student_score");
    DataFrame student_score_df = sqlContext.read().format("jdbc").options(options).load();
    JavaPairRDD<String, Integer> student_score_rdd = student_score_df.javaRDD()
            .mapToPair(row -> new Tuple2<String, Integer>(row.getString(0), row.getInt(1)));

    // 使用 join 算子连接两个 RDD,再筛选出分数大于80的学生信息
    JavaRDD<Row> good_student_info_rdd = student_info_rdd.join(student_score_rdd)
            .map(t -> RowFactory.create(t._1(), t._2()._1(), t._2()._2()))
            .filter(row -> row.getInt(2) > 80);

    // 通过自定义元数据的方式将 RDD 转换成 DataFrame 并打印
    List<StructField> structFields = Arrays.asList(
            DataTypes.createStructField("name", DataTypes.StringType, true),
            DataTypes.createStructField("age", DataTypes.IntegerType, true),
            DataTypes.createStructField("score", DataTypes.IntegerType, true)
    );
    StructType structType = DataTypes.createStructType(structFields);
    sqlContext.createDataFrame(good_student_info_rdd, structType).show();

    // 将 RDD 的数据保存到 mysql 数据库中
    good_student_info_rdd.foreach(row -> {
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = null;
        PreparedStatement ps = null;

        try {
            conn = DriverManager.getConnection("jdbc:mysql://hadoop-cdh:3306/testdb", "root", "Slb930802.");
            ps = conn.prepareStatement("INSERT INTO good_student_info VALUES(?,?,?)");
            ps.setString(1, row.getString(0));
            ps.setInt(2, row.getInt(1));
            ps.setInt(3, row.getInt(2));
            ps.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    });
}

SparkSQL内置函数

统计UV

def uv(): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("uv")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)

    // 构造日志信息,并转换成DataFrame的格式
    val logArray = Array(
        "20180115,fpanpxc2gvjd2py0jd2254ng",
        "20180115,fpanpxc2gvjd2py0jd2254ng",
        "20180115,4xlt0txdgy220f3owkhxv4y1",
        "20180116,kz5t0lvcjsbsqacjqwqyhfu4")
    val logRDD = sc.parallelize(logArray)
        .map(_.split(","))
        .map(array => RowFactory.create(array(0), array(1)))
    val structType = StructType(Array(
        StructField("data", StringType, false),
        StructField("sessionId", StringType, false)
    ))
    val logDF = sqlContext.createDataFrame(logRDD, structType)

    // 使用SparkSQL的内置函数对字段做聚合操作
    // 使用SparkSQL的内置函数需要导入以下两个包
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    logDF.groupBy("data").agg(countDistinct('sessionId).as("uv")).show()
}

统计每日的总销售额

def totalSale(): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("TotalSale")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = SQLContext.getOrCreate(sc)
    // 构建DataFrame
    val logArray = Array("20180115,34.3",
        "20180115,35.6",
        "20180116,99.9")
    val logRDD = sc.parallelize(logArray)
        .map(_.split(","))
        .map(array => Row(array(0), array(1).toDouble))
    val structType = StructType(Array(
        StructField("data", StringType, false),
        StructField("sale", DoubleType, false)
    ))
    
    // 做分组聚合,统计每日交易额
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._
    sqlContext.createDataFrame(logRDD, structType)
        .groupBy("data")
        .agg('data, sum('sale).as("total_sale"))
        .show()
}

SparkSQL窗口函数

row_number()窗口函数

select url,rate,row_number() over(partition by url order by rate desc) as r from window_test2; 
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容