storm实验想法

经验

WC:input 2K/S , max 设置3000

image.png

http://storm.apache.org/releases/1.2.2/FAQ.html
中文版:http://ifeve.com/storm-faq/

Metrics获取

storm

方法一:metrics_v2:尝试无结果,官网说明和例子都不全,不采用
方法二:metrics,但是kafkaspout里面重载不方便,不采用
方法三:Storm UI REST API,链接:http://storm.apache.org/releases/1.2.2/STORM-UI-REST-API.html

/api/v1/topology/summary                 返回所有 Topology 简要信息,获取id,应用到下面
/api/v1/topology/:id                          返回 Topology id 的统计信息

Throughput 和 latency

throughput:
①spout的ACK函数里面统计计算
②设计metrics接口
latency:
①spout里面给每个tuple建立ID,然后hashmap(ID,timestamp),在ACK里面计算该ID的currenttime和map里面的time

Storm vs. Trident: When not to use Trident?

因为Trident提供了exactly-onece保证,这增加了系统的复杂性,增加了额外的开销,降低了系统的性能,产生延迟。

Usage

wordcount:

/*usage:
 * 
 * cluster:
 * ./storm jar ./WordCount-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.WordCountTopology WC.yaml pg10100.txt cluster
 * local:
 * ./storm jar ./WordCount-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.WordCountTopology WC.yaml pg10100.txt
*/ 

topk:

/*usage:
 * 
 * cluster:
 * ./storm jar ./Top-K-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.TopKtopology TK.yaml nofile cluster
 * local:
 * ./storm jar ./Top-K-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.TopKtopology TK.yaml noflie
*/
*/ 

spamfilter:

/*usage:
 * 
 * cluster:
 * ./storm jar ./SpamFilter-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.SpamFilterTopology ~/myconf/LSF.yaml enron2.json cluster
 * local:
 * ./storm jar ./SpamFilter-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.SpamFilterTopology SF.yaml enron2.json
*/

smartgrid:

/*usage:
 * 
 * cluster:
 * ./storm jar ./SmartGrid-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.SmartGridTopology SG.yaml sorted100M1.csv cluster
 * local:
 * ./storm jar ./SmartGrid-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.SmartGridTopology SG.yaml sorted100M1.csv
*/

bargainindex:

/*usage:
 * 
 * cluster:
 * ./storm jar ./BargainIndex-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.BargainIndexTopology BI.yaml nofile cluster
 * local:
 * ./storm jar ./BargainIndex-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.BargainIndexTopology BI.yaml nofile
*/

Throughput的测试

一般的实验都是采用查看UI的10分钟的时间窗口统计计算

Throughput (i.e tuples/sec) was calculated by dividing the total ACKs for the 10 min window by 600


topolog的名字不能带空格

错误:会报错storm '*/stormconf.ser' does not exist 问题
原因:启动的时候需要寻找这个文件,但是目录中的拓扑名字中的空格符号变成目录中的“+”号,导致了不能正常识别路径


集群输出结果到TXT

经过试验测试,txt会在提交拓扑的节点上生成,而不是在执行sink节点上生成文件(sink节点的逻辑是生成txt文件,并且写入运算结果的信息)。这样导致了sink节点无法读取到生成的文件,并且写入数据。就算在sink节点上提交拓扑,是txt文件生成在该节点上,实验测试还是该错误。


错误

错误指向的代码段
  • 方法1:每个节点提前生成result.txt文件,没用,可能因为分布式运算的问题,使用文件系统可能有用,比如HDFS。
  • 方法2:使用输出结果到kafka的topic中,有用

集群启动指令

./storm jar ./storm_topK_test-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.TopKtopology TK.yaml nofile cluster 获取jar包内部resources的config文件,修改config.yaml后,需要重新编译jar包

./storm jar ./Top-K-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.TopKtopology ~/myconf/LTK.yaml nofile cluster 获取jar包同一目录下的conf文件夹的config文件,只用修改config.yaml就行了,不需要重新编译jar包 ---------------------------推荐

但是
./storm jar ./Top-K-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.TopKtopology ~/myconf/LTK.yaml ./dataset/xxx.txt cluster 无法获取本地的txt数据源文件

本地模式:
./storm jar ./storm_topK_test-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.TopKtopology TK.yaml

./storm jar ./SpamFilter-0.0.1-SNAPSHOT-jar-with-dependencies.jar Topology.SpamFilterTopology SF.yaml enron2.json cluster


获取resources目录下的文件

