Flink-1.12(四)Flink API

Flink 开发一个简单的应用程序只需要构建环境、构建数据源、构建数据处理方案、构建数据输出及执行程序这五个步骤,但每个步骤都有对应其他强大的API,所以本文一一举例学习。

构建环境

流处理

StreamExecutionEnvironment env = null;
// 构建流环境,如果在本地则创建本地环境,如果是集群,则创建集群环境
env  = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建本地执行环境并设置并行数
env = StreamExecutionEnvironment.createLocalEnvironment(3);
// 创建远程执行环境,jobmanager的IP,端口,并行度,运行程序的位置
env = StreamExecutionEnvironment.createRemoteEnvironment("10.xxx.xx.103",6123,5,"D:/test/abc.jar");

第三种方式可以直接从本地代码中构建与远程集群的Flink JobManager 的RPC连接,通过指定应用程序所在的jar包,将运行程序远程拷贝到 JobManager 节点上,然后将Flink 应用运行在远程的环境中,本地程序相当于一个客户端。

批处理

ExecutionEnvironment env = null;
// 构建流环境,如果在本地则创建本地环境,如果是集群,则创建集群环境
env  = ExecutionEnvironment.getExecutionEnvironment();
// 创建本地执行环境并设置并行数
env = ExecutionEnvironment.createLocalEnvironment(3);
// 创建远程执行环境,jobmanager的IP,端口,并行度,运行程序的位置
env = ExecutionEnvironment.createRemoteEnvironment("10.xxx.xx.103",6123,5,"D:/test/abc.jar");

构建数据源

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 从集合中读取数据
        DataStream<Integer> ds = env.fromCollection(Arrays.asList(1,2,3,4,5,6));
        // 直接读取数据
        DataStream<Integer> ds1 = env.fromElements(1,2,3,4,5);
        // 从文件读取
        DataStream<String> ds2 = env.readTextFile("D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
        // 从 kafka 读取
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "10.240.30.104:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("enable.auto.commit", "true");
        properties.setProperty("auto.commit.interval.ms", "0");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("topic1", new SimpleStringSchema(),properties);
        DataStream<String> ds3 = env.addSource(myConsumer);
        // 自定义数据源
        DataStream<String> ds4 = env.addSource(new CustomizeSource());

自定义数据源

package com.example.demo;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * @author big uncle
 * @date 2021/6/3 13:54
 * @module
 **/
public class CustomizeSource implements SourceFunction<String> {

    private boolean running = true;

    /**
     * 读取数据
    **/
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        String str = "a-b-c:";
        int i =0;
        while (running){
            i++;
            ctx.collect(str+i);
            Thread.sleep(1000);
        }
    }

    /**
     * 关闭
    **/
    @Override
    public void cancel() {
        running = false;
    }
}

Transform

map fliter flatmap

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.fromElements("a,b,c,d,e,1,2,3,4");
        // 读取值是 "a,b,c,d,e,1,2,3,4" 转变为 一个个的元素
        dataStream.flatMap(new FlatMapFunction<String, Object>() {
            @Override
            public void flatMap(String s, Collector<Object> collector) throws Exception {
                String[] str = s.split(",");
                for(String ss : str) {
                    collector.collect(ss);
                }
            }
        })
        // 为 true 才会输出,过滤掉不是 a 的值
        .filter(i -> i.equals("a"))
        // 转黄给 a 添加 字符串 0
        .map(i -> i+"0")
        // 输出结果为 a0
        .print();
        env.execute("a");
    }

key

所有的聚合操作只能在 key 分组(分区)之后。sum()、min()、max()、minBy()、maxBy()、reduce() 比较器

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
        
        dataStream.keyBy(new KeySelector<Integer, Object>() {
            @Override
            // 把所有数据放到一个分区 0
            public Object getKey(Integer integer) throws Exception {
                return 0;
            }
        })
        // 对 分区进行聚合,聚合下标为 0,注意这里是下标,最后输出数据139,会显示累加值
        .sum(0).print();
        env.execute("test");
    }
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);

        // keyBy 可以根据位置,也可以根据字段
        dataStream.keyBy(new KeySelector<Integer, Object>() {
            @Override
            // 把所有数据放到一个分区 0
            public Object getKey(Integer integer) throws Exception {
                return 0;
            }
        })
        // 注意这里是下标,也可以是字段名
        .min(0).print();
        env.execute("test");
    }

