kafka基础和python如何操作kafka

应用往Kafka写数据的原因有很多:用户行为分析、日志存储、异步通信等。多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量?消息的延迟?

kafka介绍

Kafka属于Apache组织,是一个高性能跨语言分布式发布订阅消息队列系统[7]。它的主要特点有:

以时间复杂度O(1)的方式提供消息持久化能力,并对大数据量能保证常数时间的访问性能;

高吞吐率,单台服务器可以达到每秒几十万的吞吐速率;

支持服务器间的消息分区,支持分布式消费,同时保证了每个分区内的消息顺序;

轻量级,支持实时数据处理和离线数据处理两种方式。

1.1. 主要功能

根据官网的介绍,ApacheKafka®是一个分布式流媒体平台,它主要有3种功能:

1:发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因

2:以容错的方式记录消息流,kafka以文件的方式来存储消息流

3:可以在消息发布的时候进行处理

1.2. 使用场景

1:在系统或应用程序之间构建可靠的用于传输实时数据的管道,消息队列功能

2:构建实时的流数据处理程序来变换或处理数据流,数据处理功能

kafka生产者

首先,创建ProducerRecord必须包含Topic和Value,key和partition可选。然后,序列化key和value对象为ByteArray,并发送到网络。

接下来,消息发送到partitioner。如果创建ProducerRecord时指定了partition,此时partitioner啥也不用做,简单的返回指定的partition即可。如果未指定partition,partitioner会基于ProducerRecord的key生成partition。producer选择好partition后,增加record到对应topic和partition的batch record。最后,专有线程负责发送batch record到合适的Kafka broker。

当broker收到消息时,它会返回一个应答(response)。如果消息成功写入Kafka,broker将返回RecordMetadata对象(包含topic,partition和offset);相反,broker将返回error。这时producer收到error会尝试重试发送消息几次,直到producer返回error。

实例化producer后,接着发送消息。这里主要有3种发送消息的方法:

立即发送:只管发送消息到server端,不care消息是否成功发送。大部分情况下,这种发送方式会成功,因为Kafka自身具有高可用性,producer会自动重试;但有时也会丢失消息;

同步发送:通过send()方法发送消息,并返回Future对象。get()方法会等待Future对象,看send()方法是否成功;

异步发送:通过带有回调函数的send()方法发送消息,当producer收到Kafka broker的response会触发回调函数

以上所有情况,一定要时刻考虑发送消息可能会失败,想清楚如何去处理异常。

通常我们是一个producer起一个线程开始发送消息。为了优化producer的性能,一般会有下面几种方式:单个producer起多个线程发送消息;使用多个producer。

kafka消费者

kafka的消费模式总共有3种:最多一次,最少一次,正好一次。为什么会有这3种模式,是因为客户端处理消息,提交反馈(commit)这两个动作不是原子性。

1.最多一次:客户端收到消息后,在处理消息前自动提交,这样kafka就认为consumer已经消费过了,偏移量增加。

2.最少一次:客户端收到消息,处理消息,再提交反馈。这样就可能出现消息处理完了,在提交反馈前,网络中断或者程序挂了,那么kafka认为这个消息还没有被consumer消费,产生重复消息推送。

3.正好一次:保证消息处理和提交反馈在同一个事务中,即有原子性。

本文从这几个点出发,详细阐述了如何实现以上三种方式。

At-most-once(最多一次)

设置enable.auto.commit为ture

设置 auto.commit.interval.ms为一个较小的时间间隔.

client不要调用commitSync(),kafka在特定的时间间隔内自动提交。

At-least-once(最少一次)

方法一

设置enable.auto.commit为false

client调用commitSync(),增加消息偏移;

方法二

设置enable.auto.commit为ture

设置 auto.commit.interval.ms为一个较大的时间间隔.

client调用commitSync(),增加消息偏移;

Exactly-once(正好一次)

3.1 思路

如果要实现这种方式,必须自己控制消息的offset,自己记录一下当前的offset,对消息的处理和offset的移动必须保持在同一个事务中,例如在同一个事务中,把消息处理的结果存到mysql数据库同时更新此时的消息的偏移。

3.2 实现

设置enable.auto.commit为false

保存ConsumerRecord中的offset到数据库

当partition分区发生变化的时候需要rebalance,有以下几个事件会触发分区变化

1 consumer订阅的topic中的分区大小发生变化

2 topic被创建或者被删除