windows:
String inpath = this.getClass().getClassLoader().getResource(path).getPath(); 成功

Linux:
inpath = this.getClass().getClassLoader().getResource(path).getPath(); 失败
main函数这样可以

        InputStream resource = null;
        resource = MyUtils.getResources(args);
        InputStreamReader isr=new InputStreamReader(resource,"utf8");                     
        BufferedReader  br=new BufferedReader(isr); 
        System.out.println("!!!!!!!!!"+br.readLine()); 

非main函数

image.png

序列化问题java.io.NotSerializableException: tools.Word

log

解决:使用Kryo数据类型加入序列化

public class Word implements Serializable {

    /**
     * 
     */
    private static final long serialVersionUID = 5797839444945009067L;
  public static class WordSerializer extends Serializer<Word> {
            @Override
            public void write (Kryo kryo, Output output, Word object) {
                output.writeString(object.word);
                output.writeInt(object.countBad);
                output.writeInt(object.countGood);
                output.writeFloat(object.rBad);
                output.writeFloat(object.rGood);
                output.writeFloat(object.pSpam);
            }

            @Override
            public Word read (Kryo kryo, Input input, Class<Word> type) {
                return new Word(input.readString(), input.readInt(), input.readInt(),
                        input.readFloat(), input.readFloat(), input.readFloat());
            }
        }
}

Checkpoint

initState:初始化bolt,使用之前存储的state,触发在prepare之后,在处理tuple之前。
实践:不管有没有故障,都会触发initState,而且在prepare之后。
checkpoint周期:默认每秒,可以通过topology.state.checkpoint.interval.ms设置
state的存储:可以通过配置topology.state.provider,例如redis:topology.state.provider: org.apache.storm.redis.state.RedisKeyValueStateProvider,还有个是Hbase,默认内存
Ps:topology.state.checkpoint.interval.ms is lower than topology.message.timeout.secs
Recovery:在拓扑开始的时候触发,按情况调用rollback,commit。结束后,bolt完成按state初始化。或者在bolts的ACK失败,worker crashed也会触发recovery。

“The recovery phase is triggered when the topology is started for the first time. If the previous transaction was not successfully prepared, a rollback message is sent across the topology so that if a bolt has some prepared transactions it can be discarded. If the previous transaction was prepared successfully but not committed, a commit message is sent across the topology so that the prepared transactions can be committed. After these steps are complete, the bolts are initialized with the state.”

KeyValueState implementation should also implement the methods defined in the org.apache.storm.state.KeyValueState interface.

  • The default state used is 'InMemoryKeyValueState' which does not persist the state across restarts. You could use 'RedisKeyValueState' to test state persistence by setting below property in conf/storm.yaml

spout的state功能好像还没有开发出来

State的在yaml中相关配置

topology.state.provider : "org.apache.storm.redis.state.RedisKeyValueStateProvider"
topology.state.provider.config : '{"keyClass": "java.lang.String","valueClass": "java.lang.Integer",
                                   "keySerializerClass": null, "valueSerializerClass": null,
                                   "jedisPoolConfig": {"host": "10.11.6.79", "port": 6379,
                                     "timeout": 10000, "database": 0,"password": null }}'

keySerializerClass:null是因为内置了默认的序列化,根据KeyClass来寻找到对应的序列化。
注意:需要写全名字,package写全。

内置默认序列化


Redis

CMD中输入ipconfig查看VPN82中的IP地址
然后redis的目录中redis.windows.conf中设置#bind
然后redis目录中启动.\redis-server redis.windows.conf服务
客户端:.\redis-cli.exe -h 10.11.8.11 -p 6379
按理说集群可以访问本机的redis,IP:10.11.8.11
需要修改redis.conf中的注释掉bindprotected-mode no,别的机器才能连接redis服务器

实践:集群似乎不能连接到window的redis,所以使用79作为redis服务器

worker

TOPOLOGY_WORKERS = "topology.workers";
设定多少个节点服务器来执行这个拓扑

Guaranteeing Message Processing

  • at least once:
    发射:
    需要加入msgID保证,tuple构成tuple tree,用于追踪是否完全完成
    /**
     * Emits a new tuple to the default output stream with the given message ID.
     * When Storm detects that this tuple has been fully processed, or has failed
     * to be fully processed, the spout will receive an ack or fail callback respectively
     * with the messageId as long as the messageId was not null. If the messageId was null,
     * Storm will not track the tuple and no callback will be received.
     * Note that Storm's event logging functionality will only work if the messageId
     * is serializable via Kryo or the Serializable interface. The emitted values must be immutable.
     *
     * @return the list of task ids that this tuple was sent to
     */
