Apache Flink源码解析 (五)DataStream API

概述

这篇文章是但不仅仅是官方文档的中文翻译,还有里面每一个方法对应的Transformation和运行时对Task的影响。

Prerequisites

  • 关于算子想说的有很多,都在上一篇文章里,在这篇文章中,把算子理解为包含了一个函数(Flink实现的或自己实现的,比如MapFunction,FilterFunction)的持续获得输入并且将结果输出出去的任务就好。
  • 图中的Task表示一个节点,或者说是一个TaskManager中一个Slot执行的任务
  • 流程图中红色代表这个方法在生成Transformation和实际运行时对Task产生的影响

DataStream

  • Map

    • 消费一个元素并产出一个元素
    • 参数 MapFunction
    • 返回DataStream
    • 例子:
    DataStream<Integer> dataStream = //...
    dataStream.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            return 2 * value;
        }
    });
    
    • Transformation: 生成一个OneInputTransformation并包含StreamMap算子


      StreamMapTransformation
    • Runtime:


      StreamMapTask
  • FlatMap

    • 消费一个元素并产生零到多个元素
    • 参数 FlatMapFunction
    • 返回 DataStream
    • 例子:
    dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out)
            throws Exception {
            for(String word: value.split(" ")){
                out.collect(word);
            }
        }
    });
    
    • Transformation: 生成一个OneInputTransformation并包含StreamFlatMap算子


      StreamFlatMapTransformation
    • Runtime:


      StreamFlatMapTask
  • Filter

    • 根据FliterFunction返回的布尔值来判断是否保留元素,true为保留,false则丢弃
    • 参数 FilterFunction
    • 返回DataStream
    • 例子:
    dataStream.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value != 0;
        }
    });
    
    • Transformation:生成一个OneInputTransformation并包含StreamFilter算子


      StreamFilterTransformation
    • Runtime:


      StreamFilterTask
  • KeyBy

    • 根据指定的Key将元素发送到不同的分区,相同的Key会被分到一个分区(这里分区指的就是下游算子多个并行的节点的其中一个)。keyBy()是通过哈希来分区的。
    • 只能使用KeyedState(Flink做备份和容错的状态)
    • 参数 String,tuple的索引,覆盖了hashCode方法的POJO,不能使数组
    • 返回KeyedStream
    • 例子:
    dataStream.keyBy("someKey") // Key by field "someKey"
    dataStream.keyBy(0) // Key by the first element of a Tuple
    
    • Transformation: KeyBy会产生一个PartitionTransformation,并且通过KeySelector创建一个KeyGroupStreamPartitioner,目的是将输出的数据分区。此外还会把KeySelector保存到KeyedStream的属性中,在下一个Transformation创建时时将KeySelector注入进去。


      KeyByTransformation
    • Runtime: 生成StreamGraph时会将PartitionTransformation中的Partitioner 注入到StreamEdge当中,此外还会在下一个StreamNode创建过程中注入KeySelector用于提取元素的Key。之后将Partitioner注入StreamRecordWriter中用于将上一个Task的输出元素指定到某一个ResultSubParition中,此外KeySelector也被注入到下一个Task的算子当中。


      KeyBy Runtime
  • WindowAll

    • 将元素按照某种特性聚集在一起(如时间:滑动窗口,翻转窗口,会话窗口,又如出现次数:计数窗口)
    • 参数 WindowAssigner
    • 返回 AllWindowedStream
    • 例子:
    dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
    
    • Transformation:返回AllWindowedStream,不产生Transformation,详情见AllWindowedStream
    • Runtime:详情见AllWindowedStream
  • Union

    • 将两个或多个datastream合并,创造一个新的流包含这些datastream的所有元素
    • 参数DataStream(一个或多个)
    • 返回UnionStream
    • 例子:
    dataStream.union(otherStream1, otherStream2, ...);
    
    • Transformation: 从所有相关的stream中获取Transformation并注入到UnionTransformation的inputs中


      UnionTransformation
    • Runtime:这些Inputs会在下一个Transformation创建时被作为Input来穿件StreamEdge,如果上下游并行度一致则会生成ForwardPartitioner,不一致则是RebalancePartitioner。由于Partitioner是在处理下游Transformation生成的,所以这里没有图。
  • Join

    • 将两个DataStream按照key和window join在一起
    • 参数:1. KeySelector1 2. KeySelector2 3. DataStream 4. WindowAssigner 5. JoinFunction/FlatJoinFunction
    • 返回DataStream
    • 例子:
    dataStream.join(otherStream)
        .where(<key selector>).equalTo(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply (new JoinFunction () {...});
    
    • Transformation:1. 调用join方法后生成JoinedStream,JoinedStream保存了两个input 2. 调用where方法生成一个内部类Where对象,注入KeySelector1 3. 调用equalTo生成内部类EqualTo对象,注入KeySelector2 4. 调用window升成内部静态类WithWindow,并且注入WindowAssigner(在该对象中还可以注入Trigger和Evictor 5. 最后调用apply方法将(Flat)JoinFunction注入并且用一个(Flat)JoinCoGroupFunction封装起来,而在这一步会将所有注入的对象用在coGroup上。详情见下一个Window CoGroup的解析。
    • Runtime: 与Window CoGroup相同,详情见下一个WIndow CoGroup解析
  • Window CoGroup

    • 根据Key和window将两个DataStream的元素聚集在两个集合中,根据CoGroupFunction来处理这两个集合,并产出结果
    • 参数 1. DataStream 2. KeySelector1 3. KeySelector2 4. WindowAssigner 5. CoGroupFunction
    • 返回DataStream
    • 例子:
    dataStream.coGroup(otherStream)
        .where(0).equalTo(1)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        .apply (new CoGroupFunction () {...});
    
    • Transformation:生成一个TaggedUnion类型和unionKeySelector,里面分别包含了两个流的元素类型和两个流的KeySelector。将两个流通过map分别输出为类型是TaggedUnion的两个流(map详情见StreamMap),再Union在一起(详情见Union),再使用合并过后的流和unionKeySelector生成一个KeyedStream(详情见KeyBy),最后使用KeyedStream的window方法并传入WindowAssigner生成WindowedStream,并apply CoGroupFunction来处理(详情见WindowedStream Apply方法)。总体来说,Flink对这个方法做了很多内部的转换,最后生成了两个StreamMapTransformation,一个PartitionTransformation和一个包含了WindowOperator的OneInputTransformation。


      CoGroupTransformation
    • Runtime:参考每个Transformation对应的Runtime情况

  • Connect

    • 将两个DataStream连接在一起,使得他们之间可以共享状态
    • 参数 DataStream
    • 返回ConnectedStreams
    • 例子:
    DataStream<Integer> someStream = //...
    DataStream<String> otherStream = //...
    
    ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
    
    • Transformation:在这一步会生成一个包含了两个DataStream的ConnectedStreams对象,不会有Transformation产生。详情见后续ConnectedStreams的API详解。
  • Split

    • 按照一个规则将一个流的元素产出到两个或多个支流(每个元素可以发送到不止一个支流)
    • 参数 OutputSelector
    • 返回 SplitStream
    • 例子:
    SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<String>();
            if (value % 2 == 0) {
                output.add("even");
            }
            else {
                output.add("odd");
            }
            return output;
        }
    });
    
    • Transformation:在这一步会生成一个SplitTransformation,里面包含了OutputSelector。


      SplitTransformation
    • Runtime: 在生成StreamGraph时找到父Transformation,并将OutputSelector注入到父StreamNode中。生成JobGraph的时候在注入到对应的JobNode中,最后在运行时封装到OperatorChain的OutputCollector中并且注入算子。


      SplitRuntime
  • Iterate

    • 通过将一个算子的输出重定向到某个输入Operator上来创个一个循环。非常适合用来持续更新一个模型。
    • 过程 DataStream → IterativeStream → DataStream
    • 例子:
    IterativeStream<Long> iteration = initialStream.iterate();
    DataStream<Long> iterationBody = iteration.map (/*do something*/);
    DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
        @Override
        public boolean filter(Integer value) throws Exception {
            return value > 0;
        }
    });
    iteration.closeWith(feedback);
    DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
        @Override
        public boolean filter(Integer value) throws Exception {
            return value <= 0;
        }
    });              
    
    • Iterate不展开讲解
  • ExtractTimestamps

    • 从元素中提取timestamp来用作事件时间(EventTime)。
    • 参数 TimeStampExtractor
    • 返回 DataStream
    • 例子:
    stream.assignTimestamps (new TimeStampExtractor() {...}); 
    
    • Transformation:assignTimestamps会将TimeStampExtractor注入进刚创建的ExtractTimestampsOperator,再通过ExtractTimestampsOperator生成一个OneInputTransformation


      ExtractTimestampsTransformation
    • Runtime:


      ExtractTimestampsTask
  • Project

    • 如果元素是Tuple,直接通过index提取出Tuple中的字段组成新的Tuple,并产出结果
    • 参数 Tuple中的index(int, 一个或多个)
    • 返回 DataStream
    • 例子:
    DataStream<Tuple3<Integer, Double, String>> in = // [...]
    DataStream<Tuple2<String, Integer>> out = in.project(2,0);
    
    • Transformation:生成一个OneInputTransformation并包含StreamProjection算子


      StreamProjectionTransformation
    • Runtime


      StreamProjectionTask
  • Custom partitioning

    • 通过用户定义的流分区器(Partitioner)将每个元素传输到指定的subtask
    • 参数 Partitioner, Tuple索引/POJO属性名/KeySelector
    • 返回 DataStream
    • 例子:
    dataStream.partitionCustom(partitioner, "someKey");
    dataStream.partitionCustom(partitioner, 0);
    
    • Transformation:partitionCustom类似于KeyBy,不过partitioner是由自己定制并且输出的不是KeyedStream。首先会通过KeySelector和用户实现的Partitioner生成一个CustomPartitionerWrapper(StreamPartitioner),再讲它注入到PartitionTransformation。


      CustomPartitioningTransformation
    • Runtime:将Partitioner注入StreamRecordWriter中用于将上一个Task的输出元素指定到某一个ResultSubParition中


      CustomPartitioningTask
  • Random partitioning

    • 将元素按照均匀分布打散到下游
    • 返回 DataStream
    • 例子:
    dataStream.shuffle();
    
    • Transformation: 将partitioner换成ShufflePartitioner,其余同上
    • Runtime:同上
  • Rebalancing (Round-robin partitioning)

    • 通过轮询调度(Round-robin)将元素均匀的分配到下游
    • 返回 DataStream
    • 例子
    dataStream.rebalance();
    
    • Transformation: 将partitioner换成RebalancePartitioner,其余同上
    • Runtime:同上
  • Rescaling

    • 通过轮询调度将元素从上游的task一个子集发送到下游task的一个子集
    • 返回 DataStream
    • 原理:第一个task并行度为2,第二个task并行度为6,第三个task并行度为2。从第一个task到第二个task,Src的子集Src1 和 Map的子集Map1,2,3对应起来,Src1会以轮询调度的方式分别向Map1,2,3发送记录。从第二个task到第三个task,Map的子集1,2,3对应Sink的子集1,这三个流的元素只会发送到Sink1。
      假设我们每个TaskManager有三个Slot,并且我们开了SlotSharingGroup,那么通过rescale,所有的数据传输都在一个TaskManager内,不需要通过网络。
    • rescale.png
    • 例子
    dataStream.rescale();
    
    • Transformation: 将partitioner换成RescalePartitioner,其余同上
    • Runtime:同上
  • Broadcasting

    • 将元素广播到每个分区
    • 返回DataStream
    • 例子:
    dataStream.broadcast();
    