min()、minBy() ,针对元祖和对象而言,min 会对比某个字段的大小,但不会更新其他字段值,minBy() 会更新其他字段值。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);

        // keyBy 可以根据位置,也可以根据字段
        dataStream.keyBy(new KeySelector<Integer, Object>() {
            @Override
            // 把所有数据放到一个分区 0
            public Object getKey(Integer integer) throws Exception {
                return 0;
            }
        })
        .reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer t1, Integer t2) throws Exception {
                return t2 - t1 > 0 ? t2 : t1;
            }
        }).print();
        env.execute("test");
    }

reduce 可以比较两个值,更自由一些

split 和 Select (OutputTag)Connect和CoMap以及Union

从代码中可以看出帮我们收集了所有数据,我们可以在做其他操作,多用于做数据标签。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);

        final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};

        SingleOutputStreamOperator<Object> stream = dataStream.process(new ProcessFunction<Integer, Object>() {

            @Override
            public void processElement(Integer value, Context ctx, Collector<Object> out) throws Exception {
                if(value > 10) {
                    out.collect(value);
                }
                ctx.output(outputTag,"其他类型的流"+value.toString());
            }
        });
        stream.print("原始数据进行过滤后:");
        DataStream<String> sideOutputStream = stream.getSideOutput(outputTag);
        sideOutputStream.print("标签处理:");

        // 合流
        sideOutputStream.connect(stream).flatMap(new CoFlatMapFunction<String,Object,Integer>() {

            // 第一个流如何处理
            @Override
            public void flatMap1(String value, Collector out) throws Exception {
                String str = value.toString();
                str = str.replaceAll("其他类型的流","");
                out.collect(Integer.valueOf(str));
            }
            // 第二流如何处理
            @Override
            public void flatMap2(Object value, Collector out) throws Exception {
                out.collect(Integer.valueOf(value.toString()));
            }

        }).keyBy(new KeySelector<Integer, Object>() {
            @Override
            public Object getKey(Integer value) throws Exception {
                return 0;
            }
        }).sum(0).print("合并后输出:");

        env.execute("test");
    }
标签处理::1> 其他类型的流3
标签处理::2> 其他类型的流7
标签处理::3> 其他类型的流1
标签处理::7> 其他类型的流10
合并后输出::6> 1
合并后输出::6> 8
合并后输出::6> 11
合并后输出::6> 21
原始数据进行过滤后::8> 27
原始数据进行过滤后::6> 19
标签处理::8> 其他类型的流27
原始数据进行过滤后::5> 22
原始数据进行过滤后::4> 50
标签处理::5> 其他类型的流22
标签处理::6> 其他类型的流19
标签处理::4> 其他类型的流50
合并后输出::6> 48
合并后输出::6> 75
合并后输出::6> 97
合并后输出::6> 119
合并后输出::6> 138
合并后输出::6> 157
合并后输出::6> 207
合并后输出::6> 257

该方法只能在以下处理时使用。

flink 1.12 没有 Split 和 Select,换成了 OutputTag

Union 要求多条流合并,必须是相同的数据类型,比较简单,这里就不说了。

分区

keyBy 默认是按照 hash 分区,除了KeyBy 还有其他的分区方式。

  • broadcast() 广播,给下游分区全部广播同一份数据
  • shuffle() 随机把当前数据发配个下游分区
  • foeward() 只放在当前分区做计算
  • rebalance() 轮询给下游分区发数据
  • rescale() 给相同分组的下游分区轮询发数据
  • global() 丢给下游分区的第一个实例
  • partitionCustom() 用户自定义重分区方式

Sink 输出

kafka

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
        dataStream.addSink(new FlinkKafkaProducer("topic-test",new SimpleStringSchema(),new Properties()));
        env.execute("test");
    }

自定义

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
        dataStream.addSink(new MySink()).setParallelism(1);
        env.execute("test");
    }

    public static class MySink extends RichSinkFunction<Integer> {
        // 不建议 在open 和 clos 写类似数据库连接 把握不好..死翘翘,但可以在 open 和 clous 检查连接状态,或有链接池 获取也行
        @Override
        public void open(Configuration parameters) throws Exception {
            System.out.println("open 有多少数据呗调用多少次");
        }

        @Override
        public void close() throws Exception {
            System.out.println("close 有多少数据呗调用多少次");
        }

        @Override
        public void invoke(Integer value, Context context) throws Exception {
            // 输出数据 可以用来写 到 redis kafka  hdfs 等
            System.out.println(value);
        }
    }

想要直到 flink 支持更多的 source 或 sink 可以去 maven库里 搜索 flink connector

window

