Apache Flink实例应用

本文主要采用flink + redis完成数据的清洗过滤,再到计算加工,最后落地.其中包括实现自定义source/sink以及event-time窗口/watermark的使用.

准备

一.docker安装kafka

1.下载docker镜像(如果直接下载docker镜像慢 可通过指定国内镜像仓库进行操作)

docker pull wurstmeister/zookeeper docker pull wurstmeister/kafka

2.分别运行docker镜像: zookeeper和kafka

2.1启动zookeeper docker run -d --name zookeeper --publish 2181:2181
--volume /etc/localtime:/etc/localtime
wurstmeister/zookeeper

2.2启动kafka docker run -d --name kafka --publish 9092:9092
--link zookeeper
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
--env KAFKA_ADVERTISED_HOST_NAME=localhost
--env KAFKA_ADVERTISED_PORT=9092
--volume /etc/localtime:/etc/localtime
wurstmeister/kafka

3.验证docker对应的容器是否启动成功

3.1 运行 docker ps,找到kafka的 CONTAINER ID, 3.2 运行 docker exec -it ${CONTAINER ID} /bin/bash,进入kafka容器。 3.3 进入kafka默认目录 /opt/kafka_2.11-0.10.1.0, 运行 bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic test, 创建一个 topic 名称为 test。

运行 bin/kafka-topics.sh --list --zookeeper zookeeper:2181 查看当前的 topic 列表。

运行一个消息生产者,指定 topic 为刚刚创建的 test , bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test,输入一些测试消息。

运行一个消息消费者,同样指定 topic 为 test, bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning,可以接收到生产者发送的消息。

二.docker安装redis

1.下载redis镜像

docker pull registry.docker-cn.com/library/redis

2.启动redis镜像

docker run -d -p 6379:6379 --name myredis registry.docker-cn.com/library/redis

3.查看docker ps 查看运行中的容器

4.连接、查看容器,使用redis镜像执行redis-cli命令连接到刚启动的容器

sudo docker exec -it 6fb1ba029b41 redis-cli 出现类似: 127.0.0.1:6379>

三.测试数据集

3.1 数据集地址如下:

wget http://training.ververica.com/trainingData/nycTaxiRides.gz

wget http://training.ververica.com/trainingData/nycTaxiFares.gz

3.2 数据集字段说明

=============================Taxi Ride数据集相关字段说明=============================
rideId         : Long      // a unique id for each ride 一次行程
taxiId         : Long      // a unique id for each taxi 本次行程使用的出租车
driverId       : Long      // a unique id for each driver 本次行程的司机
isStart        : Boolean   // TRUE for ride start events, FALSE for ride end events  行程开始标识
startTime      : DateTime  // the start time of a ride   行程开始日期
endTime        : DateTime  // the end time of a ride,    行程结束日期
                           //   "1970-01-01 00:00:00" for start events
startLon       : Float     // the longitude of the ride start location    行程开始经度
startLat       : Float     // the latitude of the ride start location     行程开始维度
endLon         : Float     // the longitude of the ride end location      行程结束经度
endLat         : Float     // the latitude of the ride end location   行程结束维度
passengerCnt   : Short     // number of passengers on the ride        本次行程乘客数

=============================TaxiFare数据集相关字段说明=============================
rideId         : Long      // a unique id for each ride     一次行程
taxiId         : Long      // a unique id for each taxi     本次行程的出租车
driverId       : Long      // a unique id for each driver   本次行程的司机
startTime      : DateTime  // the start time of a ride      行程开始时间
paymentType    : String    // CSH or CRD                    行程付费方式(CSH/CRD)
tip            : Float     // tip for this ride         本次行程的里程
tolls          : Float     // tolls for this ride           本次行程缴费
totalFare      : Float     // total fare collected          本次行程总费用

代码实现

一.基础类

用于对数据源内容的映射

import com.jdd.streaming.demos.utils.GeoUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Locale;

/**
 * @Auther: dalan
 * @Date: 19-3-18 15:23
 * @Description: 每一段行程
 */
public class TaxiRide implements Comparable<TaxiRide>, Serializable {

    private static transient DateTimeFormatter timeFormatter =
            DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();

