RocketMQ 5.消费者核心应用

1. PushConsumer核心参数详解

  • consumeFromWhere

CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费

  • allocateMessageQueueStrategy
    平均分配的实现算法

如果消费者的个数可以除尽队列的个数,那么就完全平均分。
如果不能除尽。那么靠前的消费者多消费一个队列,靠后的消费平均数个队列。
如果消费者的个数大于队列的个数,那么靠前的消费者消费一个队列,后面的不消费。

  • subscription
    订阅关系
  • offsetStore
    消费进度相关类
  • consumeThreadMin/consumeThreadMax
    最小使用者线程数/最大使用者线程数
  • consumeConcurrentlyMaxSpan
    同时最大跨度偏移,它对顺序消耗没有影响
  • pullThresholdForQueue
    队列级别的流控制阈值,默认情况下每个消息队列最多缓存1000条消息,参考pullBatchSize,瞬时值可能会超过限制
  • pullInterval/pullBatchSize
    消息拉取间隔/消息拉取大小
  • consumeMessageBatchMaxSize
    批量消费消息大小

2. PushConsumer消费模式-集群模式

  • Clustering模式(默认)
    GroupName用于把多个Consumer组织到一起
    相同GroupName的Consumer只消费所订阅消息的一部分
    目的:达到天然的负载均衡机制
public class Consumer1 {

    public Consumer1() {
        try {
            String group_name = "test_model_consumer_name1";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
            consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
            consumer.subscribe("test_model_topic", "TagA");
            consumer.setMessageModel(MessageModel.CLUSTERING);
//            consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Consumer1 c1 = new Consumer1();
        System.out.println("c1 start..");

    }

    class Listener implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
                    String tags = msg.getTags();
                    //if(tags.equals("TagA")) {
                    System.out.println("收到消息:" + "  topic :" + topic + "  ,tags : " + tags + " ,msg : " + msgBody);
                    //}
                }
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }
}
public class Consumer2 {

    public Consumer2() {
        try {
            String group_name = "test_model_consumer_name2";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
            consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
            consumer.subscribe("test_model_topic", "TagB");
            consumer.setMessageModel(MessageModel.CLUSTERING);
//            consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Consumer2 c2 = new Consumer2();
        System.out.println("c2 start..");

    }

    class Listener implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
                    String tags = msg.getTags();
                    //if(tags.equals("TagB")) {
                    System.out.println("收到消息:" + "  topic :" + topic + "  ,tags : " + tags + " ,msg : " + msgBody);
                    //}
                }
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }
}

public class Consumer3 {

    public Consumer3() {
        try {
            String group_name = "test_model_consumer_name2";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
            consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
            consumer.subscribe("test_model_topic", "TagB");
            consumer.setMessageModel(MessageModel.CLUSTERING);
//            consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Consumer3 c2 = new Consumer3();
        System.out.println("c3 start..");

    }

    class Listener implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
                    String tags = msg.getTags();
                    //if(tags.equals("TagB")) {
                    System.out.println("收到消息:" + "  topic :" + topic + "  ,tags : " + tags + " ,msg : " + msgBody);
                    //}
                }
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }
}
其中Consumer1订阅Tag 为TagA, 主题 Consumer2、 Consumer3订阅Tag 为TagB
public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        String group_name = "test_model_producer_name";
        DefaultMQProducer producer = new DefaultMQProducer(group_name);
        producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        producer.start();

        for (int i = 0; i < 8; i++) {
            try {
                String tag = (i % 2 == 0) ? "TagA" : "TagB";
                Message msg = new Message("test_model_topic",// topic
                        tag,// tag
                        ("信息内容" + i).getBytes()// body
                );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}
consumer1,2,3 先于Producer启动
Producer 运行输出:
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992CE10000, offsetMsgId=276A70F600002A9F0000000000004F6C, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D040001, offsetMsgId=276A70F600002A9F0000000000005022, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D0F0002, offsetMsgId=276A70F600002A9F00000000000050D8, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D180003, offsetMsgId=276A70F600002A9F000000000000518E, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D230004, offsetMsgId=276A70F600002A9F0000000000005244, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D2D0005, offsetMsgId=276A70F600002A9F00000000000052FA, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=1], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D3D0006, offsetMsgId=276A70F600002A9F00000000000053B0, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=2], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D470007, offsetMsgId=276A70F600002A9F0000000000005466, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=3], queueOffset=3]
12:34:46.783 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.5.160:9876] result: true
12:34:46.785 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.112.246:10911] result: true
12:34:46.785 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.5.160:10911] result: true

