Kafka

1介绍

Kafka是一个分布式的、可分区的、可复制的消息系统,提供了一个生产者、缓冲区、消费者的模型。

kafka模型
  • Kafka作为一个集群,运行在一台或者多台服务器上.
  • Kafka 通过 topic 对存储的流数据进行分类。
  • 每条记录中包含一个key,一个value和一个timestamp(时间戳)。

Kafka有四个核心的API:

  • The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。
  • The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。
  • The Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。
  • The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。

2关键词

  • Broker:
    Kafka集群包含一个或多个服务器,这种服务器称为Broker。

  • Topic:
    Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。

  • Partitions:
    对于每一个topic, Kafka集群都会维持一个分区日志,每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。

2.jpg
image.png
image.png

Kafka 集群保留所有发布的记录,无论他们是否已被消费,并通过一个可配置的参数,保留期限来控制. 举个例子, 如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。Kafka的性能和数据大小无关,所以长时间存储数据没有什么问题。

事实上,在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从"现在"开始消费。

当数据大小超过了单台服务器的限制,允许数据进行扩展。每个单独的分区都必须受限于主机的文件限制,不过一个topic可能有多个分区,因此可以处理无限量的数据。(一个server有多个分区,一个分区只会存在于一个server上面。原文:每个分区只属于一个台服务器,所以如果有20个分区,那么全部数据(包含读写负载)将由不超过20个服务器(不包含副本)处理。)

  • 生产者
    生产者可以将数据发布到所选择的topic(主题)中。生产者负责将记录分配到topic的哪一个 partition(分区)中。可以使用循环的方式来简单地实现负载均衡,也可以根据某些其他原则(例如:记录中的key)来完成。

  • 消费者

消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例。一个消费者组的消费者只有一个会获取到消息。

传统的消息系统有两个模块: 队列 和 发布-订阅。在队列中,消费者池从server读取数据,每条记录被池子中的一个消费者消费; 在发布订阅中,记录被广播到所有的消费者。

在kafka中,两个极端情况:
如果所有的消费者实例在同一消费组中,消息记录只会给到一个消费者实例. 类似队列模式。
如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程.类似发布-订阅模式。

consumer选择一个topic,通过id指定从哪个位置开始消费消息。消费完成之后保留id,下次可以从这个位置开始继续消费,也可以从其他任意位置开始消费。

即使消息被消费,消息仍然不会被立即删除.日志文件将会根据broker中的配置要求,保留一定的时间之后删除。

3 High-level API,Low-level api

High level api是consumer读的partition的offsite是存在zookeeper上。High level api 会启动另外一个线程去每隔一段时间,offsite自动同步到zookeeper上。如果使用了High level api, 每个message只能被读一次,一旦读了这条message之后,无论我consumer的处理是否ok。High level api的另外一个线程会自动的把offiste+1同步到zookeeper上。如果consumer读取数据出了问题,offsite也会在zookeeper上同步。

Low level api是consumer读的partition的offsite在consumer自己的程序中维护。不会同步到zookeeper上。但是为了kafka manager能够方便的监控,一般也会手动的同步到zookeeper上。这样的好处是一旦读取某个message的consumer失败了,这条message的offsite我们自己维护,我们不会+1。下次再启动的时候,还会从这个offsite开始读。

4容错

数据备份:
以partition为单位备份副本,副本数可设置。当副本数为N时,代表1个leader,N-1个followers,followers可以视为leader的consumer,拉取leader的消息,append到自己的系统中。

Leader选择:
ISR:某个分区内同步中的node组成一个集合,即该分区的ISR,ISR是followers的子集。
当leader处于非同步中时,系统从ISR中的followers中选举新leader。

当某个follower状态变为非同步中时,leader会将此follower剔除ISR,当此follower恢复并完成数据同步之后再次进入ISR。

当某个follower状态变为非同步中时,leader会将此follower剔除ISR,当此follower恢复并完成数据同步之后再次进入ISR。
kafka有个保障:当producer生产消息时,只有当消息被所有ISR确认时,才表示该消息提交成功。只有提交成功的消息,才能被consumer消费

