大数据之Hadoop学习——动手实战学习MapReduce编程实例

前言

这里放一个我学习MapReduce的编程实例项目吧,本来是想把这些分开写成多篇文章的,能够详细叙述我学习过程中感想。但无奈,时间不够,只好在Github上创建了该项目,在代码中由较为详细的注释,我想也足够了吧。
josonle/MapReduce-Demo
该项目有些题目是参考了网上几篇博客,但代码实现是本人实现的。其次,所谓的MapReduce学习流程是参照老师上课所讲的PPT上的流程【某985大数据课程PPT】,我想老师以这样的流程授课肯定是有道理的。项目中也放了老师提供的几个参考Demo文件。


目录(目录不可用,见谅。项目中也付了这篇文档)

MapReduce编程实例

1.自定义对象序列化

需求分析

需要统计手机用户流量日志,日志内容实例:

flowdata.log

要把同一个用户的上行流量、下行流量进行累加,并计算出综合 。例如上面的13897230503有两条记录,就要对这两条记录进行累加,计算总和,得到:13897230503,500,1600,2100

报错:Exception in thread "main" java.lang.IllegalArgumentException: Wrong FS: hdfs://192.168.17.10:9000/workspace/flowStatistics/output, expected: file:///

解决:1、将core-site.xml 和hdfs-site.xml拷贝到项目里去就可以,原因是访问远程的HDFS 需要通过URI来获得FileSystem
    2、在项目中,Configuration对象设置fs.defaultFS 【推荐这个,**大小写别拼错,我就是拼错了找了半天**】

        String namenode_ip = "192.168.17.10";
        String hdfs = "hdfs://"+namenode_ip+":9000";
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", hdfs);

解答

一、正常处理即可,不过在处理500 1400 这种时灵活变通一下即可
public static class FlowMapper extends Mapper<Object, Text, Text, Text>{
        
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            Text phone = new Text(strs[0]);
            Text flow = new Text(strs[1]+"\t"+strs[2]);
            context.write(phone, flow);
        }
    }
    
    public static class FlowReducer extends Reducer<Text, Text, Text, Text>{
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException {
            int upFlow = 0;
            int downFlow = 0;
            
            for (Text value : values) {
                String[] strs = value.toString().split("\t");
                upFlow += Integer.parseInt(strs[0].toString());
                downFlow += Integer.parseInt(strs[1].toString());
            }
            int sumFlow = upFlow+downFlow;
            
            context.write(key,new Text(upFlow+"\t"+downFlow+"\t"+sumFlow));
        }
    }