Process finished with exit code 0

Consumer1,2,3示例运行输出
c1 start..
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容0
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容2
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容4
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容6
c2 start..
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容1
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容5
c3 start..
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容3
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容7
最佳实践:
假如生产队列是8个(queueId个数),那么消费者最好是2个、4个、8个(约数)

3. PushConsumer消费模式-广播模式

  • BROADCASTING模式(广播模式)
    同一个ConsumerGroup里的Consumer都消费订阅Topic全部信息
    也就是一条消息会被每一个Consumer消费
    setMessageModel方法

将 PushConsumer消费模式-集群模式的 Consumer1,2,3示例改为BROADCASTING模式(广播模式),同时按上述示例步骤运行

//            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.setMessageModel(MessageModel.BROADCASTING);
Consumer1,2,3示例运行输出
c1 start..
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容0
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容2
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容4
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容6
c2 start..
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容1
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容3
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容7
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容5
c3 start..
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容7
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容3
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容1
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容5
结论 :
通过c2、c3输出得出同一个ConsumerGroup里的Consumer都消费订阅Topic全部信息

4.消息存储核心-偏移量Offset

Offset是消息消费进度的核心
Offset指某个topic下的一条消息在某个MessageQueue里的位置
通过Offset可以进行定位到这条消息
Offset的存储实现分为远程文件类型和本地文件类型两种

5.集群模式-RemoteBrokerOffsetStore解析

默认集群模式Clustering,采用远程文件存储Offset
本质上因为多消费模式,每个Consumer消费所订阅主题的一部分

6.广播模式-LocalFileOffsetStore解析

广播模式下,由于每个Consumer都会收到消息且消费
各个Consumer之间没有任何干扰,独立线程消费
所以使用LocalFileOffsetStore,也就是把Offset存储到本地

7.消费者长轮询模式分析

DefaultPushConsumer是使用长轮询模式进行实现的
通常主流消息获取模式:Push消息推送模式&Pull消息拉取模式
长轮询机制

consumer 拉取消息,对应的 queue 如果没有数据,broker 不会立即返回,而是以一种长轮询的方式处理,把 PullReuqest 保存起来,等待 queue 有了消息后,或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。

rocketMQ 长轮询

8.RocketMQ消费者-PullConsumer使用

  • 消息拉取方式:DefaultMQPullConsumer
  • Pull方式主要了三件事:
    获取Message Queue并遍历
    维护OffsetStore
    根据不同的消息状态做不同的处理
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        String group_name = "test_pull_producer_name";
        DefaultMQProducer producer = new DefaultMQProducer(group_name);
        producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        producer.start();

        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("test_pull_topic",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes()// body
                );

                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(3000);
            }
        }

        producer.shutdown();
    }
}
public class PullConsumer {
    /**
     * Map<key, value>  key为指定的队列,value为这个队列拉取数据的最后位置
     * <p>
     * 重启,当前offset不会改变,因为offset没有存储到数据里,而只是存储到内存中
     */

    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {

        String group_name = "test_pull_consumer_name";
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        consumer.start();
        System.err.println("consumer start");
        //  从TopicTest这个主题去获取所有的队列(默认会有4个队列)
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");
        //  遍历每一个队列,进行拉取数据
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);