当有N个副本时,N个副本都在ISR中,N-1个副本都出现异常时,系统依然能提供服务

假设N副本全挂了,node恢复后会面临同步数据的过程,这期间ISR中没有node,会导致该分区服务不可用。kafka采用一种降级措施来处理:选举第一个恢复的node作为leader提供服务,以它的数据为基准,这个措施被称为脏leader选举。

由于leader是主要提供服务的,kafka broker将多个partition的leader均分在不同的server上以均摊风险。

5 Zookeeper

Zookeeper 协调控制:

  • 管理broker与consumer的动态加入与离开。
  • 触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一 个consumer group内的多个consumer的消费负载平衡。
  • 维护消费关系及每个partition的消费信息。

Zookeeper上的细节:

  • 每个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions信息。
  • 每个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。
  • 每个consumer group关联一个临时的owner registry和一个持久的offset registry。对于被订阅的每个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。

同步复制&异步复制

同步复制:(默认)

  1. Producer联系zookeeper识别leader
  2. 向leader发送消息
  3. Leader收到消息写入本地log
  4. Follower从leader pull 消息
  5. Follower向本地写入log
  6. Follower向leader发送ack消息
  7. Leader收到所有的follower的ack消息
  8. Leader向producer发送ack

异步复制:
Leader在将消息写入本地log后,直接回传ack消息,不需要等待follower复制完成。

7好处

  1. 消费者可以根据需求,灵活指定offset消费

  2. 保证了消息不变性,为并发消费提供了线程安全的保证。每个consumer都保留自己的offset,互相之间不干扰,不存在线程安全问题

  3. 消息访问的并行高效性。每个topic中的消息被组织成多个partition,partition均匀分配到集群server中。生产、消费消息的时候,会被路由到指定partition,减少竞争,增加了程序的并行能力

  4. 增加消息系统的可伸缩性。每个topic中保留的消息可能非常庞大,通过partition将消息切分成多个子消息,并通过负责均衡策略将partition分配到不同server。这样当机器负载满的时候,通过扩容可以将消息重新均匀分配

  5. 保证消息可靠性。消息消费完成之后不会删除,可以通过重置offset重新消费,保证了消息不会丢失

  6. 灵活的持久化策略。可以通过指定时间段(如最近一天)来保存消息,节省broker存储空间

8 Windows上安装运行kafaka

8.1 下载压缩包

http://kafka.apache.org/downloads


说明:这个安装包包含了zookeeper的jar包,不需要在本地安装zookeeper了。

8.2 配置zookeeper.properties并启动zookeeper

位置:安装目录/config

注意:这个目录的斜杠格式要按照Linux的来。

启动zookeeper: >bin\windows\zookeeper-server-start.bat config\zookeeper.properties

启动成功后会在相应的目录生成zookeeper_data文件夹

8.3 配置server.properties并启动kafka

位置:安装目录/config

启动kafka:>bin\windows\kafka-server-start.bat config\server.properties

启动成功后会在相应的目录生成kafka-logs文件夹

注意:如果以上两步骤报jdk方面的错,就修改bin\windows\kafka-run-class.bat文件,给179行%CLASSPATH%添加上双引号。
Set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*

8.4 创建一个topic

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic t
est

8.5 查看topic列表

bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

8.6 启动一个Producer

bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

8.7 启动一个consumer

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

8.8 运行

最后在producer发送消息,就可以在consumer接收到了


9代码操作

9.1 依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>com.company.test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
    
</project>

9.2 生产者 和 消费者


import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class Test {


    @org.junit.Test
    public void testSend(){

        Properties properties = new Properties();
        properties.put("metadata.broker.list","localhost:9092");
        properties.put("serializer.class","kafka.serializer.StringEncoder");
        properties.put("request.required.acks","1");//同步复制

        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer<String, String> producer = new Producer<String, String>(producerConfig);

        KeyedMessage<String, String> message = new KeyedMessage<String, String>("test","100","ggggggg");
        producer.send(message);
        System.out.println("send over");
    }

    @org.junit.Test
    public void testConsumer(){

        Properties properties = new Properties();
        properties.put("zookeeper.connect","localhost:2181");
        properties.put("group.id","g1");
        properties.put("zookeeper.session.timeout.ms","500");
        properties.put("zookeeper.sync.time.ms","250");
        properties.put("auto.commit.interval.ms","1000");

        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test",new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumerConnector.createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> list = messageStreams.get("test");
        for (KafkaStream<byte[], byte[]> stream : list) {
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            while (iterator.hasNext()){
                byte[] message = iterator.next().message();
                System.out.println(message);
            }
        }
    }
}