KeyedStream

  • Reduce

    • 根据ReduceFunction将元素与上一个reduce后的结果合并,产出合并之后的结果。
    • 参数 ReduceFunction
    • 返回 DataStream
    • 例子:
    keyedStream.reduce(new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer value1, Integer value2)
        throws Exception {
            return value1 + value2;
        }
    });
    
    • Transformation:生成一个OneInputTransformation并包含StreamGroupedReduce算子


      KeyedReduceTransformation
    • Runtime:


      KeyedReduceTask
  • Fold

    • 根据FoldFunction和初始值,将元素与上一个fold过后的结果合并,产出合并之后的结果。
    • 参数 FoldFunction
    • 返回 DataStream
    • 例子:
    DataStream<String> result =
      keyedStream.fold("start", new FoldFunction<Integer, String>() {
        @Override
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
      });
    
    • Transformation:将StreamGroupedReduce换成StreamGroupedFold,其余同Reduce
    • Runtime:将StreamGroupedReduce换成StreamGroupedFold,其余同Reduce
  • Aggregations

    • Flink实现的一系列聚合方法,具体作用由方法名就可以得知
    • 返回 DataStream
    • 例子:
    keyedStream.sum(0);
    keyedStream.sum("key");
    keyedStream.min(0);
    keyedStream.min("key");
    keyedStream.max(0);
    keyedStream.max("key");
    keyedStream.minBy(0);
    keyedStream.minBy("key");
    keyedStream.maxBy(0);
    keyedStream.maxBy("key");
    
    • Transformation:StreamGroupedReduce里注入了Flink内置的Aggregation方法实现,同Reduce
    • Transformation:同Reduce
  • Window

    • 窗口将同一个key的元素按照某种特性聚集在一起(如时间:滑动窗口,翻转窗口,会话窗口,又如出现次数:计数窗口)
    • 返回WindowedStream
    • 参数WindowAssigner
    • 例子:
    dataStream.window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
    
    • Transformation: 生成一个WindowedStream,不产生Transformation,详情见WindowedStream详解
    • Runtime:详情见WindowedStream
  • Interval Join

    • 给定一个时间间隔,将两个流中的元素按照key来做join
    • 满足条件e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
    • 参数 1. KeyedStream 2. Time: LowerBound and UpperBound 3. boolean(optional) 4. boolean(optional) 5. IntervalJoinFunction
    • 返回DataStream
    • 例子:
    // this will join the two streams so that
    // key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
    keyedStream.intervalJoin(otherKeyedStream)
        .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
        .upperBoundExclusive(true) // optional
        .lowerBoundExclusive(true) // optional
        .process(new IntervalJoinFunction() {...});
    