window 就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)中进行分析。window 不是等有数据才去做窗口,而是有一个桶,等数据,桶可以存在多个。只要桶不关闭,哪怕晚迟到的数据,依然可以被分配到对应的桶里。

基本类型

时间窗口(Time Window)

按时间段做为窗口,如8:01~8:02 为一个窗口
时间窗口分为:滚动时间窗口(TumBlingWindows)、滑动时间窗口(SlidingWindows)、会话窗口(Session Windows)

  • 滚动时间窗口:时间是对齐的,如8:00~9:00 下一个窗口就是 9:00~10:00,数据不会重叠(不重复数据),如果数据刚好在整点,按照 [ ) 左开右闭,会属于新时间。


  • 滑动时间窗口:窗口长度固定,可以有重叠(重复数据)


  • 会话窗口:一定时间没有收到数据会话结束,有新的数据来,开启新的会话。session gap 为最小的间隔。


计数窗口(Count Window)

按多少各数做为一个窗口,如10个数 为一个窗口。
计数窗口分为:滚动计数窗口、滑动计数窗口

代码演示

如果想要调用时间窗口,必须在 keyBy 之后,但也可以直接使用 windowALL() 方法,该方法把所有数据放到一个分区,并行度为一。

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
        // 滚动窗口,可以看这个方法的实现类
        dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(15)));
        // session 窗口
        dataStream.keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(1)));
        // 滚动窗口
        dataStream.keyBy(0).timeWindow(Time.seconds(15));
        // 滑动窗口
        dataStream.keyBy(0).timeWindow(Time.seconds(20),Time.seconds(5));
        // 滑动窗口
        dataStream.keyBy(0).window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5)));
        // 滚动计数窗口
        dataStream.keyBy(0).countWindow(10);
        // 滑动计数窗口
        dataStream.keyBy(0).countWindow(10,3);
    }

窗口函数

增量聚合函数

每条数据到来进行计算,保持一个简单的状态。一定会等到时间点(窗口结束点)到才会输出结果。这种窗口可以做一些max、min等比较,累计的函数

  • ResuceFunction
  • AggregateFunction
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.socketTextStream("192.168.200.58",7777);
        // 滑动窗口
        dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
            @Override
            public Object getKey(Integer value) throws Exception {
                return 0;
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
//               .reduce(new ReduceFunction<Integer>() {
//                   // 比较两个数大小等 可以实现 max min sum 等操作
//                   @Override
//                   public Integer reduce(Integer value1, Integer value2) throws Exception {
//                       return null;
//                   }
//               });
                // 实现一个计数器,统计有多少个数据
                .aggregate(new AggregateFunction<Integer, Integer, Integer>() {
                    // 创建一个累加器,初始值
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    // 累加规则
                    @Override
                    public Integer add(Integer value, Integer accumulator) {
                        return accumulator + 1;
                    }

                    // 输出结果
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    // merge一般使用的是 session window 做合并操作
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return null;
                    }
                }).print();
        env.execute("test");
    }
全窗口函数

先把窗口所有数据收集起来,等到计算的时候会便利所有数据。这种窗口可以做一些中位数或平均数等操作。

  • ProcessWindowFunction
  • WindowFunction
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.socketTextStream("192.168.200.58",7777);
        // 滑动窗口
        dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
            @Override
            public Object getKey(Integer value) throws Exception {
                return 0;
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 攒齐5s 的数据,求平均值
                // 参数1 输入类型,2 输出类型,3 key的类型,4 就是个TimeWindow
                .apply(new WindowFunction<Integer, Integer, Object, TimeWindow>() {
                    // 参数1 key的类型,2 就是window,3 所有输入的数据,4 输出收集器
                    @Override
                    public void apply(Object o, TimeWindow window, Iterable<Integer> input, Collector<Integer> out) throws Exception {
                        List<Integer> list = IteratorUtils.toList(input.iterator());
                        int count = list.size();
                        int sum = list.stream().reduce((i,y) -> i+y).get();
                        out.collect(sum/count);
                    }
                }).print();

        env.execute("test");
    }

把 apply 换成 process 是一样的。

                .process(new ProcessWindowFunction<Integer, Integer, Object, TimeWindow>() {
                    // context 会拿到的信息比 window 多,包含了 window
                    @Override
                    public void process(Object o, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
                        List<Integer> list = IteratorUtils.toList(elements.iterator());
                        int count = list.size();
                        int sum = list.stream().reduce((i,y) -> i+y).get();
                        out.collect(sum/count);
                    }
                }).print();