public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return _delegate.emit(streamId, tuple, messageId);
    }

接收,处理,发射:
加入anchor,or anchors

 /**
     * Emits a new tuple to the default stream anchored on a group of input tuples. The emitted
     * values must be immutable.
     * 
     * @param anchors the tuples to anchor to
     * @param tuple the new output tuple from this bolt
     * @return the list of task ids that this new tuple was sent to
     */
    public List<Integer> emit(Collection<Tuple> anchors, List<Object> tuple) {
        return emit(Utils.DEFAULT_STREAM_ID, anchors, tuple);
    }


    /**
     * Emits a new tuple to the default stream anchored on a single tuple. The emitted values must be 
     * immutable.
     * 
     * @param anchor the tuple to anchor to
     * @param tuple the new output tuple from this bolt
     * @return the list of task ids that this new tuple was sent to
     */
    public List<Integer> emit(Tuple anchor, List<Object> tuple) {
        return emit(Utils.DEFAULT_STREAM_ID, anchor, tuple);
    }

Multi-anchoring 适用于joins或者aggregations,一个 multi-anchored tuple失败的话,会导致上游多个spout重发相关的tuples。

  • exactly once:
    需要使用Trident

内部调用fail

可以手动设置OutputCollector的fail函数立即使元组树根处的spout元组失败。 例如,可能从数据库客户端出现了异常,通过捕获了异常显式的调用fail函数,能比等待超时出发fail机制,更快的重发tuple

每个tuple必须ack或者fail,storm使用内存来追踪每个tuple,如果没有ack/fail每个tuple,那么内存最终会耗尽

对于filters和 simple的函数,storm封装了basicbolt ,它的BasicOutputCollector 会自动的anchor输入tuple。而且会在execute方法结束的时候自动的ack输入tuple。
!!!!!!!但是!!!!!!!!
对于那些 Aggregations and joins这种复杂的,需要延时ack直到完成一批tuples,并且这种 Aggregations and joins通常需要multi-anchor,这种超出了简单的封装IBasicBolt。
所以还是手动的用RichBolt设定ack和anchor。

如果丢失数据允许的话,可以设置[Config.TOPOLOGY_ACKERS]使acker bolts 为0 。也可以设置多个来处理但在某些情况下,您希望确保所有内容都至少处理过一次,并且没有任何内容被删除。 如果所有操作都是幂等的,或者重复删除可能会发生,则此功能尤其有用。

How does Storm implement reliability in an efficient way?

使用一系列的"acker" tasks 追踪每个 spout tuple,构成一个DAG(tuple tree)。当一个acker发现一个DAG完成了,那么他会发送一个信息给spout task确认该这个对应的spout tuple的可以发送ACK信息了。默认是每个worker一个task来处理ack。每个tuple生成的时候,不管在spout还是bolt内,会被系统随机分配一个64位的id,ackers使用这些id来跟踪每个spout tuple的tuple DAG。每个tuple都知道自己所在的DAG(tuple tree)中的spout tuple的ID。
每当你在bolt中生成一个新的tuple的时候,新tuple ancher的tuple的spout tuples ids会复制到新tuple的信息里。
当一个tuple发送了ACK信息,就相当于告诉了 acker tasks关于DAG(tuple tree)更新了。“我在这个tree中完成了特定spout tuple的任务,并且有新的下游tree节点的tuple要anchor我”
比如下面的C执行了ack,生成了新的tuple D,E:


DAG(tuple tree)

解读:移除了C,加入了D,E,这个tree不会过早的完成。

疑问:如果有多个acker tasks ,那么一个tuple 怎么知道自己应该给哪个acker发送acked信息呢?
答:storm 使用哈希函数吧spout id和acker task映射起来,因为每个tuple都携带有该tuple tree中spout id 的信息,所以知道可以对应找哪个acker task。
疑问2:acker tasks怎么把追踪的spout tuple和spout task对应起来?
答:当一个spout发送一个新的tuple的时候,它会发送一个信息给适当的acker,从而绑定task id和spout tuple。当一个tree完成的时候,acker会给spout task发送完成信号。

acker task不会完全的追踪tuple tree上所有的tuple,因为这样,如果一个tuple tree有成千上万的tuple,全部追踪的话会导致内存的消耗太大。而是采用另一种策略,每个spout分配固定的空间,大约20字节