3 consuer所在group中有个成员挂了

4 新的consumer通过调用join加入了group

此时 consumer通过实现ConsumerRebalanceListener接口,捕捉这些事件,对偏移量进行处理。

consumer通过调用seek(TopicPartition, long)方法,移动到指定的分区的偏移位置。

Broker

Kafka是一个高吞吐量分布式消息系统,采用Scala和Java语言编写,它提供了快速、可扩展的、分布式、分区的和可复制的日志订阅服务。它由Producer、Broker、Consumer三部分构成.

Producer向某个Topic发布消息,而Consumer订阅某个Topic的消息。 一旦有某个Topic新产生的消息,Broker会传递给订阅它的所有Consumer,每个Topic分为多个分区,这样的设计有利于管理数据和负载均衡。

Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。

Controller:中央控制器Control,负责管理分区和副本状态并执行管理着这些分区的重新分配。(里面涉及到partition leader 选举)

ISR:同步副本组

Topic

在Kafka中,消息是按Topic组织的.

Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。

Segment:partition物理上由多个segment组成

offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中. partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.

topic中partition存储分布

在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。

cleaner-offset-checkpoint:存了每个log的最后清理offset

meta.properties: broker.id 信息

recovery-point-offset-checkpoint:表示已经刷写到磁盘的记录。recoveryPoint以下的数据都是已经刷 到磁盘上的了。

replication-offset-checkpoint: 用来存储每个replica的HighWatermark的(high watermark (HW),表示已经被commited的message,HW以下的数据都是各个replicas间同步的,一致的。)

partiton中文件存储方式

每个partion(目录)由多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。

每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

partiton中segment文件存储结构

partion中segment file组成和物理结构。

segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件.

segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

以一对segment file文件为例,说明segment中index<—->data file对应关系物理结构如下

Index文件存储大量元数据,指向对应log文件中message的物理偏移地址。

log数据文件存储大量消息

其中以Index文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。

下面看看segment data file的内部

segment data file由许多message组成,下面详细说明message物理结构如下:

2.4 在partition中如何通过offset查找message

例如读取offset=368776的message,需要通过下面2个步骤查找。

第一步查找segment file

上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。

当offset=368776时定位到00000000000000368769.index|log

第二步通过segment file查找message

通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置(这个较小,可以放在内存中,直接操作)和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log 顺序查找 直到offset=368776为止。

从上述图2.3节可知这样做的优点,segment index file采取稀疏索引存储方式,它减少索引文件大小,通过map可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

2.5 读写message总结

写message

消息从java堆转入page cache(即物理内存)。

由异步线程刷盘,消息从page cache刷入磁盘。

读message

消息直接从page cache转入socket发送出去。

当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁

盘Load消息到page cache,然后直接从socket发出去

Kafka高效文件存储设计特点

topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。

通过索引信息可以快速定位message和确定response的最大大小。

通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。

通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

Kafka消费者

消费组与分区重平衡

可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。重平衡是Kafka一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。