9.3 代码操作更新版20190214

依赖


        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

代码

package com.base.web.aisino.kafka;

import com.base.web.aisino.domain.RecordDetailDO;
import com.base.web.aisino.service.ReStoreDetailRecordService;
import com.base.web.aisino.service.impl.ReStoreDetailRecordServiceImpl;
import com.base.web.system.filter.ChannelHeaderFilter;
import com.google.gson.Gson;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Properties;

/**
 * @author ah.zhanglei3@aisino.com
 * @ClassName
 * @Description
 * @Date 15:59 2019/2/13
 */
@Component
public class KafkaTool {
    private final Logger logger = LoggerFactory.getLogger(KafkaTool.class);

    @Autowired
    private ReStoreDetailRecordService reStoreDetailRecordService;

    /**
     * 接收kafka
     */
    public void received(){

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.220.184:10021,192.168.220.185:10020,192.168.220.186:10022");
        props.put("group.id", "test-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("jcpt"));

        Thread receivedKafkaThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    ArrayList<RecordDetailDO> recordDetailDOS = new ArrayList<>();
                    for (ConsumerRecord<String, String> record : records) {
                        logger.info("received kafka message : partition:"+record.partition()+",topic:"+record.topic()
                                    +",offset:"+record.offset()+",value:"+record.value());
                        String detailRecordString = record.value();
                        Gson gson = new Gson();
                        RecordDetailDO recordDetailDO = gson.fromJson(detailRecordString, RecordDetailDO.class);
                        recordDetailDOS.add(recordDetailDO);
                    }
                    if (recordDetailDOS.size()>0){
                        reStoreDetailRecordService.batchSave(recordDetailDOS);
                    }
                }
            }
        });

        receivedKafkaThread.start();
    }



    /**
     * 发送String
     * @param msg
     */
    public void send(String msg){
        Properties properties = new Properties();
        properties.put("bootstrap.servers","192.168.220.184:10021,192.168.220.185:10020,192.168.220.186:10022");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("request.required.acks","1");//同步复制
        KafkaProducer kafkaProducer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>("jcpt", msg);
        kafkaProducer.send(record);
    }
}

10kafka操作

  • 添加和删除 topics:
    bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
    --partitions 20 --replication-factor 3 --config x=y
    ps:20个分区,3个副本
    bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

  • 添加分区(不支持减少分区):
    bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name
    --partitions 40

  • 增加一个配置项:
    bin/kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name my_topic_name --alter --add-config x=y

  • 删除一个配置项:
    bin/kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name my_topic_name --alter --delete-config x

每当一个 borker 停止或崩溃时,该 borker 上的分区的leader 会转移到其他副本。这意味着,在 broker 重新启动时,默认情况下,它将只是所有分区的跟随者,这意味着它不会用于客户端的读取和写入。

  • 让Kafka集群尝试恢复已恢复副本的领导地位:
    bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
    也可以通过以下配置来自动配置Kafka:
    auto.leader.rebalance.enable=true