二、自定义一个实现Writable接口的可序列化的对象Flow,包含数据形式如 upFlow downFlow sumFlow
public static class FlowWritableMapper extends Mapper<Object, Text, Text, FlowWritable> {
        public void map(Object key,Text value,Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            Text phone = new Text(strs[0]);
            FlowWritable flow = new FlowWritable(Integer.parseInt(strs[1]),Integer.parseInt(strs[2]));
            context.write(phone, flow);
        }
    }
    public static class FlowWritableReducer extends Reducer<Text, FlowWritable, Text, FlowWritable>{
        public void reduce(Text key,Iterable<FlowWritable> values,Context context) throws IOException, InterruptedException {
            int upFlow = 0;
            int downFlow = 0;
            
            for (FlowWritable value : values) {
                upFlow += value.getUpFlow();
                downFlow += value.getDownFlow();
            }
            
            context.write(key,new FlowWritable(upFlow,downFlow));
        }
    }
    
    public static class FlowWritable implements Writable{
        private int upFlow,downFlow,sumFlow;

        public FlowWritable(int upFlow,int downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow+downFlow;
        }
        
        public int getDownFlow() {
            return downFlow;
        }

        public void setDownFlow(int downFlow) {
            this.downFlow = downFlow;
        }

        public int getUpFlow() {
            return upFlow;
        }

        public void setUpFlow(int upFlow) {
            this.upFlow = upFlow;
        }

        public int getSumFlow() {
            return sumFlow;
        }

        public void setSumFlow(int sumFlow) {
            this.sumFlow = sumFlow;
        }
        // writer和readFields方法务必实现,序列化数据的关键
        @Override
        public void write(DataOutput out) throws IOException {
            // TODO Auto-generated method stub
            out.writeInt(upFlow);
            out.writeInt(downFlow);
            out.writeInt(sumFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // TODO Auto-generated method stub
            upFlow = in.readInt();
            downFlow = in.readInt();
            sumFlow = in.readInt();
        }

        @Override
        public String toString() {
            // TODO Auto-generated method stub
            return upFlow+"\t"+downFlow+"\t"+sumFlow;
        }
    }

注意: 要根据具体情况在job中设置Mapper、Reducer类及输出的key、value类型
具体见代码

2.数据去重

需求分析

需求很简单,就是把文件中重复数据去掉。比如说统计类似如下文件中不包含重复日期数据的日期

2017-02-14 1
2016-02-01 2
2017-07-10 3
2016-02-26 4
2015-01-19 5
2016-04-29 6
2016-05-10 7
2015-11-20 8
2017-05-23 9
2014-02-26 10

解答思路

只要搞清楚了MR的流程这个就很简单,reducer的输入类似<key3,[v1,v2,v3...]>,这个地方输入的key3是没有重复值的。所以利用这一点,Mapper输出的key保存日期数据,value置为空即可 【这里可以使用NullWritable类型】

还有就是,不一定是日期去重,去重一行数据也是如此,key保存这一行数据即可

public static class DateDistinctMapper extends Mapper<Object, Text, Text, NullWritable> {       
        public void map(Object key, Text value, Context context ) 
                throws IOException, InterruptedException {
            String[] strs = value.toString().split(" ");
            Text date = new Text(strs[0]);//取到日期作为key
            context.write(date, NullWritable.get());
        }
    }
  
public static class DateDistinctReducer extends Reducer<Text,NullWritable,Text,NullWritable>{
    
        public void reduce(Text key, Iterable<NullWritable> values, Context context) 
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

3.数据排序、二次排序

需求分析

这一类问题很多,像学生按成绩排序,手机用户流量按上行流量升序,下行流量降序排序等等

  1. 日期计数升序排序

  2. 日期计数降序排序

    //日期 日期出现的次数
    2015-01-27   7
    2015-01-28   3
    2015-01-29   7
    2015-01-30   6
    2015-01-31   7
    2015-02-01   15
    2015-02-02   10
    2015-02-03   9
    2015-02-04   12
    2015-02-05   14
    
  1. 手机用户流量按上行流量升序,下行流量降序排序

解答思路

MapReduce是默认会对key进行升序排序的,可以利用这一点实现某些排序

  • 单列排序
    • 升序还是降序排序
    • 可以利用Shuffle默认对key排序的规则;
    • 自定义继承WritableComparator的排序类,实现compare方法
  • 二次排序
    • 实现可序列化的比较类WritableComparable<T>,并实现compareTo方法(同样可指定升序降序)
日期按计数升序排序
public static class SortMapper extends Mapper<Object, Text, IntWritable, Text> {
        private IntWritable num = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            num.set(Integer.parseInt(strs[1]));
            // 将次数作为key进行升序排序
            context.write(num, new Text(strs[0]));
            System.out.println(num.get()+","+strs[0]);
        }
    }

    public static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable> {

        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                // 排序后再次颠倒k-v,将日期作为key
                System.out.println(value.toString()+":"+key.get());
                context.write(value, key);
            }
        }
    }
日期按计数降序排序

实现自定义的排序比较器,继承WritableComparator类,并实现其compare方法

public static class MyComparator extends WritableComparator {
        public MyComparator() {
            // TODO Auto-generated constructor stub
            super(IntWritable.class, true);
        }

        @Override
        @SuppressWarnings({ "rawtypes", "unchecked" }) // 不检查类型
        public int compare(WritableComparable a, WritableComparable b) {
            // CompareTo方法,返回值为1则降序,-1则升序
            // 默认是a.compareTo(b),a比b小返回-1,现在反过来返回1,就变成了降序
            return b.compareTo(a);
    }

所使用的Mapper、Reducer同上面升序排序的,其次,要在main函数中指定自定义的排序比较器

job.setSortComparatorClass(MyComparator.class);

手机用户流量按上行流量升序,下行流量降序排序

同第一个实例类似,要自定义对象序列化,同时也要可比较,实现WritableComparable接口,并实现CompareTo方法

我这里是将之前统计好的用户流量数据作为输入数据

public static class MySortKey implements WritableComparable<MySortKey> {
        private int upFlow;
        private int downFlow;
        private int sumFlow;

        public void FlowSort(int up, int down) {
            upFlow = up;
            downFlow = down;
            sumFlow = up + down;
        }