消费者通过定期发送心跳(hearbeat)到一个作为组协调者(group coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。

如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机,然后触发重平衡。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。

在0.10.1版本,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。

1.3. 详细介绍

Kafka目前主要作为一个分布式的发布订阅式的消息系统使用,下面简单介绍一下kafka的基本机制

1.3.1 消息传输流程

Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息。

Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息

Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。

从上图中就可以看出同一个Topic下的消费者和生产者的数量并不是对应的。

1.3.2 kafka服务器消息存储策略

谈到kafka的存储,就不得不提到分区,即partitions,创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。

在每个分区中,消息以顺序存储,最晚接收的的消息会最后被消费。

kafka中的message以topic的形式存在,topic在物理上又分为很多的partition,partition物理上由很多segment组成,segment是存放message的真正载体。

下面具体介绍下segment文件:

(1) 每个partition(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file消息数量不一定相等,这种特性方便old segment file快速被删除。

(2) 每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。

(3) segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件.

(4) segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

segment中index<—->data file对应关系物理结构如下:

index与log映射关系

.index文件存放的是message逻辑相对偏移量(相对offset=绝对offset-base offset)与在相应的.log文件中的物理位置(position)。但.index并不是为每条message都指定到物理位置的映射,而是以entry为单位,每条entry可以指定连续n条消息的物理位置映射(例如:假设有20000~20009共10条消息,.index文件可配置为每条entry

指定连续10条消息的物理位置映射,该例中,index entry会记录偏移量为20000的消息到其物理文件位置,一旦该条消息被定位,20001~20009可以很快查到。)。每个entry大小8字节,前4个字节是这个message相对于该log segment第一个消息offset(base offset)的相对偏移量,后4个字节是这个消息在log文件中的物理位置。

1.3.3 与生产者的交互

生产者在向kafka集群发送消息的时候,可以通过指定分区来发送到指定的分区中


也可以通过指定均衡策略来将消息发送到不同的分区中

如果不指定,就会采用默认的随机均衡策略,将消息随机的存储到不同的分区中

1.3.4 与消费者的交互

在消费者消费消息时,kafka使用offset来记录当前消费的位置

在kafka的设计中,可以有多个不同的group来同时消费同一个topic下的消息,如图,我们有两个不同的group同时消费,他们的的消费的记录位置offset各不项目,不互相干扰。

对于一个group而言,消费者的数量不应该多于分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费

因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。

Kafka安装与使用

1: 下载:

sudo wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.0/kafka_2.12-2.6.0.tgz

tar -xzf kafka_2.12-2.6.0.tgz

cd kafka_2.11-1.0.0

kafka运行需要java环境:

sudo apt-get install openjdk-8-jdk

2: 运行:

cd进入kafka解压目录,启动zookeeper,输入:

bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka服务:

bin/kafka-server-start.sh config/server.properties

3:创建一个topic

Kafka通过topic对同一类的数据进行管理,同一类的数据使用同一个topic可以在处理数据时更加的便捷, 在kafka解压目录打开终端,输入

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

在创建topic后可以通过输入

bin/kafka-topics.sh --list --zookeeper localhost:2181

来查看已经创建的topic

4:创建一个消息消费者:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

5: 创建一个消息生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

用回车(Enter)发送消息

6: 设置多个代理集群

到目前为止,我们一直在使用单个代理,这并不好玩。对 Kafka来说,单个代理只是一个大小为一的集群,除了启动更多的代理实例外,没有什么变化。 为了深入了解它,让我们把集群扩展到三个节点(仍然在本地机器上)。

首先,为每个代理创建一个配置文件 (在Windows上使用copy 命令来代替):

1> cp config/server.properties config/server-1.properties

2> cp config/server.properties config/server-2.properties

现在编辑这些新文件并设置如下属性:

1 config/server-1.properties:

2    broker.id=1

3    listeners=PLAINTEXT://:9093

4    log.dir=/tmp/kafka-logs-1

6 config/server-2.properties:

7    broker.id=2

8    listeners=PLAINTEXT://:9094

9    log.dir=/tmp/kafka-logs-2

broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有的代理尝试在同一个端口注册,或者覆盖彼此的数据。

我们已经建立Zookeeper和一个单节点了,现在我们只需要启动两个新的节点:

1  > bin/kafka-server-start.sh config/server-1.properties &

2  ...

3  > bin/kafka-server-start.sh config/server-2.properties &

4  ...

现在创建一个副本为3的新topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

Good,现在我们有一个集群,但是我们怎么才能知道那些代理在做什么呢?运行"describe topics"命令来查看:

1  > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

2  Topic:my-replicated-topic  PartitionCount:1    ReplicationFactor:3 Configs:

3      Topic: my-replicated-topic  Partition: 0    Leader: 1  Replicas: 1,2,0 Isr: 1,2,0

以下是对输出信息的解释。第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。

“leader”是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。

“replicas”是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着。

“isr”是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader。

请注意,在示例中,节点1是该主题中唯一分区的领导者。

我们可以在已创建的原始主题上运行相同的命令来查看它的位置:

1  > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

2  Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:

3      Topic: test Partition: 0    Leader: 0  Replicas: 0 Isr: 0

这没什么大不了,原来的主题没有副本且在服务器0上。我们创建集群时,这是唯一的服务器。

让我们发表一些信息给我们的新topic:

1  > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

2  ...

3  my test message 1

4  my test message 2

5  ^C

现在我们来消费这些消息:

1  > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

2  ...

3  my test message 1

4  my test message 2

5  ^C

让我们来测试一下容错性。 Broker 1 现在是 leader,让我们来杀了它:

1  > ps aux | grep server-1.properties 

2  7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...

3  > kill -9 7564

在 Windows 上用:

1  > wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid

2  ProcessId

3  6016

4  > taskkill /pid 6016 /f

领导权已经切换到一个从属节点,而且节点1也不在同步副本集中了:

1  > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

2  Topic:my-replicated-topic  PartitionCount:1    ReplicationFactor:3 Configs:

3      Topic: my-replicated-topic  Partition: 0    Leader: 2  Replicas: 1,2,0 Isr: 2,0

不过,即便原先写入消息的leader已经不在,这些消息仍可用于消费:

1  > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic

2  ...

3  my test message 1

4  my test message 2

5  ^C


python操作kafka

参考文档: https://kafka-python.readthedocs.io/en/master/usage.html, https://zhuanlan.zhihu.com/p/38330574

1 安装:

pip3 install kafka-python

消费者:


from kafka import KafkaConsumer

from kafka import TopicPartition

import json

consumer = KafkaConsumer('my_topic', bootstrap_servers=['127.0.0.1:9092'])

 consumer = KafkaConsumer('my_topic', group_id="group2", bootstrap_servers=['127.0.0.1:9092'])

第1个参数为 topic的名称

group_id : 指定此消费者实例属于的组名,可以不指定

bootstrap_servers : 指定kafka服务器

auto_offset_reset='earliest':读取目前最早可读的消息

consumer.seek(TopicPartition(topic='test', partition=0), 5)  #重置偏移量,从第5个偏移量消费

consumer = KafkaConsumer(group_id="group2", bootstrap_servers=['127.0.0.1:9092'], consumer_timeout_ms=1000)

 consumer.assign([TopicPartition(topic='my_topic', partition=0)])

 print(type(consumer))

若不指定 consumer_timeout_ms,默认一直循环等待接收,若指定,则超时返回,不再等待

consumer_timeout_ms : 毫秒数

手动指定分区:consumer.assign([TopicPartition(topic='my_topic', partition=0)])


订阅多个topic,  consumer.subscribe(topics=[])


consumer = KafkaConsumer(group_id='group2', bootstrap_servers=['127.0.0.1:9092'])

consumer.subscribe(topics=['my_topic', 'topic_1'])

# 或者使用正则订阅一类的topic

consumer.subscribe(pattern='^topic.*')

print(len(consumer))

for msg in consumer:

    print(msg)


生产者:


from kafka import KafkaProducer

from kafka.errors import KafkaError

import json

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092']) 

future = producer.send('topic_1', key='my_key', value='hello, python', partition=0)

try:

    record_metadata = future.get(timeout=10)

except KafkaError:

    # Decide what to do if produce request failed...

    log.exception()

# Successful result returns assigned partition and offset

print(record_metadata.topic)

print(record_metadata.partition)

print(record_metadata.offset)

producer.send函数为发送消息

第1个参数为 topic名称,必须指定

key : 键,必须是字节字符串,可以不指定(但key和value必须指定1个),默认为None

value : 值,必须是字节字符串,可以不指定(但key和value必须指定1个),默认为None

partition : 指定发送的partition,由于kafka默认配置1个partition,固为0

future.get函数等待单条消息发送完成或超时,经测试,必须有这个函数,不然发送不出去,或用time.sleep代替

转成json格式发送

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'], value_serializer=lambda m: json.dumps(m).encode('ascii'))

 future = producer.send('topic_1', value= {'my_key' : 'hello, python'}, partition=0)


压缩发送消息 compression_type='gzip'

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'], compression_type='gzip')

future = producer.send('topic_1', key=b'my_key', value=b'hello, python', partition=0)


生产者运行结果:

my_topic

0

6

消费者运行结果:

ConsumerRecord(topic='my_topic', partition=0, offset=6, timestamp=1604989500586, timestamp_type=0, key=b'my_key', value=b'my_value', headers=[], checksum=None, serialized_key_size=6, serialized_value_size=8, serialized_header_size=-1)

topic

partition

offset : 这条消息的偏移量

timestamp : 时间戳

timestamp_type : 时间戳类型

key : key值,字节类型

value : value值,字节类型

checksum : 消息的校验和

serialized_key_size : 序列化key的大小

serialized_value_size : 序列化value的大小,可以看到value=None时,大小为-1

参考文档:

https://kafka.apachecn.org/

https://kafka-python.readthedocs.io/en/master/usage.html

https://www.pythonf.cn/read/115538

https://zhuanlan.zhihu.com/p/38330574

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容