11问题

  • Kafka 对消费者分配分区规则
    Kafka有两种分区分配策略: RoundRobinAssignor+RangeAssignor
    RoundRobinAssignor:该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobinAssignor 策略来给消费者 C1 和消费者 C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。
    RangeAssignor:该策略会把主题的若干个连续的分区分配给消费者(kafka默认用该策略)。假设消费者 C1 和消费者 C2 同时 订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这 两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2。因为每个主题 拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消 费者更多的分区。
    可以通过配置partition.assignment.strategy参数指定用哪个策略

  • Offset往前读的问题
    消费者可以任意指定offset的位置,这个位置可以是以前的位置或者是现在的位置,但是当offset指定以后,就只能往后读取。

  • Follows和leader通信
    high watermark (HW):表示Partition各个replicas数据间同步且一致的offset位置,即表示allreplicas已经commit位置,每个Broker缓存中维护此信息,并不断更新。
    Kafka中每个Broker启动时都会创建一个副本管理服务(ReplicaManager),该服务负责维护ReplicaFetcherThread与其他Broker链路连接关系,该Broker中存在多少Follower的partitions对应leader partitions分布在不同的Broker上,有多少Broker就会创建相同数量的ReplicaFetcherThread线程同步对应partition数据,Kafka中partition间复制数据是由follower(扮演consumer角色)主动向leader获取消息,follower每次读取消息都会更新HW状态。每当Follower的partitions发生变更影响leader所在Broker变化时,ReplicaManager就会新建或销毁相应的ReplicaFetcherThread。

  • Broker是不是一个进程
    Kafka集群包含一个或多个服务器,这种服务器称为Broker,kafka启动以后,在服务器里面会有kafka进程。

  • 时间戳问题
    每条记录数据包含一个key,一个value和一个时间戳,在生产者发送消息的时候不需要指定时间戳,在方法内部封装的有时间戳。

12 SpringCloudStream 操作kafka

SpringCloudStream可以操作很多的消息队列,包括kafka,可以直接使用SpringCloudStream API,避免了直接使用kafka的API

12.1 maven

//上文的kafka的包就不需要了
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>

12.2 yml配置

#  #spring cloud steam 相关配置
  cloud:
    stream:
      bindings:
        #配置自己定义的通道与哪个中间件交互
        input: #Input和Output的值
          destination: test #目标主题
        output:
          destination: test
      default-binder: kafka #默认的binder是kafka
  kafka:
    bootstrap-servers: 192.168.220.184:10021,192.168.220.185:10020,192.168.220.186:10022 #kafka服务地址
    consumer:
      group-id: test-group
    producer:
      key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
      client-id: producer1

12.3 代码

接收

@Component
@EnableBinding(Sink.class)
public class KafkaReceive {
    private final Logger logger = LoggerFactory.getLogger(KafkaReceive.class);

    @Autowired
    private ReStoreDetailRecordService reStoreDetailRecordService;


    @StreamListener(Sink.INPUT)
    public void process(Message<?> message) {
        logger.info("监听到kafka数据->输入参数【message:"+message+"】");
        String receivedString = new String((byte[]) message.getPayload());
        logger.info("received kafka message : value:"+receivedString);
        Gson gson = new Gson();
        RecordDetailDO recordDetailDO = gson.fromJson(receivedString, RecordDetailDO.class);
        if (null!=recordDetailDO){
            logger.info("将kafka数据储存数据库"+recordDetailDO.toString());
            reStoreDetailRecordService.save(recordDetailDO);
        }
    }
}

发送

@EnableBinding(Source.class)
@Component
public class KafkaSender {

    private final Logger logger = LoggerFactory.getLogger(KafkaSender.class);

    @Autowired
    private Source source;

    public void sendMessage(String msg){
        try {
            logger.info("准备发送数据到kafka->数据:【message:"+msg+"】");
            source.output().send(MessageBuilder.withPayload(msg).build());
        }catch (Exception e){
            logger.info("准备发送数据到kafka->出错");
            e.printStackTrace();
        }
    }

}

13 还有一套kafkaTemplate也可以操作kafka,是springboot自己集成的kafka,自行搜索

参考链接:
http://kafka.apachecn.org/documentation.html
https://blog.csdn.net/qq_30130043/article/details/80098779
https://www.imooc.com/article/29209?block_id=tuijian_wz
https://blog.csdn.net/vinfly_li/article/details/79397201
https://blog.csdn.net/imgxr/article/details/80130878
https://blog.csdn.net/phantom_111/article/details/79903858
https://baijiahao.baidu.com/s?id=1608205621370302980&wfr=spider&for=pc
https://blog.csdn.net/wing_93/article/details/78513782
https://www.cnblogs.com/likehua/p/3999538.html
https://www.sohu.com/a/144225753_236714

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容