在滑动时间窗口,或滑动计数中,如下:

我们会想滑动计数,应该是10个数,来2个滑动一次,但实际流操作中,当你输入2个数,就会输出一次,如下:

假设我们的程序是算平均值,第一次滑动是 (27+10)/2 第二次滑动则是 (27+10+3+7)/4 依此类推。这种方式是等补齐滑动窗口的长度,才会全额计算,就是10个一算。

其他可选API
  • trigger() 触发器:定义windows什么时候关闭,触发计算并输出结果,分两步的原因是,考虑数据会迟到,迟到的数据到windows关闭的时间,可以先输出结果,在等等认为会迟到的数据,然后关闭。
  • evictor 移除器:定义某些数据的逻辑。
  • allowedLateness() 允许处理迟到的数据,5秒后输出结果,等1分钟才关闭窗口,但1分钟内该窗口会实时更新数据,不断输出新逻辑数据。
window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.minutes(1));
  • sideOutputLateData() 将迟到的数据放入侧输出流
  • getSideOutput() 获取侧输出流
        // 标记输出流(侧输出流)
        OutputTag<Integer> outputTag = new OutputTag<Integer>("late"){};

        SingleOutputStreamOperator<Integer> ds = dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
            @Override
            public Object getKey(Integer value) throws Exception {
                return 0;
            }
        }).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.minutes(1))
                // 如果1分钟后还有迟到数据,就放到一个侧输出流里,给一个标记,也可以不搭配 allowedLateness() 使用
                .sideOutputLateData(outputTag)
                .sum(0);

        ds.getSideOutput(outputTag).print();

这种等的方式,迟到数据底属于 1窗口 还是属于 2窗口呢?那就需要和数据本身的时间作比较了,所以以上只是API的调用,但不建议这样使用,针对迟到数据我们一定会用迟到数据的本身时间,来决定他在哪个窗口。

ProcessFunction API

ProcessFunction 可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子、Richfunction都无法实现)。例如,flink sql 就是使用ProcessFunction 实现的。

flink 提供了8个 ProcessFunction

  • ProcessFunction
  • KeyedProcessFunction 分组之后得到KeyedStream,调用该 ProcessFunction
  • CoProcessFunction
  • ProcessJooinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

KeyedProcessFunction

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
        // 数据转换
        DataStream<EventData> stream = dataStream.map(new MapFunction<String, EventData>() {
            @Override
            public EventData map(String value) throws Exception {
                String[] strs = value.split(",");
                return new EventData(
                        strs[0],
                        Long.valueOf(strs[1]),
                        String.valueOf((Long.valueOf(strs[1])-5)),
                        Integer.valueOf(strs[3])
                );
            }
        });
        stream.keyBy(new KeySelector<EventData, String>() {
            @Override
            public String getKey(EventData value) throws Exception {
                return value.getId();
            }
        }).process(new MyProcess()).print();
        env.execute("test");
    }

    public static class MyProcess extends KeyedProcessFunction<String,EventData,Integer> {
        @Override
        public void processElement(EventData value, Context ctx, Collector<Integer> out) throws Exception {
            out.collect(value.hashCode());
            // context
            ctx.timestamp(); //获取数据时间戳
            ctx.getCurrentKey();// 获取key
            //ctx.output();//测输出流,可以通过构造函数吧 测输出流传入
            ctx.timerService().currentProcessingTime();// 获取处理时间(属于时间语义的一种)
            ctx.timerService().currentWatermark();// 获取水位(事件时间)(属于时间语义的一种)
            ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);//处理时间
            // 事件时间,让定时器 延时10s 处理
            ctx.timerService().registerEventTimeTimer((value.getEventTime() + 10) * 1000L);
            // 删除处理定时器,根据相同的时间为标准,有多个定时器,每个定时器的时间肯定不同,根据时间进行删除
            ctx.timerService().deleteProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);
            ctx.timerService().deleteEventTimeTimer((value.getEventTime() + 10) * 1000L);
        }
        // 触发定时器 timestamp,就是触发时间
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            ctx.getCurrentKey();
//            ctx.output();
            // 查看基于什么类型的时间
            ctx.timeDomain();
        }
    }

(1)处理时间——调⽤ ctx.timerService().registerProcessingTimeTimer()注册;onTimer()在系统时间戳达到Timer设定的时间戳时触发。
(2)事件时间——调⽤ ctx.timerService().registerEventTimeTimer()注册;onTimer()在Flink内部⽔印达到或超过Timer设定的时间戳时触发。

ProcessFunction 相关视频

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容