    public TaxiRide() {
        this.startTime = new DateTime();
        this.endTime = new DateTime();
    }

    public TaxiRide(long rideId, boolean isStart, DateTime startTime, DateTime endTime,
                    float startLon, float startLat, float endLon, float endLat,
                    short passengerCnt, long taxiId, long driverId) {

        this.rideId = rideId;
        this.isStart = isStart;
        this.startTime = startTime;
        this.endTime = endTime;
        this.startLon = startLon;
        this.startLat = startLat;
        this.endLon = endLon;
        this.endLat = endLat;
        this.passengerCnt = passengerCnt;
        this.taxiId = taxiId;
        this.driverId = driverId;
    }

    public long rideId;
    public boolean isStart;
    public DateTime startTime;
    public DateTime endTime;
    public float startLon;
    public float startLat;
    public float endLon;
    public float endLat;
    public short passengerCnt;
    public long taxiId;
    public long driverId;

    public String toString() {  // 便于用内容输出
        StringBuilder sb = new StringBuilder();
        sb.append(rideId).append(",");
        sb.append(isStart ? "START" : "END").append(",");
        sb.append(startTime.toString(timeFormatter)).append(",");
        sb.append(endTime.toString(timeFormatter)).append(",");
        sb.append(startLon).append(",");
        sb.append(startLat).append(",");
        sb.append(endLon).append(",");
        sb.append(endLat).append(",");
        sb.append(passengerCnt).append(",");
        sb.append(taxiId).append(",");
        sb.append(driverId);

        return sb.toString();
    }

    // 用于将文件中的每条记录转为对应的TaxiRide实例
    public static TaxiRide fromString(String line) {
        String[] tokens = line.split(",");
        if (tokens.length != 11) {
            throw new RuntimeException("Invalid record: " + line);
        }

        TaxiRide ride = new TaxiRide();
        // 完成对应记录内容与字段的对应
        try {
            ride.rideId = Long.parseLong(tokens[0]);

            switch (tokens[1]) {
                case "START":
                    ride.isStart = true;
                    ride.startTime = DateTime.parse(tokens[2], timeFormatter);
                    ride.endTime = DateTime.parse(tokens[3], timeFormatter);
                    break;
                case "END":
                    ride.isStart = false;
                    ride.endTime = DateTime.parse(tokens[2], timeFormatter);
                    ride.startTime = DateTime.parse(tokens[3], timeFormatter);
                    break;
                default:
                    throw new RuntimeException("Invalid record: " + line);
            }

            ride.startLon = tokens[4].length() > 0 ? Float.parseFloat(tokens[4]) : 0.0f;
            ride.startLat = tokens[5].length() > 0 ? Float.parseFloat(tokens[5]) : 0.0f;
            ride.endLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
            ride.endLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) : 0.0f;
            ride.passengerCnt = Short.parseShort(tokens[8]);
            ride.taxiId = Long.parseLong(tokens[9]);
            ride.driverId = Long.parseLong(tokens[10]);

        } catch (NumberFormatException nfe) {
            throw new RuntimeException("Invalid record: " + line, nfe);
        }

        return ride;
    }

   // 基于timestamp排序;注意当开始行程和结束行程的timestamp相同,将start行程放置在前
    public int compareTo(TaxiRide other) {
        if (other == null) {
            return 1;
        }
        int compareTimes = Long.compare(this.getEventTime(), other.getEventTime());
        if (compareTimes == 0) {
            if (this.isStart == other.isStart) {
                return 0;
            }
            else {
                if (this.isStart) {
                    return -1;
                }
                else {
                    return 1;
                }
            }
        }
        else {
            return compareTimes;
        }
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof TaxiRide &&
                this.rideId == ((TaxiRide) other).rideId;
    }

    @Override
    public int hashCode() {
        return (int)this.rideId;
    }

   // 采用的event time
    public long getEventTime() {
        if (isStart) {
            return startTime.getMillis();
        }
        else {
            return endTime.getMillis();
        }
    }
    // 行程里程
    public double getEuclideanDistance(double longitude, double latitude) {
        if (this.isStart) {
            return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.startLon, this.startLat);
        } else {
            return GeoUtils.getEuclideanDistance((float) longitude, (float) latitude, this.endLon, this.endLat);
        }
    }


   // ===================本段代码可暂时忽视=========================
    public static class EnrichedRide extends TaxiRide{
        public int startCell;
        public int endCell;

        public EnrichedRide(){}
        public EnrichedRide(TaxiRide ride){
            this.rideId = ride.rideId;
            this.isStart = ride.isStart;
            this.startTime = ride.startTime;
            this.endTime = ride.endTime;
            this.startLon = ride.startLon;
            this.startLat = ride.startLat;
            this.endLon = ride.endLon;
            this.endLat = ride.endLat;
            this.passengerCnt = ride.passengerCnt;
            this.taxiId = ride.taxiId;
            this.driverId = ride.driverId;

            this.startCell = GeoUtils.mapToGridCell(ride.startLon, ride.startLat);
            this.endCell = GeoUtils.mapToGridCell(ride.endLon, ride.endLat);
        }

        @Override
        public String toString() {
            return super.toString() + "," +
                    Integer.toString(this.startCell) + "," +
                    Integer.toString(this.endCell);
        }
    }
}