WindowedStream

  • Apply

    • 使用WindowFunction对window重的元素做处理(例如聚合操作)并产出结果
    • 参数 WindowFunction
    • 返回 DataStream
    • 例子:
    windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
        public void apply (Tuple tuple,
                Window window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Integer> out) throws Exception {
            int sum = 0;
            for (value t: values) {
                sum += t.f1;
            }
            out.collect (new Integer(sum));
        }
    });
    
    • Transformation:


      WindowApplyTransformation
    • Runtime:


      WindowApply Task
  • Reduce

    • 根据ReduceFunction将窗口中的元素按照key和window合并,并产出结果
    • 参数 ReduceFunction
    • 返回DataStream
    • 例子
    windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
        }
    });
    
    • Transformation:基本同上,将ReduceFunction注入到WindowOperator中(具体注入方式要看有没有evictor,这边不作赘述)。
    • Runtime:同上
  • Aggregations

    • Flink实现的一系列聚合方法,具体作用由方法名就可以得知,需要注意的是他们被分别作用在按key和window分割过后的元素集合上
    • 返回 DataStream
    • 例子:
    windowedStream.sum(0);
    windowedStream.sum("key");
    windowedStream.min(0);
    windowedStream.min("key");
    windowedStream.max(0);
    windowedStream.max("key");
    windowedStream.minBy(0);
    windowedStream.minBy("key");
    windowedStream.maxBy(0);
    windowedStream.maxBy("key");
    
    • Transformation:WindowOperator里注入了Flink内置的Aggregation方法实现,其余同上
    • Runtime:同上