            SINGLE_MQ:
            while (true) {
                try {
                    //  从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println(pullResult);
                    System.out.println(pullResult.getPullStatus());
                    System.out.println();
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> list = pullResult.getMsgFoundList();
                            for (MessageExt msg : list) {
                                System.out.println(new String(msg.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            System.out.println("没有新的数据啦...");
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }


    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }


    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }

}

先运行Producer再运行PullConsumer
Producer运行结果

SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0B3F10000, offsetMsgId=276A70F600002A9F0000000000000E60, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0B7F00001, offsetMsgId=276A70F600002A9F0000000000000F18, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=3], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0BBE50002, offsetMsgId=276A70F600002A9F0000000000000FD0, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=0], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0BFDB0003, offsetMsgId=276A70F600002A9F0000000000001088, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=1], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0C3DA0004, offsetMsgId=276A70F600002A9F0000000000001140, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=2], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0C7D50005, offsetMsgId=276A70F600002A9F00000000000011F8, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0CBD90006, offsetMsgId=276A70F600002A9F00000000000012B0, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=0], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0CFDA0007, offsetMsgId=276A70F600002A9F0000000000001368, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=1], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0D3D00008, offsetMsgId=276A70F600002A9F0000000000001420, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=2], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0D7D00009, offsetMsgId=276A70F600002A9F00000000000014D8, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=3], queueOffset=7]
17:22:47.425 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.5.160:9876] result: true
17:22:47.430 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.112.246:10911] result: true
17:22:47.431 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.5.160:10911] result: true

PullConsumer 运行结果

consumer start
Consume from the queue: MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=3]
PullResult [pullStatus=FOUND, nextBeginOffset=5, minOffset=0, maxOffset=5, msgFoundList=5]
FOUND

Hello RocketMQ 3
Hello RocketMQ 7
Hello RocketMQ 1
Hello RocketMQ 5
Hello RocketMQ 9
PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=5, minOffset=0, maxOffset=5, msgFoundList=0]
NO_NEW_MSG

没有新的数据啦...
Consume from the queue: MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=0]
PullResult [pullStatus=FOUND, nextBeginOffset=5, minOffset=0, maxOffset=5, msgFoundList=5]
FOUND

Hello RocketMQ 0
Hello RocketMQ 4
Hello RocketMQ 8
Hello RocketMQ 2
Hello RocketMQ 6
PullResult [pullStatus=FOUND, nextBeginOffset=6, minOffset=0, maxOffset=6, msgFoundList=1]
FOUND
...省略
注意:重启,当前offset不会改变,因为offset没有存储到数据里,而只是存储到内存中
上述主动pull 消息问题:1. 没有同步远程更新消费offset 2.没有拉取消息的周期
优化
public class PullScheduleService {

    public static void main(String[] args) throws MQClientException {

        String group_name = "test_pull_consumer_name";

        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);

        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);

        scheduleService.setMessageModel(MessageModel.CLUSTERING);

        scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {

            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                System.err.println("-------------- queueId: " + mq.getQueueId() + "-------------");
                try {
                    // 获取从哪里拉取
                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0)
                        offset = 0;

                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> list = pullResult.getMsgFoundList();
                            for (MessageExt msg : list) {
                                //消费数据...
                                System.out.println(new String(msg.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                    //更新ConsumeOffset 消费记录
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                    // 设置再过3000ms后重新拉取
                    context.setPullNextDelayTimeMillis(3000);

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        scheduleService.start();
    }
}

-------------- queueId: 3-------------
-------------- queueId: 0-------------
-------------- queueId: 2-------------
-------------- queueId: 1-------------
Hello RocketMQ 2
Hello RocketMQ 6
Hello RocketMQ 0
Hello RocketMQ 4
Hello RocketMQ 8
Hello RocketMQ 1
Hello RocketMQ 3
Hello RocketMQ 0
Hello RocketMQ 4
Hello RocketMQ 8
Hello RocketMQ 2
Hello RocketMQ 6
Hello RocketMQ 2
Hello RocketMQ 6
Hello RocketMQ 7
Hello RocketMQ 5
Hello RocketMQ 0
Hello RocketMQ 4
Hello RocketMQ 9
Hello RocketMQ 1
Hello RocketMQ 3
Hello RocketMQ 7
Hello RocketMQ 3
Hello RocketMQ 7
Hello RocketMQ 8
Hello RocketMQ 5
Hello RocketMQ 9
Hello RocketMQ 1
Hello RocketMQ 5
Hello RocketMQ 9
-------------- queueId: 0-------------
-------------- queueId: 3-------------
-------------- queueId: 2-------------
-------------- queueId: 1-------------
-------------- queueId: 0-------------
...省略

特别感谢:
阿神

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,366评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,521评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,689评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,925评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,942评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,727评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,447评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,349评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,820评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,990评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,127评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,812评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,471评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,017评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,142评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,388评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,066评论 2 355

推荐阅读更多精彩内容