acker task 存了一个map:{spout tuple:spout task id, 64位的ack val数字}
spout task id:表示该spout tuple在哪个spout task上生成,用于后面ack给该spout发送信息
ack val:代表整个tuple tree的状态,他是该tree中所有的tuple id(生成的/ACKed)异或运算的结果。

如果一个ack val变成了0,那么代表了tuple tree完成了。出错成0的概率很低.
如果你以每秒10K acks的速度进行数学运算,则需要花费50,000,000年才会出现错误。 即使这样,如果拓扑中的元组发生故障,它也只会导致数据丢失。

下面是storm怎么保证故障发生的时候,避免数据的丢失:

  • A tuple isn't acked because the task died:这种情况,tree上的这个tuple 对应的根节点spout tuple id会超时并且重发。
  • Acker task dies:这个acker追踪的所有sput tuples都会重发。
  • Spout task dies:spout对应联系的系统会重发消息。比如Kestrel系统,如果客户端失去了连接,那么pending信息会放入发送队列中。

另外Ack机制还常用于限流作用: 为了避免spout发送数据太快,而bolt处理太慢,常常设置pending数,当spout有等于或超过pending数的tuple没有收到ack或fail响应时,跳过执行nextTuple, 从而限制spout发送数据。

通过conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, pending);设置spout pend数。

image.png

window的ack

目前只能支持at-least-once,execute(TupleWindow inputWindow)自动支持了anchored所有窗口内的tuple,下游bolts,应该ack收到上游窗口输出的tuple,这样才能建立完整的tuple tree,如果没有,将重发tuple,并重新计算窗口计算。
窗口内滑出的tuple会被自动ack,注意:

  • 对于时间窗口:topology.message.timeout.secs应该比windowLength + slidingInterval设置的更长,
  • 对于count窗口:应该调整topology.message.timeout.secs,使windowLength + slidingInterval的tuple 运算不会超时间。

stateful window

默认window内的tuple是存到内存里,直到他们被处理完并且过期,滑出窗口。
故障恢复会重发为过期的tuple,也就是没滑出窗口,没ACK的tuple。


image.png

第一种:Stateful windowing
根据messageID,缓存tuple重发,也能存储状态
使用kafka的时候把timestamp当成msgID.withMessageIdField("timestamp")
第二种:Window checkpointing
不需要messageID,因为定期存储窗口内tuple和状态到redis中,2.0.0版本支持

window的state

stateful 窗口是用来为了故障恢复的避免重复计算。

故障的时候,窗口内的tuple会被重发,如果你保存了计算状态并且回复了,这会造成重复的计算,得自己实现逻辑这些保存的状态信息,来保证不会造成窗口的重复计算。
storm 的状态机制,不会消除重复的计算,只能保证至少一次的。

image.png

有关于状态的保存,恢复来消除重复计算

If the system crashes tuples e1 to e8 gets replayed (assuming ack for e1 and e2 did not reach the acker) and w1, w2 and w3 will get re-evaluated. Stateful windowing tries to minimize the duplicate window evaluations by saving the last evaluated and last expired state of the window. Stateful windowing expects a monotonically increasing message id to be part of the tuple and uses the stateful abstractions discussed above to save the last expired and last evaluated message ids in the state. During recovery the last expired and last evaluated message ids are used to avoid duplicate window evaluations. Tuples with message id lower than last expired id are discarded and tuples with message id between the last expired and last evaluated message ids are fed into the system without activating any triggers. The tuples beyond the last evaluated message ids are processed as usual.

https://community.hortonworks.com/articles/14171/windowing-and-state-checkpointing-in-apache-storm.html

acker task

acker是轻量级,可以在UI的 "__acker"中查看他们的性能,一般不需要设置很多数目,如果吞吐量看起来不正常,那么需要增加acker的数目。

如果不在意reliability ,这功能会导致占用带宽的使用,因为传送tuple 的ack信息,id的信息。那么可以通过下面设置移除reliability保证。

  1. Config.TOPOLOGY_ACKERS = 0,这种情况下spout一发送一个tuple,就会调用ack信息,不会造成tuple tree的追踪。

  2. 在message的基础上除去。例如在某个spout 的tuple上省去id信息,那么这个tuple就不会被追踪了。SpoutOutputCollector.emit。

  3. 最后,如果您不关心拓扑中下游的元组的特定子集是否无法处理,则可以将它们作为unanchored 的tuples发出。 因为它们没有anchored到任何spout tuples,所以如果它们没有acked,它们将不会导致任何spout tuples 失败。

总结

所以是不是可以把kafka的offset当成msgid?