AllWindowedStream

  • Apply

    • 使用WindowFunction对window重的元素做处理(例如聚合操作)并产出结果
    • 与WindowedStream的区别在于是否有key
    • 参数 WindowFunction
    • 返回 DataStream
    • 例子
    // applying an AllWindowFunction on non-keyed window stream
    allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>, Integer, Window>() {
        public void apply (Window window,
                Iterable<Tuple2<String, Integer>> values,
                Collector<Integer> out) throws Exception {
            int sum = 0;
            for (value t: values) {
                sum += t.f1;
            }
            out.collect (new Integer(sum));
        }
    });
    
    • Transformation:AllWindowedStream.apply()与WindowedStream.apply()基本是一致的,只是没有KeySelector
    • Runtime:通WindowedStream.apply()

ConnectedStreams

  • CoMap, CoFlatMap

    • 同时对两个流进行Map或FlatMap操作
    • 参数 CoMapFunction, CoFlatMapFunction
    • 返回 DataStream
    • 例子:
    connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
        @Override
        public Boolean map1(Integer value) {
            return true;
        }
        
        @Override
        public Boolean map2(String value) {
            return false;
        }
    });
    connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
    
       @Override
       public void flatMap1(Integer value, Collector<String> out) {
           out.collect(value.toString());
       }
    
       @Override
       public void flatMap2(String value, Collector<String> out) {
           for (String word: value.split(" ")) {
             out.collect(word);
           }
       }
    });
    
    • Transformation:ConnectedStream并不会产生Transformation,只会保存两个Input DataStream,从inputs中的DataStream获取父Transformation,并生成一个CoStream(Flat)Map算子。KeySelector依赖于父Transformation注入(如果是PartitionTransformation的话)。


      Co(Flat)MapTransformation
    • Runtime: Task会具体负责调用processElement1方法还是processElement2方法。


      CoStream(Flat)MapTask