        public int getUpFlow() {
            return upFlow;
        }
        public void setUpFlow(int upFlow) {
            this.upFlow = upFlow;
        }
        public int getDownFlow() {
            return downFlow;
        }
        public void setDownFlow(int downFlow) {
            this.downFlow = downFlow;
        }
        public int getSumFlow() {
            return sumFlow;
        }
        public void setSumFlow(int sumFlow) {
            this.sumFlow = sumFlow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // TODO Auto-generated method stub
            out.writeInt(upFlow);
            out.writeInt(downFlow);
            out.writeInt(sumFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // TODO Auto-generated method stub
            upFlow = in.readInt();
            downFlow = in.readInt();
            sumFlow = in.readInt();
        }

        @Override
        public int compareTo(MySortKey o) {
            if ((this.upFlow - o.upFlow) == 0) {// 上行流量相等,比较下行流量
                return o.downFlow - this.downFlow;// 按downFlow降序排序
            } else {
                return this.upFlow - o.upFlow;// 按upFlow升序排
            }
        }

        @Override
        public String toString() {
            // TODO Auto-generated method stub
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
    }

    public static class SortMapper extends Mapper<Object, Text, MySortKey, Text> {
        Text phone = new Text();
        MySortKey mySortKey = new MySortKey();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] lists = value.toString().split("\t");
            phone.set(lists[0]);
            mySortKey.setUpFlow(Integer.parseInt(lists[1]));
            mySortKey.setDownFlow(Integer.parseInt(lists[2]));
            context.write(mySortKey, phone);// 调换手机号和流量计数,后者作为排序键
        }
    }

    public static class SortReducer extends Reducer<MySortKey, Text, Text, MySortKey> {
        public void reduce(MySortKey key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                System.out.println(value.toString()+","+key.toString());
                context.write(value, key);// 再次把手机号和流量计数调换
            }
        }
    }

4.自定义分区

需求分析

还是以上个例子的手机用户流量日志为例,在上个例子的统计需要基础上添加一个新需求:按省份统计,不同省份的手机号放到不同的文件里。

例如137表示属于河北,138属于河南,那么在结果输出时,他们分别在不同的文件中。

解答思路

挺简单的,看过我之前结合源码解读MapReduce过程的话,就知道这其实就是一个分区的问题。定义自己的分区规则,一个分区会对应一个reduce,会输出到一个文件。

而你需要做的就是基础partitioner类,并实现getPartition方法,其余过程同第一个例子

// 自定义分区类
public static class PhoneNumberPartitioner extends Partitioner<Text, FlowWritable> {
        private static HashMap<String, Integer> numberDict = new HashMap<>();
        static {
            numberDict.put("133", 0);
            numberDict.put("135", 1);
            numberDict.put("137", 2);
            numberDict.put("138", 3);
        }

        @Override
        public int getPartition(Text key, FlowWritable value, int numPartitions) {
            String num = key.toString().substring(0, 3);
            // 借助HashMap返回不同手机段对应的分区号
            // 也可以直接通过if判断,如
            // 根据年份对数据进行分区,返回不同分区号
            // if (key.toString().startsWith("133")) return 0 % numPartitions;
            return numberDict.getOrDefault(num, 4);
        }
    }

注意: main函数中要指定自定义分区类,以及Reducer task数量(一个分区对应一个reduce任务,一个Reduce任务对应一个输出文件)

// 设置分区类,及Reducer数目
job.setPartitionerClass(PhoneNumberPartitioner.class);
job.setNumReduceTasks(4);

[图片上传失败...(image-591982-1544427279895)]

增加ReduceTask数量可看到生成的文件数也增加了,不过文件内容为空

5.计算出每组订单中金额最大的记录

需求分析

有如下订单数据:

img

需要求出每一个订单中成交金额最大的一笔交易。

思路解答

实际上是求最大值、最小值的问题,一拿到题,大概会冒出两种思路吧

  1. 先排序(升序),Reduce端取第一条就是最小值,最后一条是最大值
  2. 不排序,在Reduce端不断循环作比较,也可以求得最值

但问题还涉及到每一个订单中的最大值,这就是分组的问题。比如说这里,同一订单号视为一组,在一组中找最大

先定义一个可序列化且可比较的对象Pair,用来存order_id,amount(只涉及这两个变量)。Mapper端输出类似

Key2 Value2
{order_0000001,222.8} null
{order_0000001,25.8} null
{order_0000002,522.8} null
{order_0000002,122.4} null
{order_0000003,222.8} null

通过Pair中的order_id分组,因为Pair又是可比较,设置同一组按照amount降序排序。然后在Reduce端取第一个key-value对即可
Reduce端输入k-v类似下表:

Key3 Value3
{order_0000001,[222.8,25.8]} null
{order_0000002,[522.8,122.4]} null
{order_0000003,[222.8]} null

以上是排序思路,因为这里比较简单,直接在reduce端进行比较求最值更方便 【你可以自己试一下】