二.自定义source

用来完成读取gz文件.在flink中自定义source需要实现接口SourceFunction:
1.public void run(SourceContext<TaxiRide> sourceContext) throws Exception
一般获取原始源对应的数据内容
2.public void cancel()
不过在实际生产中一般用org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction,它继承了上面SourceFunction和RichFunction的接口,并实现了部分功能.

public class TaxiRideSource implements SourceFunction<TaxiRide> {

    private final int maxDelayMsecs;
    private final int watermarkDelayMSecs;

    private final String dataFilePath;
    private final int servingSpeed;

    private transient BufferedReader reader;
    private transient InputStream gzipStream;
   
    public TaxiRideSource(String dataFilePath) {
        this(dataFilePath, 0, 1);
    }

    public TaxiRideSource(String dataFilePath, int servingSpeedFactor) {
        this(dataFilePath, 0, servingSpeedFactor);
    }

    public TaxiRideSource(String dataFilePath, int maxEventDelaySecs, int servingSpeedFactor) {
        if(maxEventDelaySecs < 0) {
            throw new IllegalArgumentException("Max event delay must be positive");
        }
        this.dataFilePath = dataFilePath;
        this.maxDelayMsecs = maxEventDelaySecs * 1000;
        this.watermarkDelayMSecs = maxDelayMsecs < 10000 ? 10000 : maxDelayMsecs;
        this.servingSpeed = servingSpeedFactor;
    }