spout:在emit中放入该tuple第一无二的msgid
_collector.emit(new Values("field1", "field2", 3) , msgId);

bolt:在emit中放入ancher的上游tuple,并且调用ack函数
_collector.emit(tuple, new Values(word)); _collector.ack(tuple);
注意:封装的Basebolt会自动放入anchor和调用ack函数,但是只适用于简单的功能bolt,如filter等,对于那些join,aggregate等需要anchor多个输入tuple的需要使用Richbolt

(Tuples emitted to BasicOutputCollector are automatically anchored to the input tuple, and the input tuple is acked for you automatically when the execute method completes.)

ACK实践

kill bolt后,spout会提示是去连接那个bolt的提示,等连接成功,日记里有在是去连接的时候,还是发送一些spout tuple,这些tuple 属于丢失的数据,等spout收到这些tuple 的fail ack信息后,spout会重发这些缓存的数据。


spout的log

rebalance

image.png

parallelism

大于1的话,多个线程执行对不同的数据执行相同的代码。
多窗口的话,每个窗口维护自己的长度等,变量也是自己的

乱序问题

乱序的问题需要使用滑动窗口

控制速率

使用模拟数据:生成数据spout,可以设置sleep延时,使用map来缓存源数据,然后ACK和FAIL来处理缓存数据。但是重发的时候会乱序发送未被ack的数据。乱序问题。
使用kafkaspout,内嵌了ACK机制,按顺序发送原来的数据,但是速率不知道如何控制,如果在下一个bolt,比如split bolt设置延时,会导致spout缓存太多数据,然后不断message timeout。
比如kafka spout-----split------wordcount,topic有1000条数据,split延时1s。实验发现kafkaspout一次性发送1000条数据,堵在split慢慢发送,count不能正常初始化状态等,设置多并行度也不行,而且kafkaspout会超时堵住的数据,重发,超时,重发,超时······

checkpoint版本编写

  1. 关于bolt,extend stateful bolt,state的put更新和initState。
  2. 关于topology,在config里面写入
topology.state.provider : "org.apache.storm.redis.state.RedisKeyValueStateProvider"
topology.state.provider.config : '{"keyClass": "java.lang.String","valueClass": "java.lang.String",
                                   "keySerializerClass": null, "valueSerializerClass": null,
                                   "jedisPoolConfig": {"host": "10.11.6.79", "port": 6379,
                                     "timeout": 10000, "database": 0,"password": null }}'
  1. 在pom里面写入redis的依赖
<dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-redis</artifactId>
            <version>1.2.1</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

Knobs参数

  1. input rate
    调整输入流流速,两种设定:
    1)Constant value,即跑完一次实验,输入流流速为固定值,无变化;但是不同组实验有不同的constant value;暂定1000 tuples/s、2000 tuples/s、5000 tuples/s、10^4 tuples/s、10^5 tuples/s。
    2)Fluctuating value,即跑完一次实验的过程中,输入流流速是随着时间而变化的,存在较大的波动(比如徒然上升很多)。暂定 100 ~ 5*10^5 tuples/s 范围内波动。

  2. checkpoint rate
    发起状态检查点创建的频率,即两次创建检查点之间的时间间隔。
    暂定为500ms、1s、5s、10s、50s,默认值为5s。

  3. window size
    窗口大小,本文使用时间窗口,即窗口时长
    暂定为1s、10s、30s、1min、10min,默认值为10s。

  4. skew distribution
    源数据的倾斜分布,用zipfdistribution来控制,参数z越大,倾斜程度越大。
    暂定:0.0均匀分布、0.5、1.0、2.0、5.0,默认值为1.0。


评估指标

  1. processing latency
  2. throughput
  3. resource overhead
  4. recovery latency
  5. recovery accuracy
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容

  • 遇到聊的来的人,不要总是犹豫,顺其自然也很重要,然后就到了意想不到的地方,有时候也有这种事的。 不...
    爱笑的蚂蚁阅读 214评论 0 0
  • 前言: 之前开发所有版本的APP中,WebView使用的很少,一般都是详情页里面没有交互作用的页面展示,或者是商家...
    春江潮阅读 3,719评论 1 1
  • 当你第一眼看到“中”这个字的时候,是否第一个想起的名词就是我们伟大的祖国-中国,其次再到(中间)这个词呢? ...
    雨夜舞酒阅读 294评论 0 3
  • ​前面写文章说了二八理论,告诉大家投资也好做企业也好,都要关心什么是最重要的那20%的资源。今天的一个新理论“长尾...
    财扑船长阅读 624评论 0 1