SplitStream

  • Select

    • 根据SplitStream中OutputSelector设定的规则获取一个或多个DataStream
    • 参数 OutputNames
    • 返回 DataStream
    • 例子:
    SplitStream<Integer> split;
    DataStream<Integer> even = split.select("even");
    DataStream<Integer> odd = split.select("odd");
    DataStream<Integer> all = split.select("even","odd");
    
    • Transformation:生成SelectTransformation,里面包含了OutputSelector


      SelectTransformation
    • Runtime:生成StreamGraph时会将OutputNames注入到新生成的StreamEdge中,然后注入到对应的JobEdge中,最后用它来生成OutputCollector中的outputMap,发送消息时根据相应的selectedName发送到相应的下游Task


      Select Runtime
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,718评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,683评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,207评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,755评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,862评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,050评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,136评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,882评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,330评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,651评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,789评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,477评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,135评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,864评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,099评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,598评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,697评论 2 351

推荐阅读更多精彩内容

  • pyspark.sql模块 模块上下文 Spark SQL和DataFrames的重要类: pyspark.sql...
    mpro阅读 9,448评论 0 13
  • 原文链接:https://ci.apache.org/projects/flink/flink-docs-rele...
    写Bug的张小天阅读 37,374评论 4 19
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,602评论 18 399
  • 昨夜梦痕犹在醒来细雨淅沥时光几移 疲惫拖着沉重的步子疼痛则有其自然的规律先是如灼伤般切肤后而针刺似的入骨 猬务丛集...
    湛予阅读 295评论 0 0