   // 这是实现Source的关键步骤: 完成对应的source真正的操作
    @Override
    public void run(SourceContext<TaxiRide> sourceContext) throws Exception {
        // 获取gzipStream并结合BufferReader提供带有buffer的reader
        gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath));
        reader = new BufferedReader(new InputStreamReader(gzipStream, "UTF-8"));
        // 读取gz文件
        generateUnorderedStream(sourceContext);
        // 文件获取结束的 资源释放
        this.reader.close();
        this.reader = null;
        this.gzipStream.close();
        this.gzipStream = null;
    }
     
    // 本段代码主要用于将gz中的记录增加
    private void generateUnorderedStream(SourceContext<TaxiRide> sourceContext) throws Exception {
        long servingStartTime = Calendar.getInstance().getTimeInMillis();
        long dataStartTime;

        Random rand = new Random(7452);
        PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
                32,
                new Comparator<Tuple2<Long, Object>>() {
                    @Override
                    public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {
                        return o1.f0.compareTo(o2.f0);
                    }
                });

        // read first ride and insert it into emit schedule
        String line;
        TaxiRide ride;
        if (reader.ready() && (line = reader.readLine()) != null) {
            // read first ride
            ride = TaxiRide.fromString(line);
            // extract starting timestamp
            dataStartTime = getEventTime(ride);
            // get delayed time
            long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);

            emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
            // schedule next watermark
            long watermarkTime = dataStartTime + watermarkDelayMSecs;
            Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
            emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));

        } else {
            return;
        }

        // peek at next ride
        if (reader.ready() && (line = reader.readLine()) != null) {
            ride = TaxiRide.fromString(line);
        }

        // read rides one-by-one and emit a random ride from the buffer each time
        while (emitSchedule.size() > 0 || reader.ready()) {

            // insert all events into schedule that might be emitted next
            long curNextDelayedEventTime = !emitSchedule.isEmpty() ? emitSchedule.peek().f0 : -1;
            long rideEventTime = ride != null ? getEventTime(ride) : -1;
            while(
                    ride != null && ( // while there is a ride AND
                            emitSchedule.isEmpty() || // and no ride in schedule OR
                                    rideEventTime < curNextDelayedEventTime + maxDelayMsecs) // not enough rides in schedule
            )
            {
                // insert event into emit schedule
                long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
                emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));

                // read next ride
                if (reader.ready() && (line = reader.readLine()) != null) {
                    ride = TaxiRide.fromString(line);
                    rideEventTime = getEventTime(ride);
                }
                else {
                    ride = null;
                    rideEventTime = -1;
                }
            }

            // emit schedule is updated, emit next element in schedule
            Tuple2<Long, Object> head = emitSchedule.poll();
            long delayedEventTime = head.f0;

            long now = Calendar.getInstance().getTimeInMillis();
            long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
            long waitTime = servingTime - now;

            Thread.sleep( (waitTime > 0) ? waitTime : 0);

            if(head.f1 instanceof TaxiRide) {
                TaxiRide emitRide = (TaxiRide)head.f1;
                // emit ride
                sourceContext.collectWithTimestamp(emitRide, getEventTime(emitRide));
            }
            else if(head.f1 instanceof Watermark) {
                Watermark emitWatermark = (Watermark)head.f1;
                // emit watermark
                sourceContext.emitWatermark(emitWatermark);
                // schedule next watermark
                long watermarkTime = delayedEventTime + watermarkDelayMSecs;
                Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
                emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
            }
        }
    }
    
   // 处理数据记录时间
    public long toServingTime(long servingStartTime, long dataStartTime, long eventTime) {
        long dataDiff = eventTime - dataStartTime;
        return servingStartTime + (dataDiff / this.servingSpeed);
    }

    public long getEventTime(TaxiRide ride) {
        return ride.getEventTime();
    }

    public long getNormalDelayMsecs(Random rand) {
        long delay = -1;
        long x = maxDelayMsecs / 2;
        while(delay < 0 || delay > maxDelayMsecs) {
            delay = (long)(rand.nextGaussian() * x) + x;
        }
        return delay;
    }

   // 操作source时 出现异常触发的操作
    @Override
    public void cancel() {
        try {
            if (this.reader != null) {
                this.reader.close();
            }
            if (this.gzipStream != null) {
                this.gzipStream.close();
            }
        } catch(IOException ioe) {
            throw new RuntimeException("Could not cancel SourceFunction", ioe);
        } finally {
            this.reader = null;
            this.gzipStream = null;
        }
    }
}

三.自定义sink

主要实现redis作为sink,一般在实际的应用通过继承抽象类:RichSinkFunction
1.public void open(Configuration parameters) throws Exception
2.public void invoke(Tuple2<Long, Long> val, Context context) throws Exception
3.public void close() throws Exception

 private transient JedisPool jedisPool;
@Override
public void open(Configuration parameters) throws Exception { // 新建redis pool 初始化
 try {
       super.open(parameters);
       JedisPoolConfig config = new JedisPoolConfig();
       config.setMaxIdle(redisConfig.getMaxIdle());
       config.setMinIdle(redisConfig.getMinIdle());
       config.setMaxTotal(redisConfig.getMaxTotal());
       jedisPool = new JedisPool(config, redisConfig.getHost(), redisConfig.getPort(),
                redisConfig.getConnectionTimeout(), redisConfig.getPassword(),  redisConfig.getDatabase());
   } catch (Exception e) {
        LOGGER.error("redis sink error {}", e);
  }
}

 @Override
public void close() throws Exception { // 关闭redis链接 使用完的资源释放
try {
         jedisPool.close();
    } catch (Exception e) {
     LOGGER.error("redis sink error {}", e);
  }
}