// 定义Pair对象
    public static class Pair implements WritableComparable<Pair> {
        private String order_id;
        private DoubleWritable amount;

        public Pair() {
            // TODO Auto-generated constructor stub
        }

        public Pair(String id, DoubleWritable amount) {
            this.order_id = id;
            this.amount = amount;
        }

        // 省略一些内容,你可以直接去文件中看

        @Override
        public void write(DataOutput out) throws IOException {
            // TODO Auto-generated method stub
            out.writeUTF(order_id);
            out.writeDouble(amount.get());
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // TODO Auto-generated method stub
            order_id = in.readUTF();
            amount = new DoubleWritable(in.readDouble());
        }

        @Override
        public int compareTo(Pair o) {
            if (order_id.equals(o.order_id)) {// 同一order_id,按照amount降序排序
                return o.amount.compareTo(amount);
            } else {
                return order_id.compareTo(o.order_id);
            }
        }

    }
// 是分组不是分区,分组是组内定义一些规则由reduce去处理,分区是由多个Reduce处理,写到不同文件中
// 自定义分组类
    public static class GroupComparator extends WritableComparator {
        public GroupComparator() {
            // TODO Auto-generated constructor stub
            super(Pair.class, true);
        }
        // Mapper端会对Pair排序,之后分组的规则是对Pair中的order_id比较
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // TODO Auto-generated method stub
            Pair oa = (Pair) a;
            Pair ob = (Pair) b;
            return oa.getOrder_id().compareTo(ob.getOrder_id());
        }
    }
// Mapper类
    public static class MyMapper extends Mapper<Object, Text, Pair, NullWritable> {
        Pair pair = new Pair();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] strs = value.toString().split(" ");
            pair.setOrder_id(strs[0]);
            pair.setAmount(new DoubleWritable(Double.parseDouble(strs[2])));
            context.write(pair, NullWritable.get());// 道理同上,以Pair作为key
            System.out.println(pair.getOrder_id()+","+pair.getAmount());
        }
    }
    
// Reducer类
    public static class MyReducer extends Reducer<Pair, NullWritable, Text, DoubleWritable> {
        public void reduce(Pair key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(new Text(key.getOrder_id()), key.getAmount());// 已经排好序的,取第一个即可
            System.out.println(key.order_id+": "+key.amount.get());
        }
    }

注意: main函数中要另外设置自定义的分组类 job.setGroupingComparatorClass(GroupComparator.class);

多文件输入输出、及不同输入输出格式化类型

6.合并多个小文件(多文件输入输出、及不同输入输出格式化类型)

需求分析

要计算的目标文件中有大量的小文件,会造成分配任务和资源的开销比实际的计算开销还大,这就产生了效率损耗。

需要先把一些小文件合并成一个大文件。

解答思路

简单模型.jpg

如图,MapReduce有一种简单模型,仅仅只有Mapper。我想初学者都可能遇到过吧,当Mapper输出k-v类型同Reducer输入k-v不同类型时,Reducer不会执行。

其次,是输入和输出数据如何格式化?

输出很简单,因为最后是合并成一个文件,直接以SequenceFileOutputFormat格式化类写入即可

SequenceFileOutputFormat 的输出是一个二进制顺
序文件

输入要自定义格式化类,具体过程可以参考我之前写过的一篇文章:【MapReduce详解及源码解析(一)】——分片输入、Mapper及Map端Shuffle过程 ,本来是需要实现InputFormat接口的getSplitscreateRecordReader方法,前者是逻辑上获取切片,后者是将分片转化为键值对形式。

但是这里我们是合并小文件,没必要切片,直接将文件对象视为一个分片,键值对以文件名为key,文件对象为value。这里自定义MyInputFormat类继承自InputFormat的实现类FileInputFormat类

public class MyInputFormat extends FileInputFormat<NullWritable, BytesWritable>{
    
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // TODO 因为是合并小文件,设置文件不可分割,k-v的v就是文件对象
        // 设置不可分,会跳过getSplits方法中切分逻辑
        return false;
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // TODO Auto-generated method stub
        MyRecordReader myRecordReader = new MyRecordReader();
//      myRecordReader
        return myRecordReader;
    }

}

然后,你在查看源码时能够发现,createRecordReader方法返回值类型是RecordReader<KEYIN, VALUEIN>,该类型定义了如何获取当前Key-value,如何生成Key-Value的三个核心方法getCurrentKey(),getCurrentValue(),nextKeyValue()

所以你又要定义自己的MyRecordReader类,其继承的RecordReader类有initialize(初始化RecordReader)、getCurrentKey(),getCurrentValue(),nextKeyValue()close(关闭RecordReader)

具体的代码你可以看我源码文件

7.分组输出到多个文件【多文件输入输出、及不同输入输出格式化类型】