@Override
public void invoke(Tuple2<Long, Long> val, Context context) throws Exception { // 执行将内容落地redis
   Jedis jedis = null;
  try {
       jedis = jedisPool.getResource();
       jedis.set("taxi:ride:" + val.f0,val.f1.toString());
 } catch (Exception e) {
     e.printStackTrace();
} finally {
    if (null != jedis){
        if (jedis != null) {
         try {
               jedis.close();
          } catch (Exception e) {
                e.printStackTrace();
          }
     }
   }
 }
}

四.实例

public class TaxiRideCount {
    /** logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(TaxiRideCount.class);
    // main
    public static void main(String[] args) throws Exception {
        // 读取配置参数: 文件路径/最大延迟时间/
        final ParameterTool params = ParameterTool.fromArgs(args);
        String path = params.get("file-path","/home/wmm/go_bench/flink_sources/nycTaxiRides.gz");
        int maxDeply = params.getInt("max-delay",60);
        int servingSpeed = params.getInt("serving-speed",600);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().disableSysoutLogging();

        // 指定TaxiRide 读取source内容
        DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(path, maxDeply, servingSpeed));
       // 
        DataStream<Tuple2<Long,Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(TaxiRide ride) throws Exception {
                return new Tuple2<Long, Long>(ride.driverId, 1L); // 基于行程中的司机id划分数据 并进行统计
            }
        });

        KeyedStream<Tuple2<Long, Long>, Tuple> keyByDriverId = tuples.keyBy(0); // 基于司机id进行数据划分
        DataStream<Tuple2<Long, Long>> rideCounts = keyByDriverId.sum(1); // 累计每个司机的里程数

        RedisConfig redisConfig = new RedisConfig();
        redisConfig.setHost(params.get("output-redis","127.0.0.1"));
        redisConfig.setPort(6379);
        redisConfig.setPassword(null);
        //RedisSink redisSink = new RedisSink(redisConfig);
// 本段是将redis进行了封装对应的实体类 便于复用
//        rideCounts.map(new MapFunction<Tuple2<Long, Long>, RedisCommand>() { // 落地redis
//            @Override
//            public RedisCommand map(Tuple2<Long, Long> in) throws Exception {
//                return new RedisPushCommand("taxi:ride:" + in.f0, Long.toString(in.f1));
//                //return new RedisPushCommand("taxi:ride:" + in.f0, new String[]{Long.toString(in.f1)});
//            }
//        }).addSink(redisSink);

        // 直接使用匿名类实现redis sink
        rideCounts.addSink(new RichSinkFunction<Tuple2<Long, Long>>() {  // 定义sink
            private transient JedisPool jedisPool;
            @Override
            public void open(Configuration parameters) throws Exception { // 新建redis pool
                try {
                    super.open(parameters);
                    JedisPoolConfig config = new JedisPoolConfig();
                    config.setMaxIdle(redisConfig.getMaxIdle());
                    config.setMinIdle(redisConfig.getMinIdle());
                    config.setMaxTotal(redisConfig.getMaxTotal());
                    jedisPool = new JedisPool(config, redisConfig.getHost(), redisConfig.getPort(),
                            redisConfig.getConnectionTimeout(), redisConfig.getPassword(), redisConfig.getDatabase());
                } catch (Exception e) {
                    LOGGER.error("redis sink error {}", e);
                }
            }

            @Override
            public void close() throws Exception { // 关闭redis链接
                try {
                    jedisPool.close();
                } catch (Exception e) {
                    LOGGER.error("redis sink error {}", e);
                }
            }

            @Override
            public void invoke(Tuple2<Long, Long> val, Context context) throws Exception { // 执行将内容落地redis
                Jedis jedis = null;
                try {
                    jedis = jedisPool.getResource();
                    jedis.set("taxi:ride:" + val.f0,val.f1.toString()); // 直接存到redis中
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (null != jedis){
                        if (jedis != null) {
                            try {
                                jedis.close();
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }
        });
        //rideCounts.print();

        JobExecutionResult result = env.execute("Ride Count By DriverID");
     }
}

验证

通过使用redis-client查看对应的内容:
get taxi:ride:2013001713
command count
github提供完整实例

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,919评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,567评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,316评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,294评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,318评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,245评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,120评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,964评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,376评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,592评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,764评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,460评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,070评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,697评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,846评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,819评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,665评论 2 354