需求分析

img

需要把相同订单id的记录放在一个文件中,并以订单id命名。

8.join操作

需求分析

有2个数据文件:订单数据、商品信息。【数据文件:product.txt,order.txt】

订单数据表order

img

商品信息表product

img

需要用MapReduce程序来实现下面这个SQL查询运算:

select o.id order_id, o.date, o.amount, p.id p_id, p.pname, p.category_id, p.price
from t_order o join t_product p on o.pid = p.id

9.计算出用户间的共同好友

需求分析

下面是用户的好友关系列表,每一行代表一个用户和他的好友列表 【数据文件:friendsdata.txt】

image

需要求出哪些人两两之间有共同好友,及他俩的共同好友都有谁

例如从前2天记录中可以看出,C、E是A、B的共同好友,最终的形式如下:

img

MapReduce理论基础

Hadoop、Spark学习路线及资源收纳

MapReduce书籍推荐

  • 《MapReduce Design Patterns》

    image
  • 《MapReduce2.0源码分析与编程实战》

    [图片上传失败...(image-a2c861-1544427279895)]

  • 《Hadoop MapReduce v2 Cookbook, 2nd Edition》

image

MapReduce实战系统学习流程

词频统计

数据去重

数据排序

求平均值、中位数、标准差、最大/小值、计数

分组、分区

数据输入输出格式化

多文件输入、输出

单表关联

多表关联

倒排索引

索引(index)作为一种具备各种优势的数据结构,被大量应用在数据检索领域

索引的优点

  • 通过对关键字段排序,加快数据的检索速度
  • 保证每一行数据的唯一性
index.png
reverseindex.jpg

需求

对于给定的文档,确定每个单词存在于某个文档,同时在文档中出现的次数

思路解答

  • Map端对文件统计每个单词出现的次数,输出类似<{hadoop,file1},2>
  • Map端输出前要先进行Combine过程,最终输出类似< hadoop, file1:2>
  • Reduce端继续对相同单词进行合并,最终输出类似<hadoop, file1:2 file2:5>

数据文件

随便找几篇英文文档就可以了

TopN

数据文件类似如下:

t001 2067
t002 2055
t003 109
t004 1200
t005 3368
t006 251
t001 3067
t002 255
t003 19
t004 2000
t005 368
t006 2512

随便写的,每一行以空格隔开,查找后面数之的TopN

思路解答

就是每一个Map Task任务要求其只输出TopN数据,这里借助TreeMap自动排序的特性【 将数字作为排序键 】,保证TopN。然后是Reduce中再次求解TopN即可

注意: 在main函数中要设置ReduceTask数量为1,保证最终的TopN

// Mapper中实现的map方法如下
    private TreeMap<Integer, Text> visittimesMap = new TreeMap<Integer, Text>();    //TreeMap是有序KV集合

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        if (value == null) {
            return;
        }
        String[] strs = value.toString().split(" ");
        String tId = strs[0];
        String tVisittimes = strs[1];
        if (tId == null || tVisittimes == null) {
            return;
        }
        visittimesMap.put(Integer.parseInt(tVisittimes), new Text(value));  //将访问次数(KEY)和行记录(VALUE)放入TreeMap中自动排序
        if (visittimesMap.size() > 5) { //如果TreeMap中元素超过N个,将第一个(KEY最小的)元素删除
            visittimesMap.remove(visittimesMap.firstKey());
        }
    }
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        for (Text t : visittimesMap.values()) {
            context.write(NullWritable.get(), t);   //在clean()中完成Map输出
        }
    }

PeopleRank算法实现

推荐系统——协同过滤算法实现

数据

见resources文件夹下,其中rand.sh脚本用于生成随机日期数据

关于我

你可以在途径找到我

本文为项目中说明文档,首发于 https://blog.csdn.net/lzw2016/article/details/84928495

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,761评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,953评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,998评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,248评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,130评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,145评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,550评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,236评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,510评论 1 291
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,601评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,376评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,247评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,613评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,911评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,191评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,532评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,739评论 2 335

推荐阅读更多精彩内容

  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 31,863评论 2 89
  • 当在电话里对着你,诉说我在大学的孤独,不开心时,电话那边的你,沉默好久,终于说一句,下雪了吗?我迟钝了一下,你...
    离洛殊阅读 188评论 0 1
  • 夜的侵蚀给予我 逃生的灵感 飘雾的月色光影 渐渐染着在 寂寞的围墙 路灯唏嘘般的歌唱 一曲冗长的音乐 交错着风的死...
    渡落劫阅读 136评论 0 0
  • 钓鱼舟阅读 559评论 2 6